[fix](txn insert) Fix txn insert values error when connect to follower fe (#34950)

This commit is contained in:
meiyi
2024-05-16 17:54:39 +08:00
committed by yiguolei
parent 5d1f5968eb
commit b51a4212d6
2 changed files with 23 additions and 10 deletions

View File

@ -33,7 +33,7 @@ import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.commands.NoForward;
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@ -59,7 +59,7 @@ import java.util.stream.Collectors;
/**
* insert into values with in txn model.
*/
public class BatchInsertIntoTableCommand extends Command implements ForwardWithSync, Explainable {
public class BatchInsertIntoTableCommand extends Command implements NoForward, Explainable {
public static final Logger LOG = LogManager.getLogger(BatchInsertIntoTableCommand.class);

View File

@ -24,7 +24,7 @@ import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
@ -54,11 +54,14 @@ import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.qe.MasterTxnExecutor;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TLoadTxnBeginRequest;
import org.apache.doris.thrift.TLoadTxnBeginResult;
import org.apache.doris.thrift.TMergeType;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TTxnParams;
@ -182,15 +185,25 @@ public class InsertUtils {
txnEntry.setDb(dbObj);
String label = txnEntry.getLabel();
try {
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label, new TransactionState.TxnCoordinator(
TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
sourceType, timeoutSecond);
txnConf.setTxnId(txnId);
long txnId;
String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) {
txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label, new TransactionState.TxnCoordinator(
TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
sourceType, timeoutSecond);
} else {
MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(ctx);
TLoadTxnBeginRequest request = new TLoadTxnBeginRequest();
request.setDb(txnConf.getDb()).setTbl(txnConf.getTbl()).setToken(token)
.setLabel(label).setUser("").setUserIp("").setPasswd("");
TLoadTxnBeginResult result = masterTxnExecutor.beginTxn(request);
txnId = result.getTxnId();
}
txnConf.setTxnId(txnId);
txnConf.setToken(token);
} catch (UserException e) {
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}