From 785a1f49f5aa68b8dd46cc630f4d72e8d7bf476c Mon Sep 17 00:00:00 2001 From: yujun Date: Tue, 25 Jun 2024 13:35:01 +0800 Subject: [PATCH] [fix](txn) Fix coordidator be restart not abort txn #35342 (#36437) cherry pick from #35342 --- .../stream_load/stream_load_executor.cpp | 4 + .../doris/analysis/NativeInsertStmt.java | 5 +- .../apache/doris/httpv2/rest/LoadAction.java | 6 + .../java/org/apache/doris/load/DeleteJob.java | 6 +- .../doris/load/loadv2/BrokerLoadJob.java | 5 +- .../doris/load/loadv2/SparkLoadJob.java | 5 +- .../load/routineload/RoutineLoadTaskInfo.java | 5 +- .../load/sync/canal/CanalSyncChannel.java | 7 +- .../plans/commands/insert/InsertUtils.java | 8 +- .../commands/insert/OlapInsertExecutor.java | 5 +- .../org/apache/doris/qe/StmtExecutor.java | 8 +- .../doris/service/FrontendServiceImpl.java | 29 +++- .../org/apache/doris/system/HeartbeatMgr.java | 13 +- .../transaction/DatabaseTransactionMgr.java | 7 +- .../transaction/GlobalTransactionMgr.java | 36 +++-- .../doris/transaction/TransactionEntry.java | 5 +- .../doris/transaction/TransactionState.java | 15 +- .../DatabaseTransactionMgrTest.java | 10 +- .../transaction/GlobalTransactionMgrTest.java | 11 +- .../transaction/TransactionStateTest.java | 5 +- gensrc/thrift/FrontendService.thrift | 2 + .../doris/regression/suite/Suite.groovy | 27 ++++ .../suites/demo_p0/streamLoad_action.groovy | 5 + .../test_coordidator_be_restart.groovy | 135 ++++++++++++++++++ 24 files changed, 318 insertions(+), 46 deletions(-) create mode 100644 regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 58621c77a2..051aca3e13 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -45,6 +45,7 @@ #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_context.h" 
#include "thrift/protocol/TDebugProtocol.h" +#include "util/debug_points.h" #include "util/doris_metrics.h" #include "util/thrift_rpc_helper.h" #include "util/time.h" @@ -174,6 +175,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { request.__set_timeout(ctx->timeout_second); } request.__set_request_id(ctx->id.to_thrift()); + request.__set_backend_id(_exec_env->master_info()->backend_id); TLoadTxnBeginResult result; Status status; @@ -309,6 +311,8 @@ void StreamLoadExecutor::get_commit_request(StreamLoadContext* ctx, } Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { + DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK); + DorisMetrics::instance()->stream_load_txn_commit_request_total->increment(1); TLoadTxnCommitRequest request; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 73adf7d03f..988447eea4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -58,6 +58,7 @@ import org.apache.doris.planner.OlapTableSink; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SqlModeHelper; import org.apache.doris.rewrite.ExprRewriter; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TUniqueId; @@ -405,7 +406,9 @@ public class NativeInsertStmt extends InsertStmt { LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING; transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(), Lists.newArrayList(targetTable.getId()), label.getLabelName(), - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + 
ExecuteEnv.getInstance().getStartupTime()), sourceType, timeoutSecond); } isTransactionBegin = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 6be5654a2e..259062a828 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -24,6 +24,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.LoadException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.entity.RestBaseResult; import org.apache.doris.httpv2.exception.UnauthorizedException; @@ -323,6 +324,11 @@ public class LoadAction extends RestBaseController { } private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadException { + long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L); + if (debugBackendId != -1L) { + Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId); + return new TNetworkAddress(backend.getHost(), backend.getHttpPort()); + } Backend backend = null; BeSelectionPolicy policy = null; String qualifiedUser = ConnectContext.get().getQualifiedUser(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java index f2c77df53b..f9c94284db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java @@ -50,6 +50,7 @@ import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PartitionPruner; import org.apache.doris.planner.RangePartitionPrunerV2; import org.apache.doris.qe.ConnectContext; +import 
org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; @@ -283,8 +284,9 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ public long beginTxn() throws Exception { long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(deleteInfo.getDbId(), Lists.newArrayList(deleteInfo.getTableId()), label, null, - new TransactionState.TxnCoordinator( - TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), TransactionState.LoadJobSourceType.FRONTEND, id, Config.stream_load_default_timeout_second); this.signature = txnId; Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index de32abbbb3..553727c60b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -49,6 +49,7 @@ import org.apache.doris.load.FailMsg; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; @@ -108,7 +109,9 @@ public class BrokerLoadJob extends BulkLoadJob { QuotaExceedException, MetaNotFoundException { transactionId = Env.getCurrentGlobalTransactionMgr() .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null, - new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id, getTimeout()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index b8ff96394d..9aac4b3655 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -61,6 +61,7 @@ import org.apache.doris.load.EtlJobType; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.FailMsg; import org.apache.doris.qe.OriginStatement; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.sparkdpp.DppResult; import org.apache.doris.sparkdpp.EtlJobConfig; @@ -198,7 +199,9 @@ public class SparkLoadJob extends BulkLoadJob { QuotaExceedException, MetaNotFoundException { transactionId = Env.getCurrentGlobalTransactionMgr() .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), LoadJobSourceType.FRONTEND, id, getTimeout()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index cdee942f40..d101d98cf8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -27,6 +27,7 @@ import org.apache.doris.common.QuotaExceedException; import org.apache.doris.common.UserException; import 
org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.transaction.BeginTransactionException; @@ -199,7 +200,9 @@ public abstract class RoutineLoadTaskInfo { try { txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(), Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(), timeoutMs / 1000); } catch (DuplicatedRequestException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index 73cfc77f0c..ee94a068c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -32,6 +32,7 @@ import org.apache.doris.load.sync.SyncJob; import org.apache.doris.load.sync.model.Data; import org.apache.doris.proto.InternalService; import org.apache.doris.qe.InsertStreamTxnExecutor; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.SyncTask; import org.apache.doris.task.SyncTaskPool; @@ -132,8 +133,10 @@ public class CanalSyncChannel extends SyncChannel { try { long txnId = globalTransactionMgr.beginTransaction(db.getId(), Lists.newArrayList(tbl.getId()), label, - new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, - FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond); + new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), + sourceType, timeoutSecond); String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); request = new TStreamLoadPutRequest() .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index cabdfc203e..051550cdfa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -58,6 +58,7 @@ import org.apache.doris.qe.InsertStreamTxnExecutor; import org.apache.doris.qe.MasterTxnExecutor; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; @@ -190,9 +191,10 @@ public class InsertUtils { String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) { txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), - label, new TransactionState.TxnCoordinator( - TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label, + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), sourceType, timeoutSecond); } else { MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(ctx); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index 579e04b8e0..22f943abee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -43,6 +43,7 @@ import org.apache.doris.planner.PlanFragment; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TOlapTableLocationParam; import org.apache.doris.thrift.TPartitionType; @@ -104,7 +105,9 @@ public class OlapInsertExecutor extends AbstractInsertExecutor { } else { this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( database.getId(), ImmutableList.of(table.getId()), labelName, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout()); } } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 5bf75b99bf..6edceff76d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -178,6 +178,7 @@ import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.ExecuteEnv; import 
org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.util.InternalQueryBuffer; @@ -2051,9 +2052,10 @@ public class StmtExecutor { String label = txnEntry.getLabel(); if (Env.getCurrentEnv().isMaster()) { long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), - label, new TransactionState.TxnCoordinator( - TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label, + new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), sourceType, timeoutSecond); txnConf.setTxnId(txnId); String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index af86660be2..d99a4a316a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1182,7 +1182,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, TableType.OLAP); // begin long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; - TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, clientIp); + Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId()); + long startTime = backend != null ? 
backend.getLastStartTime() : 0; + TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime); if (request.isSetToken()) { txnCoord.isFromInternal = true; } @@ -1290,10 +1292,12 @@ public class FrontendServiceImpl implements FrontendService.Iface { // step 5: get timeout long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; + Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId()); + long startTime = backend != null ? backend.getLastStartTime() : 0; + TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime); // step 6: begin transaction long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - db.getId(), tableIdList, request.getLabel(), request.getRequestId(), - new TxnCoordinator(TxnSourceType.BE, clientIp), + db.getId(), tableIdList, request.getLabel(), request.getRequestId(), txnCoord, TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond); // step 7: return result @@ -2113,6 +2117,25 @@ public class FrontendServiceImpl implements FrontendService.Iface { httpStreamParams.getParams().setLoadStreamPerNode(loadStreamPerNode); httpStreamParams.getParams().setTotalLoadStreams(loadStreamPerNode); httpStreamParams.getParams().setNumLocalSink(1); + + TransactionState txnState = Env.getCurrentGlobalTransactionMgr().getTransactionState( + httpStreamParams.getDb().getId(), httpStreamParams.getTxnId()); + if (txnState == null) { + LOG.warn("Not found http stream related txn, txn id = {}", httpStreamParams.getTxnId()); + } else { + TxnCoordinator txnCoord = txnState.getCoordinator(); + Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId()); + if (backend != null) { + // only modify txnCoord in memory, not write editlog yet. 
+ txnCoord.sourceType = TxnSourceType.BE; + txnCoord.id = backend.getId(); + txnCoord.ip = backend.getHost(); + txnCoord.startTime = backend.getLastStartTime(); + LOG.info("Change http stream related txn {} to coordinator {}", + httpStreamParams.getTxnId(), txnCoord); + } + } + result.setParams(httpStreamParams.getParams()); result.getParams().setDbName(httpStreamParams.getDb().getFullName()); result.getParams().setTableName(httpStreamParams.getTable().getName()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java index 5d17846476..9d13218ae0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java @@ -170,14 +170,21 @@ public class HeartbeatMgr extends MasterDaemon { BackendHbResponse hbResponse = (BackendHbResponse) response; Backend be = nodeMgr.getBackend(hbResponse.getBeId()); if (be != null) { + long oldStartTime = be.getLastStartTime(); boolean isChanged = be.handleHbResponse(hbResponse, isReplay); - if (hbResponse.getStatus() != HbStatus.OK) { + if (hbResponse.getStatus() == HbStatus.OK) { + long newStartTime = be.getLastStartTime(); + if (!isReplay && oldStartTime != newStartTime) { + Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart( + be.getId(), be.getHost(), newStartTime); + } + } else { // invalid all connections cached in ClientPool ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort())); if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs() >= Config.abort_txn_after_lost_heartbeat_time_second * 1000L) { - Env.getCurrentGlobalTransactionMgr() - .abortTxnWhenCoordinateBeDown(be.getHost(), 100); + Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown( + be.getId(), be.getHost(), 100); } } return isChanged; diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 3f82461ee4..efcc760f0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -1828,13 +1828,16 @@ public class DatabaseTransactionMgr { return null; } - public List> getTransactionIdByCoordinateBe(String coordinateHost, int limit) { + public List> getPrepareTransactionIdByCoordinateBe(long coordinateBeId, + String coordinateHost, int limit) { ArrayList> txnInfos = new ArrayList<>(); readLock(); try { idToRunningTransactionState.values().stream() .filter(t -> (t.getCoordinator().sourceType == TransactionState.TxnSourceType.BE - && t.getCoordinator().ip.equals(coordinateHost))) + && t.getTransactionStatus() == TransactionStatus.PREPARE + && t.getCoordinator().ip.equals(coordinateHost) + && (t.getCoordinator().id == 0 || t.getCoordinator().id == coordinateBeId))) .limit(limit) .forEach(t -> txnInfos.add(Pair.of(t.getDbId(), t.getTransactionId()))); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index ae331d2b75..dd62c02f40 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -666,10 +666,12 @@ public class GlobalTransactionMgr implements Writable { } } - public List> getTransactionIdByCoordinateBe(String coordinateHost, int limit) { + private List> getPrepareTransactionIdByCoordinateBe(long coordinateBeId, + String coordinateHost, int limit) { ArrayList> txnInfos = new ArrayList<>(); for (DatabaseTransactionMgr databaseTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) { - 
txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost, limit)); + txnInfos.addAll(databaseTransactionMgr.getPrepareTransactionIdByCoordinateBe( + coordinateBeId, coordinateHost, limit)); if (txnInfos.size() > limit) { break; } @@ -677,19 +679,33 @@ public class GlobalTransactionMgr implements Writable { return txnInfos.size() > limit ? new ArrayList<>(txnInfos.subList(0, limit)) : txnInfos; } - /** - * If a Coordinate BE is down when running txn, the txn will remain in FE until killed by timeout - * So when FE identify the Coordinate BE is down, FE should cancel it initiative - */ - public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit) { - List> transactionIdByCoordinateBe = getTransactionIdByCoordinateBe(coordinateHost, limit); + public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordinateHost, long beStartTime) { + List> transactionIdByCoordinateBe + = getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, Integer.MAX_VALUE); for (Pair txnInfo : transactionIdByCoordinateBe) { try { DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first); TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second); - if (transactionState.getTransactionStatus() == TransactionStatus.PRECOMMITTED) { - continue; + long coordStartTime = transactionState.getCoordinator().startTime; + if (coordStartTime < beStartTime) { + dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE restart", null); } + } catch (UserException e) { + LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage()); + } + } + } + + /** + * If a coordinator BE goes down while running a txn, the txn will remain in FE until killed by timeout. + * So when FE detects that the coordinator BE is down, FE should proactively abort the txn. + */ + public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String coordinateHost, int limit) { + List> 
transactionIdByCoordinateBe + = getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, limit); + for (Pair txnInfo : transactionIdByCoordinateBe) { + try { + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first); dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE is down", null); } catch (UserException e) { LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java index 43e6585e58..816740a320 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java @@ -32,6 +32,7 @@ import org.apache.doris.proto.Types; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.InsertStreamTxnExecutor; import org.apache.doris.qe.MasterTxnExecutor; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TTabletCommitInfo; @@ -175,7 +176,9 @@ public class TransactionEntry { if (!isTransactionBegan) { this.transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction( database.getId(), Lists.newArrayList(table.getId()), label, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + new TxnCoordinator(TxnSourceType.FE, 0, + FrontendOptions.getLocalHostAddress(), + ExecuteEnv.getInstance().getStartupTime()), LoadJobSourceType.INSERT_STREAMING, ConnectContext.get().getExecTimeout()); this.isTransactionBegan = true; this.database = database; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index b740141667..5955e1c08d 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -162,8 +162,14 @@ public class TransactionState implements Writable { public static class TxnCoordinator { @SerializedName(value = "sourceType") public TxnSourceType sourceType; + // backendId for backend, 0 for frontend + @SerializedName(value = "id") + public long id = 0; @SerializedName(value = "ip") public String ip; + // frontend/backend start time + @SerializedName(value = "st") + public long startTime = 0; // True if this txn if created by system(such as writing data to audit table) @SerializedName(value = "ii") public boolean isFromInternal = false; @@ -171,9 +177,11 @@ public class TransactionState implements Writable { public TxnCoordinator() { } - public TxnCoordinator(TxnSourceType sourceType, String ip) { + public TxnCoordinator(TxnSourceType sourceType, long id, String ip, long startTime) { this.sourceType = sourceType; + this.id = id; this.ip = ip; + this.startTime = startTime; } @Override @@ -304,7 +312,8 @@ public class TransactionState implements Writable { this.transactionId = -1; this.label = ""; this.idToTableCommitInfos = Maps.newHashMap(); - this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, "127.0.0.1"); // mocked, to avoid NPE + // mocked, to avoid NPE + this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, 0, "127.0.0.1", System.currentTimeMillis()); this.transactionStatus = TransactionStatus.PREPARE; this.sourceType = LoadJobSourceType.FRONTEND; this.prepareTime = -1; @@ -723,7 +732,7 @@ public class TransactionState implements Writable { TableCommitInfo info = TableCommitInfo.read(in); idToTableCommitInfos.put(info.getTableId(), info); } - txnCoordinator = new TxnCoordinator(TxnSourceType.valueOf(in.readInt()), Text.readString(in)); + txnCoordinator = new TxnCoordinator(TxnSourceType.valueOf(in.readInt()), 0, Text.readString(in), 0); transactionStatus = 
TransactionStatus.valueOf(in.readInt()); sourceType = LoadJobSourceType.valueOf(in.readInt()); prepareTime = in.readLong(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index 9108570e5e..ea63a5e18b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -57,8 +57,8 @@ public class DatabaseTransactionMgrTest { private static Env slaveEnv; private static Map LabelToTxnId; - private TransactionState.TxnCoordinator transactionSource = - new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe"); + private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.FE, 0, "localfe", System.currentTimeMillis()); public static void setTransactionFinishPublish(TransactionState transactionState, List backendIds) { for (long backendId : backendIds) { @@ -118,7 +118,9 @@ public class DatabaseTransactionMgrTest { masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId1); labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1); - TransactionState.TxnCoordinator beTransactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1"); + // txn 2, 3, 4 + TransactionState.TxnCoordinator beTransactionSource = new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.BE, 0, "be1", System.currentTimeMillis()); long transactionId2 = masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(CatalogTestUtil.testTableId1), CatalogTestUtil.testTxnLabel2, beTransactionSource, @@ -204,7 +206,7 @@ public class DatabaseTransactionMgrTest { @Test public void testGetTransactionIdByCoordinateBe() throws UserException { DatabaseTransactionMgr 
masterDbTransMgr = masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1); - List> transactionInfoList = masterDbTransMgr.getTransactionIdByCoordinateBe("be1", 10); + List> transactionInfoList = masterDbTransMgr.getPrepareTransactionIdByCoordinateBe(0, "be1", 10); Assert.assertEquals(3, transactionInfoList.size()); Assert.assertEquals(CatalogTestUtil.testDbId1, transactionInfoList.get(0).first.longValue()); Assert.assertEquals(TransactionStatus.PREPARE, diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index d9e2088e59..4f22d95c60 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -77,7 +77,8 @@ public class GlobalTransactionMgrTest { private static Env masterEnv; private static Env slaveEnv; - private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "localfe"); + private TransactionState.TxnCoordinator transactionSource = new TransactionState.TxnCoordinator( + TransactionState.TxnSourceType.FE, 0, "localfe", System.currentTimeMillis()); @Before public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException, @@ -323,7 +324,9 @@ public class GlobalTransactionMgrTest { Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null, - LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(), + LoadJobSourceType.ROUTINE_LOAD_TASK, + new TxnCoordinator(TxnSourceType.BE, 0, "be1", System.currentTimeMillis()), + routineLoadJob.getId(), Config.stream_load_default_timeout_second); 
transactionState.setTransactionStatus(TransactionStatus.PREPARE); masterTransMgr.getCallbackFactory().addCallback(routineLoadJob); @@ -395,7 +398,9 @@ public class GlobalTransactionMgrTest { Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L); routineLoadTaskInfoList.add(routineLoadTaskInfo); TransactionState transactionState = new TransactionState(1L, Lists.newArrayList(1L), 1L, "label", null, - LoadJobSourceType.ROUTINE_LOAD_TASK, new TxnCoordinator(TxnSourceType.BE, "be1"), routineLoadJob.getId(), + LoadJobSourceType.ROUTINE_LOAD_TASK, + new TxnCoordinator(TxnSourceType.BE, 0, "be1", System.currentTimeMillis()), + routineLoadJob.getId(), Config.stream_load_default_timeout_second); transactionState.setTransactionStatus(TransactionStatus.PREPARE); masterTransMgr.getCallbackFactory().addCallback(routineLoadJob); diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java index c20b2097f8..f08b7478d0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/TransactionStateTest.java @@ -61,8 +61,9 @@ public class TransactionStateTest { UUID uuid = UUID.randomUUID(); TransactionState transactionState = new TransactionState(1000L, Lists.newArrayList(20000L, 20001L), 3000, "label123", new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()), - LoadJobSourceType.BACKEND_STREAMING, new TxnCoordinator(TxnSourceType.BE, "127.0.0.1"), 50000L, - 60 * 1000L); + LoadJobSourceType.BACKEND_STREAMING, + new TxnCoordinator(TxnSourceType.BE, 0, "127.0.0.1", System.currentTimeMillis()), + 50000L, 60 * 1000L); transactionState.write(out); out.flush(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index aeabd2a22a..1ab2352696 100644 --- a/gensrc/thrift/FrontendService.thrift +++ 
b/gensrc/thrift/FrontendService.thrift
@@ -572,6 +572,7 @@ struct TLoadTxnBeginRequest {
     10: optional i64 timeout
     11: optional Types.TUniqueId request_id
     12: optional string token
+    13: optional i64 backend_id
 }
 
 struct TLoadTxnBeginResult {
@@ -594,6 +595,7 @@ struct TBeginTxnRequest {
     9: optional i64 timeout
     10: optional Types.TUniqueId request_id
     11: optional string token
+    12: optional i64 backend_id
 }
 
 struct TBeginTxnResult {
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 86afffa700..e94331346d 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -785,6 +785,54 @@ class Suite implements GroovyInterceptable {
         return hdfs.downLoad(label)
     }
 
+    /**
+     * Creates {@code tableName} if it does not exist yet and stream-loads
+     * {@code /demo_p0/streamload_input.csv} from the configured data path into it.
+     *
+     * @param tableName table to create and load into
+     * @param coordidateBeHostPort optional {@code host:httpPort} of a backend to send
+     *        the load to directly; empty (the default) or null skips direct routing
+     * @throws IllegalArgumentException if coordidateBeHostPort is non-empty but not
+     *         of the form {@code host:httpPort}
+     */
+    void runStreamLoadExample(String tableName, String coordidateBeHostPort = "") {
+        // Parse the target first so a malformed value fails before anything is created.
+        String directHost = null
+        int directHttpPort = -1
+        if (coordidateBeHostPort) {
+            def pos = coordidateBeHostPort.indexOf(':')
+            if (pos <= 0 || pos == coordidateBeHostPort.length() - 1) {
+                throw new IllegalArgumentException(
+                        "coordidateBeHostPort must be 'host:httpPort', got: " + coordidateBeHostPort)
+            }
+            directHost = coordidateBeHostPort.substring(0, pos)
+            directHttpPort = coordidateBeHostPort.substring(pos + 1).toInteger()
+        }
+
+        // One replica per backend currently listed by `show backends`.
+        def backends = sql_return_maparray "show backends"
+        sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    id int,
+                    name varchar(255)
+                )
+                DISTRIBUTED BY HASH(id) BUCKETS 1
+                PROPERTIES (
+                  "replication_num" = "${backends.size()}"
+                )
+            """
+
+        streamLoad {
+            table tableName
+            set 'column_separator', ','
+            file context.config.dataPath + "/demo_p0/streamload_input.csv"
+            time 10000
+            if (directHost != null) {
+                directToBe directHost, directHttpPort
+            }
+        }
+    }
+
     void streamLoad(Closure actionSupplier) {
         runAction(new StreamLoadAction(context), actionSupplier)
     }
diff --git a/regression-test/suites/demo_p0/streamLoad_action.groovy b/regression-test/suites/demo_p0/streamLoad_action.groovy
index 733483517d..59d12c965a 100644
--- a/regression-test/suites/demo_p0/streamLoad_action.groovy
+++
b/regression-test/suites/demo_p0/streamLoad_action.groovy @@ -126,6 +126,11 @@ suite("streamLoad_action") { LIMIT 5; """ + def tableName2 = "test_streamload_action2" + runStreamLoadExample(tableName2) + sql """ DROP TABLE ${tableName} """ + sql """ DROP TABLE ${tableName2}""" + sql """ DROP TABLE B """ } diff --git a/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy new file mode 100644 index 0000000000..bb6b0c18a0 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_coordidator_be_restart.groovy @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.http.NoHttpResponseException
+
+// Verifies the fate of PREPARE txns whose coordinator BE goes away: they must survive
+// while the BE is only down (lost-heartbeat abort timeout not reached) and must be
+// aborted once that BE has restarted.
+suite('test_coordidator_be_restart') {
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.enableDebugPoints()
+
+    docker(options) {
+        def db = context.config.getDbNameByFile(context.file)
+        def tableName1 = 'tbl_test_coordidator_be_restart_t1'
+        def tableName2 = 'tbl_test_coordidator_be_restart_t2'
+        // Large timeout: a stopped coordinator BE must not abort its txns by itself.
+        setFeConfig('abort_txn_after_lost_heartbeat_time_second', 3600)
+
+        def dbId = getDbId()
+
+        // Asserts that exactly `count` txns are listed under proc dir `state`
+        // ('running' or 'finished') and, when `status` is given, that each has it.
+        def assertTxns = { String state, int count, String status = null ->
+            def txns = sql_return_maparray "show proc '/transactions/${dbId}/${state}'"
+            logger.info(state + ' txns: ' + txns)
+            assertEquals(count, txns.size())
+            for (def txn : txns) {
+                if (status != null) {
+                    assertEquals(status, txn.TransactionStatus)
+                }
+            }
+        }
+
+        // Polls `show backends` once per second, at most `retries` times, until the
+        // backend on `host` reports Alive == expectAlive; returns whether it did.
+        def waitBackendAlive = { host, boolean expectAlive, int retries ->
+            for (def i = 0; i < retries; i++) {
+                def be = sql_return_maparray('show backends').find { it.Host == host }
+                // Fail with a clear assertion instead of a NullPointerException below.
+                assertTrue(be != null)
+                if (be.Alive.toBoolean() == expectAlive) {
+                    return true
+                }
+                sleep 1000
+            }
+            return false
+        }
+
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName2} (
+                id int,
+                name CHAR(10),
+                dt_1 DATETIME DEFAULT CURRENT_TIMESTAMP,
+                dt_2 DATETIMEV2 DEFAULT CURRENT_TIMESTAMP,
+                dt_3 DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP,
+                dt_4 DATETIMEV2(6) DEFAULT CURRENT_TIMESTAMP
+            )
+            DISTRIBUTED BY HASH(id) BUCKETS 1
+        """
+
+        // No txn is expected to exist before any load has been started.
+        assertTxns('running', 0)
+        assertTxns('finished', 0)
+
+        def coordinatorBe = cluster.getAllBackends().get(0)
+        def coordinatorBeHost = coordinatorBe.host
+
+        // Presumably pins the FE redirect target to this BE (per the debug point name); the BE point blocks commit_txn.
+        GetDebugPoint().enableDebugPointForAllFEs('LoadAction.selectRedirectBackend.backendId', [value: coordinatorBe.backendId])
+        GetDebugPoint().enableDebugPointForAllBEs('StreamLoadExecutor.commit_txn.block')
+
+        // Stream load sent directly to the coordinator BE, in the background.
+        thread {
+            try {
+                runStreamLoadExample(tableName1, coordinatorBe.host + ':' + coordinatorBe.httpPort)
+            } catch (NoHttpResponseException t) {
+                // be down will raise NoHttpResponseException
+            }
+        }
+
+        // Second load, via http_stream, running concurrently with the first one.
+        thread {
+            try {
+                streamLoad {
+                    set 'version', '1'
+                    set 'sql', """
+                            insert into ${db}.${tableName2} (id, name) select c1, c2 from http_stream("format"="csv")
+                            """
+                    time 120 * 1000
+                    file context.config.dataPath + '/load_p0/http_stream/test_http_stream.csv'
+                }
+            } catch (Exception e) {
+                logger.info('http stream: ' + e)
+            }
+        }
+
+        // Give both loads time to begin their txns; both are expected to stay in PREPARE.
+        sleep(5000)
+        assertTxns('running', 2, 'PREPARE')
+        assertTxns('finished', 0)
+
+        // coordinatorBe shutdown not abort txn because abort_txn_after_lost_heartbeat_time_second = 3600
+        cluster.stopBackends(coordinatorBe.index)
+        // Wait until `show backends` reports the coordinator BE as dead (at most 10 polls).
+        assertTrue(waitBackendAlive(coordinatorBeHost, false, 10))
+        sleep 5000
+        assertTxns('running', 2, 'PREPARE')
+
+        // coordinatorBe restart, abort txn on it
+        cluster.startBackends(coordinatorBe.index)
+        // Wait until `show backends` reports the coordinator BE as alive (at most 20 polls).
+        assertTrue(waitBackendAlive(coordinatorBeHost, true, 20))
+        sleep 5000
+        // Both txns must have left the running list and be recorded as ABORTED.
+        assertTxns('running', 0)
+        assertTxns('finished', 2, 'ABORTED')
+    }
+}