[fix](broker-load) fix broker load statement type conversion failure (#31746)

Co-authored-by: Luwei <814383175@qq.com>
This commit is contained in:
Xin Liao
2024-03-05 12:24:32 +08:00
committed by yiguolei
parent 7c30cb20fd
commit 2c26a308d1

View File

@ -23,6 +23,8 @@ import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UnifiedLoadStmt;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Database;
@ -32,6 +34,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.annotation.LogException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
@ -265,14 +268,9 @@ public abstract class BulkLoadJob extends LoadJob {
fileGroupAggInfo = new BrokerFileGroupAggInfo();
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(originStmt.originStmt),
Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
LoadStmt stmt;
try {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
stmt = (LoadStmt) SqlParserUtils.getStmt(parser, originStmt.idx);
for (DataDescription dataDescription : stmt.getDataDescriptions()) {
dataDescription.analyzeWithoutCheckPriv(db.getFullName());
}
checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
analyzeStmt(SqlParserUtils.getStmt(parser, originStmt.idx), db);
} catch (Exception e) {
LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
.add("origin_stmt", originStmt)
@ -283,6 +281,20 @@ public abstract class BulkLoadJob extends LoadJob {
}
}
protected void analyzeStmt(StatementBase stmtBase, Database db) throws UserException {
LoadStmt stmt = null;
if (stmtBase instanceof UnifiedLoadStmt) {
stmt = (LoadStmt) ((UnifiedLoadStmt) stmtBase).getProxyStmt();
} else {
stmt = (LoadStmt) stmtBase;
}
for (DataDescription dataDescription : stmt.getDataDescriptions()) {
dataDescription.analyzeWithoutCheckPriv(db.getFullName());
}
checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
}
@Override
protected void replayTxnAttachment(TransactionState txnState) {
if (txnState.getTxnCommitAttachment() == null) {