[fix](txn) Fix coordidator be restart not abort txn #35342 (#36437)

cherry pick from #35342
This commit is contained in:
yujun
2024-06-25 13:35:01 +08:00
committed by GitHub
parent 07ce9cf52c
commit 785a1f49f5
24 changed files with 318 additions and 46 deletions

View File

@ -58,6 +58,7 @@ import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TUniqueId;
@ -405,7 +406,9 @@ public class NativeInsertStmt extends InsertStmt {
LoadJobSourceType sourceType = LoadJobSourceType.INSERT_STREAMING;
transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(db.getId(),
Lists.newArrayList(targetTable.getId()), label.getLabelName(),
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
}
isTransactionBegin = true;

View File

@ -24,6 +24,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
@ -323,6 +324,11 @@ public class LoadAction extends RestBaseController {
}
private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadException {
long debugBackendId = DebugPointUtil.getDebugParamOrDefault("LoadAction.selectRedirectBackend.backendId", -1L);
if (debugBackendId != -1L) {
Backend backend = Env.getCurrentSystemInfo().getBackend(debugBackendId);
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}
Backend backend = null;
BeSelectionPolicy policy = null;
String qualifiedUser = ConnectContext.get().getQualifiedUser();

View File

@ -50,6 +50,7 @@ import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PartitionPruner;
import org.apache.doris.planner.RangePartitionPrunerV2;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
@ -283,8 +284,9 @@ public class DeleteJob extends AbstractTxnStateChangeCallback implements DeleteJ
public long beginTxn() throws Exception {
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(deleteInfo.getDbId(),
Lists.newArrayList(deleteInfo.getTableId()), label, null,
new TransactionState.TxnCoordinator(
TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.FRONTEND, id, Config.stream_load_default_timeout_second);
this.signature = txnId;
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);

View File

@ -49,6 +49,7 @@ import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
@ -108,7 +109,9 @@ public class BrokerLoadJob extends BulkLoadJob {
QuotaExceedException, MetaNotFoundException {
transactionId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
getTimeout());
}

View File

@ -61,6 +61,7 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.sparkdpp.DppResult;
import org.apache.doris.sparkdpp.EtlJobConfig;
@ -198,7 +199,9 @@ public class SparkLoadJob extends BulkLoadJob {
QuotaExceedException, MetaNotFoundException {
transactionId = Env.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.FRONTEND, id, getTimeout());
}

View File

@ -27,6 +27,7 @@ import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.transaction.BeginTransactionException;
@ -199,7 +200,9 @@ public abstract class RoutineLoadTaskInfo {
try {
txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(routineLoadJob.getDbId(),
Lists.newArrayList(routineLoadJob.getTableId()), DebugUtil.printId(id), null,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob.getId(),
timeoutMs / 1000);
} catch (DuplicatedRequestException e) {

View File

@ -32,6 +32,7 @@ import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.load.sync.model.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.SyncTask;
import org.apache.doris.task.SyncTaskPool;
@ -132,8 +133,10 @@ public class CanalSyncChannel extends SyncChannel {
try {
long txnId = globalTransactionMgr.beginTransaction(db.getId(),
Lists.newArrayList(tbl.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
request = new TStreamLoadPutRequest()
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())

View File

@ -58,6 +58,7 @@ import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.qe.MasterTxnExecutor;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
@ -190,9 +191,10 @@ public class InsertUtils {
String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) {
txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label, new TransactionState.TxnCoordinator(
TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
} else {
MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(ctx);

View File

@ -43,6 +43,7 @@ import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TOlapTableLocationParam;
import org.apache.doris.thrift.TPartitionType;
@ -104,7 +105,9 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
} else {
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()), labelName,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
}
} catch (Exception e) {

View File

@ -178,6 +178,7 @@ import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.InternalQueryBuffer;
@ -2051,9 +2052,10 @@ public class StmtExecutor {
String label = txnEntry.getLabel();
if (Env.getCurrentEnv().isMaster()) {
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label, new TransactionState.TxnCoordinator(
TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()), label,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
txnConf.setTxnId(txnId);
String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();

View File

@ -1182,7 +1182,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, TableType.OLAP);
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, clientIp);
Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId());
long startTime = backend != null ? backend.getLastStartTime() : 0;
TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime);
if (request.isSetToken()) {
txnCoord.isFromInternal = true;
}
@ -1290,10 +1292,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// step 5: get timeout
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId());
long startTime = backend != null ? backend.getLastStartTime() : 0;
TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime);
// step 6: begin transaction
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), tableIdList, request.getLabel(), request.getRequestId(),
new TxnCoordinator(TxnSourceType.BE, clientIp),
db.getId(), tableIdList, request.getLabel(), request.getRequestId(), txnCoord,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
// step 7: return result
@ -2113,6 +2117,25 @@ public class FrontendServiceImpl implements FrontendService.Iface {
httpStreamParams.getParams().setLoadStreamPerNode(loadStreamPerNode);
httpStreamParams.getParams().setTotalLoadStreams(loadStreamPerNode);
httpStreamParams.getParams().setNumLocalSink(1);
TransactionState txnState = Env.getCurrentGlobalTransactionMgr().getTransactionState(
httpStreamParams.getDb().getId(), httpStreamParams.getTxnId());
if (txnState == null) {
LOG.warn("Not found http stream related txn, txn id = {}", httpStreamParams.getTxnId());
} else {
TxnCoordinator txnCoord = txnState.getCoordinator();
Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId());
if (backend != null) {
// only modify txnCoord in memory, not write editlog yet.
txnCoord.sourceType = TxnSourceType.BE;
txnCoord.id = backend.getId();
txnCoord.ip = backend.getHost();
txnCoord.startTime = backend.getLastStartTime();
LOG.info("Change http stream related txn {} to coordinator {}",
httpStreamParams.getTxnId(), txnCoord);
}
}
result.setParams(httpStreamParams.getParams());
result.getParams().setDbName(httpStreamParams.getDb().getFullName());
result.getParams().setTableName(httpStreamParams.getTable().getName());

View File

@ -170,14 +170,21 @@ public class HeartbeatMgr extends MasterDaemon {
BackendHbResponse hbResponse = (BackendHbResponse) response;
Backend be = nodeMgr.getBackend(hbResponse.getBeId());
if (be != null) {
long oldStartTime = be.getLastStartTime();
boolean isChanged = be.handleHbResponse(hbResponse, isReplay);
if (hbResponse.getStatus() != HbStatus.OK) {
if (hbResponse.getStatus() == HbStatus.OK) {
long newStartTime = be.getLastStartTime();
if (!isReplay && oldStartTime != newStartTime) {
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart(
be.getId(), be.getHost(), newStartTime);
}
} else {
// invalid all connections cached in ClientPool
ClientPool.backendPool.clearPool(new TNetworkAddress(be.getHost(), be.getBePort()));
if (!isReplay && System.currentTimeMillis() - be.getLastUpdateMs()
>= Config.abort_txn_after_lost_heartbeat_time_second * 1000L) {
Env.getCurrentGlobalTransactionMgr()
.abortTxnWhenCoordinateBeDown(be.getHost(), 100);
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown(
be.getId(), be.getHost(), 100);
}
}
return isChanged;

View File

@ -1828,13 +1828,16 @@ public class DatabaseTransactionMgr {
return null;
}
public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String coordinateHost, int limit) {
public List<Pair<Long, Long>> getPrepareTransactionIdByCoordinateBe(long coordinateBeId,
String coordinateHost, int limit) {
ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
readLock();
try {
idToRunningTransactionState.values().stream()
.filter(t -> (t.getCoordinator().sourceType == TransactionState.TxnSourceType.BE
&& t.getCoordinator().ip.equals(coordinateHost)))
&& t.getTransactionStatus() == TransactionStatus.PREPARE
&& t.getCoordinator().ip.equals(coordinateHost)
&& (t.getCoordinator().id == 0 || t.getCoordinator().id == coordinateBeId)))
.limit(limit)
.forEach(t -> txnInfos.add(Pair.of(t.getDbId(), t.getTransactionId())));
} finally {

View File

@ -666,10 +666,12 @@ public class GlobalTransactionMgr implements Writable {
}
}
public List<Pair<Long, Long>> getTransactionIdByCoordinateBe(String coordinateHost, int limit) {
private List<Pair<Long, Long>> getPrepareTransactionIdByCoordinateBe(long coordinateBeId,
String coordinateHost, int limit) {
ArrayList<Pair<Long, Long>> txnInfos = new ArrayList<>();
for (DatabaseTransactionMgr databaseTransactionMgr : dbIdToDatabaseTransactionMgrs.values()) {
txnInfos.addAll(databaseTransactionMgr.getTransactionIdByCoordinateBe(coordinateHost, limit));
txnInfos.addAll(databaseTransactionMgr.getPrepareTransactionIdByCoordinateBe(
coordinateBeId, coordinateHost, limit));
if (txnInfos.size() > limit) {
break;
}
@ -677,19 +679,33 @@ public class GlobalTransactionMgr implements Writable {
return txnInfos.size() > limit ? new ArrayList<>(txnInfos.subList(0, limit)) : txnInfos;
}
/**
* If a Coordinate BE is down when running txn, the txn will remain in FE until killed by timeout
* So when FE identify the Coordinate BE is down, FE should cancel it initiative
*/
public void abortTxnWhenCoordinateBeDown(String coordinateHost, int limit) {
List<Pair<Long, Long>> transactionIdByCoordinateBe = getTransactionIdByCoordinateBe(coordinateHost, limit);
public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordinateHost, long beStartTime) {
List<Pair<Long, Long>> transactionIdByCoordinateBe
= getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, Integer.MAX_VALUE);
for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
try {
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first);
TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second);
if (transactionState.getTransactionStatus() == TransactionStatus.PRECOMMITTED) {
continue;
long coordStartTime = transactionState.getCoordinator().startTime;
if (coordStartTime < beStartTime) {
dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE restart", null);
}
} catch (UserException e) {
LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage());
}
}
}
/**
* If a Coordinate BE is down when running txn, the txn will remain in FE until killed by timeout
* So when FE identify the Coordinate BE is down, FE should cancel it initiative
*/
public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String coordinateHost, int limit) {
List<Pair<Long, Long>> transactionIdByCoordinateBe
= getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, limit);
for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
try {
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first);
dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE is down", null);
} catch (UserException e) {
LOG.warn("Abort txn on coordinate BE {} failed, msg={}", coordinateHost, e.getMessage());

View File

@ -32,6 +32,7 @@ import org.apache.doris.proto.Types;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.qe.MasterTxnExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TTabletCommitInfo;
@ -175,7 +176,9 @@ public class TransactionEntry {
if (!isTransactionBegan) {
this.transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), Lists.newArrayList(table.getId()), label,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
new TxnCoordinator(TxnSourceType.FE, 0,
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.INSERT_STREAMING, ConnectContext.get().getExecTimeout());
this.isTransactionBegan = true;
this.database = database;

View File

@ -162,8 +162,14 @@ public class TransactionState implements Writable {
public static class TxnCoordinator {
@SerializedName(value = "sourceType")
public TxnSourceType sourceType;
// backendId for backend, 0 for frontend
@SerializedName(value = "id")
public long id = 0;
@SerializedName(value = "ip")
public String ip;
// frontend/backend start time
@SerializedName(value = "st")
public long startTime = 0;
// True if this txn if created by system(such as writing data to audit table)
@SerializedName(value = "ii")
public boolean isFromInternal = false;
@ -171,9 +177,11 @@ public class TransactionState implements Writable {
public TxnCoordinator() {
}
public TxnCoordinator(TxnSourceType sourceType, String ip) {
public TxnCoordinator(TxnSourceType sourceType, long id, String ip, long startTime) {
this.sourceType = sourceType;
this.id = id;
this.ip = ip;
this.startTime = startTime;
}
@Override
@ -304,7 +312,8 @@ public class TransactionState implements Writable {
this.transactionId = -1;
this.label = "";
this.idToTableCommitInfos = Maps.newHashMap();
this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, "127.0.0.1"); // mocked, to avoid NPE
// mocked, to avoid NPE
this.txnCoordinator = new TxnCoordinator(TxnSourceType.FE, 0, "127.0.0.1", System.currentTimeMillis());
this.transactionStatus = TransactionStatus.PREPARE;
this.sourceType = LoadJobSourceType.FRONTEND;
this.prepareTime = -1;
@ -723,7 +732,7 @@ public class TransactionState implements Writable {
TableCommitInfo info = TableCommitInfo.read(in);
idToTableCommitInfos.put(info.getTableId(), info);
}
txnCoordinator = new TxnCoordinator(TxnSourceType.valueOf(in.readInt()), Text.readString(in));
txnCoordinator = new TxnCoordinator(TxnSourceType.valueOf(in.readInt()), 0, Text.readString(in), 0);
transactionStatus = TransactionStatus.valueOf(in.readInt());
sourceType = LoadJobSourceType.valueOf(in.readInt());
prepareTime = in.readLong();