[MysqlProtocol] Support MySQL multiple statements protocol (#3050)

This CL contains two changes:

## Support multiple statements in one request, like:

```
select 10; select 20; select 30;
```
ISSUE: #3049 

To quickly test this CL, you can use the mysql client command-line tool:

```
mysql> delimiter //
mysql> select 1; select 2; //
+------+
| 1    |
+------+
|    1 |
+------+
1 row in set (0.01 sec)

+------+
| 2    |
+------+
|    2 |
+------+
1 row in set (0.02 sec)

Query OK, 0 rows affected (0.02 sec)
```
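Under the hood, the server parses the whole request into a list of statements and executes them one by one. For every statement except the last, it sets the MySQL `SERVER_MORE_RESULTS_EXISTS` status flag in the OK/EOF packet before flushing the current result set, which tells the client that another result set follows. Below is a simplified sketch of that loop, using the names from the `ConnectProcessor` change further down in this diff (error handling omitted, so this is illustrative rather than the exact code):

```
// Simplified sketch of the new loop in ConnectProcessor.handleQuery(); see the full diff below.
List<StatementBase> stmts = SqlParserUtils.getMultiStmts(parser);
for (int i = 0; i < stmts.size(); ++i) {
    ctx.getState().reset();
    if (i > 0) {
        ctx.resetRetureRows();
    }
    StmtExecutor executor = new StmtExecutor(ctx, stmts.get(i), new OriginStatement(originStmt, i));
    executor.execute();
    if (i != stmts.size() - 1) {
        // more result sets follow: set the flag and flush this statement's result
        ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
        finalizeCommand();
    }
}
```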

I added a new class, `OriginStatement.java`, to save the origin statement string together with an index. This class mainly handles the following scenario:

1. A user sends a multi-statement request to a non-Master FE, e.g. `DDL1; DDL2; DDL3`.

2. Currently we cannot separate the original string of a single statement out of the multi-statement text, so we have to forward the entire text to the Master FE. I therefore added an index to the forward request: `DDL1`'s index is 0, `DDL2`'s index is 1, and so on.

3. When the Master FE handles the forwarded request, it parses the entire text again, gets the 3 DDL statements, and uses the `index` to select the specified one, as sketched below.
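A minimal sketch of that lookup on the Master side, assuming the FE parser classes from this CL (`SqlParser`, `SqlScanner`, `SqlParserUtils`) are on the classpath; the class and method names here are only for illustration:

```
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.common.util.SqlParserUtils;

import java.io.StringReader;

public class ForwardedStmtLookupSketch {
    // Re-parse the forwarded multi-statement text and return the statement at `idx` (0-based).
    public static StatementBase pick(String originStmt, int idx, long sqlMode) throws Exception {
        SqlParser parser = new SqlParser(new SqlScanner(new StringReader(originStmt), sqlMode));
        return SqlParserUtils.getStmt(parser, idx);
    }
}
```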

## Optimize the display of syntax errors
I have also optimized the display of syntax errors: the message now reports the line number of the error, and when the error line is very long, only a window of text around the error position is printed (with `...` markers) so the error indicator stays aligned and the relevant context is always visible, roughly as in the sketch below.
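For reference, the new truncation logic is roughly the following standalone sketch (a transcription of the parser change below, with `errorCol` standing in for `errorToken.right`, the 1-based column of the error):

```
// Sketch: print at most ~60 characters of the error line, centered on the error,
// and return the snippet plus a '^' indicator aligned with the error column.
public class SyntaxErrorDisplaySketch {
    private static final int MAX_PRINT_LENGTH = 60;

    public static String format(String errorLine, int errorCol) {
        StringBuilder sb = new StringBuilder();
        int lastCharIndex = Math.min(errorLine.length(), errorCol);
        int errorLoc;
        if (errorLine.length() <= MAX_PRINT_LENGTH) {
            // short line: print it as-is
            sb.append(errorLine).append('\n');
            errorLoc = errorCol;
        } else {
            // long line: keep roughly half of MAX_PRINT_LENGTH on each side of the error
            int contextLength = MAX_PRINT_LENGTH / 2 - 3;
            String left;
            if (errorCol > MAX_PRINT_LENGTH / 2) {
                left = "..." + errorLine.substring(errorCol - contextLength, lastCharIndex);
            } else {
                left = errorLine.substring(0, errorCol);
            }
            errorLoc = left.length();
            sb.append(left);
            if (errorLine.length() - errorCol > MAX_PRINT_LENGTH / 2) {
                sb.append(errorLine.substring(errorCol, errorCol + contextLength)).append("...");
            } else {
                sb.append(errorLine.substring(lastCharIndex));
            }
            sb.append('\n');
        }
        for (int i = 0; i < errorLoc - 1; ++i) {
            sb.append(' ');
        }
        sb.append("^\n");
        return sb.toString();
    }
}
```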
Author: Mingyu Chen
Date: 2020-03-13 22:21:40 +08:00
Committed by: GitHub
Parent: 9832024995
Commit: 4c98596283
38 changed files with 499 additions and 161 deletions

View File

@ -132,25 +132,53 @@ parser code {:
if (errorToken == null || stmt == null) {
return null;
}
String[] lines = stmt.split("\n");
String[] lines = stmt.split("\n", -1);
StringBuffer result = new StringBuffer();
result.append(getErrorTypeMessage(errorToken.sym) + " at:\n");
result.append(getErrorTypeMessage(errorToken.sym) + " in line ");
result.append(errorToken.left);
result.append(":\n");
// print lines up to and including the one with the error
for (int i = 0; i < errorToken.left; ++i) {
result.append(lines[i]);
// errorToken_.left is the line number of error.
// errorToken_.right is the column number of the error.
// index is start from 0, so "minus 1" is the real error line idx
String errorLine = lines[errorToken.left - 1];
// If the error is that additional tokens are expected past the end,
// errorToken_.right will be past the end of the string.
int lastCharIndex = Math.min(errorLine.length(), errorToken.right);
int maxPrintLength = 60;
int errorLoc = 0;
if (errorLine.length() <= maxPrintLength) {
// The line is short. Print the entire line.
result.append(errorLine);
result.append('\n');
errorLoc = errorToken.right;
} else {
// The line is too long. Print maxPrintLength/2 characters before the error and
// after the error.
int contextLength = maxPrintLength / 2 - 3;
String leftSubStr;
if (errorToken.right > maxPrintLength / 2) {
leftSubStr = "..." + errorLine.substring(errorToken.right - contextLength,
lastCharIndex);
} else {
leftSubStr = errorLine.substring(0, errorToken.right);
}
errorLoc = leftSubStr.length();
result.append(leftSubStr);
if (errorLine.length() - errorToken.right > maxPrintLength / 2) {
result.append(errorLine.substring(errorToken.right,
errorToken.right + contextLength) + "...");
} else {
result.append(errorLine.substring(lastCharIndex));
}
result.append("\n");
}
// print error indicator
for (int i = 0; i < errorToken.right - 1; ++i) {
for (int i = 0; i < errorLoc - 1; ++i) {
result.append(' ');
}
result.append("^\n");
// print remaining lines
for (int i = errorToken.left; i < lines.length; ++i) {
result.append(lines[i]);
result.append('\n');
}
// only report encountered and expected tokens for syntax errors
if (errorToken.sym == SqlParserSymbols.UNMATCHED_STRING_LITERAL ||
@ -162,10 +190,13 @@ parser code {:
result.append("Encountered: ");
String lastToken = SqlScanner.tokenIdMap.get(Integer.valueOf(errorToken.sym));
if (lastToken != null) {
result.append(lastToken);
result.append(lastToken);
} else if (SqlScanner.isKeyword((String) errorToken.value)) {
result.append("A reserved word cannot be used as an identifier: ").append((String) errorToken.value);
} else {
result.append("Unknown last token with id: " + errorToken.sym);
result.append("Unknown last token with id: " + errorToken.sym);
}
// Append expected tokens
result.append('\n');
result.append("Expected: ");
@ -173,23 +204,24 @@ parser code {:
Integer tokenId = null;
for (int i = 0; i < expectedTokenIds.size(); ++i) {
tokenId = expectedTokenIds.get(i);
// keywords hints
// keywords hints
if (SqlScanner.isKeyword(lastToken) && tokenId.intValue() == SqlParserSymbols.IDENT) {
result.append(String.format("%s is keyword, maybe `%s`", lastToken, lastToken) + ", ");
continue;
result.append(String.format("%s is keyword, maybe `%s`", lastToken, lastToken) + ", ");
continue;
}
if (reportExpectedToken(tokenId)) {
if (reportExpectedToken(tokenId)) {
expectedToken = SqlScanner.tokenIdMap.get(tokenId);
result.append(expectedToken + ", ");
}
}
// remove trailing ", "
result.delete(result.length()-2, result.length());
result.delete(result.length() - 2, result.length());
result.append('\n');
return result.toString();
}
:};
// Total keywords of doris
@ -247,7 +279,8 @@ terminal String UNMATCHED_STRING_LITERAL;
terminal String COMMENTED_PLAN_HINTS;
// Statement that the result of this parser.
nonterminal StatementBase query, stmt, show_stmt, show_param, help_stmt, load_stmt,
nonterminal List<StatementBase> stmts;
nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt,
create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt,
show_routine_load_stmt, show_routine_load_task_stmt,
describe_stmt, alter_stmt,
@ -457,24 +490,25 @@ precedence left LPAREN, RPAREN;
// Support chaining of timestamp arithmetic exprs.
precedence left KW_INTERVAL;
precedence left KW_OVER;
start with query;
start with stmts;
query ::=
stmts ::=
stmt:stmt
{:
RESULT = stmt;
RESULT = Lists.newArrayList(stmt);
:}
| stmt:stmt SEMICOLON
| stmts:stmts SEMICOLON stmt:stmt
{:
RESULT = stmt;
stmts.add(stmt);
RESULT = stmts;
:}
| import_columns_stmt:stmt
{:
RESULT = stmt;
RESULT = Lists.newArrayList(stmt);
:}
| import_where_stmt:stmt
{:
RESULT = stmt;
RESULT = Lists.newArrayList(stmt);
:}
;

View File

@ -802,6 +802,11 @@ public class MaterializedViewHandler extends AlterHandler {
private void changeTableStatus(long dbId, long tableId, OlapTableState olapTableState) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
LOG.warn("db {} has been dropped when changing table {} status after rollup job done",
dbId, tableId);
return;
}
db.writeLock();
try {
OlapTable tbl = (OlapTable) db.getTable(tableId);

View File

@ -23,6 +23,7 @@ import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.SqlParserUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -153,7 +154,7 @@ public class View extends Table {
SqlParser parser = new SqlParser(input);
ParseNode node;
try {
node = (ParseNode) parser.parse().value;
node = (ParseNode) SqlParserUtils.getFirstStmt(parser);
} catch (Exception e) {
LOG.info("stmt is {}", inlineViewDef);
LOG.info("exception because: {}", e);

View File

@ -161,6 +161,8 @@ public final class FeMetaVersion {
public static final int VERSION_74 = 74;
// support materialized index meta while there is different keys type in different materialized index
public static final int VERSION_75 = 75;
// multi statement
public static final int VERSION_76 = 76;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_75;
public static final int VERSION_CURRENT = VERSION_76;
}

View File

@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.common.AnalysisException;
import java.util.List;
// Utils about SQL parser
public class SqlParserUtils {
// parse origin statement and get the first one.
// Doris supports "multi-statement" protocol of MySQL, so when receiving the origin statement like:
// "select k1 from tbl;"
// The parser will return a list with 2 elements: [SelectStmt, EmptyStmt]
// In this case, we only need the first Stmt.
public static StatementBase getFirstStmt(SqlParser parser) throws Exception {
List<StatementBase> stmts = (List<StatementBase>) parser.parse().value;
return stmts.get(0);
}
public static StatementBase getStmt(SqlParser parser, int idx) throws Exception {
List<StatementBase> stmts = (List<StatementBase>) parser.parse().value;
if (idx >= stmts.size()) {
throw new AnalysisException("Invalid statement index: " + idx + ". size: " + stmts.size());
}
return stmts.get(idx);
}
// get all parsed statements as a list
public static List<StatementBase> getMultiStmts(SqlParser parser) throws Exception {
return (List<StatementBase>) parser.parse().value;
}
}

View File

@ -29,6 +29,7 @@ import org.apache.doris.http.BaseResponse;
import org.apache.doris.http.IllegalArgException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.MasterOpExecutor;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.ShowResultSet;
import com.google.common.base.Preconditions;
@ -84,8 +85,8 @@ public class SystemAction extends WebBaseAction {
if (!Catalog.getCurrentCatalog().isMaster()) {
// forward to master
String showProcStmt = "SHOW PROC \"" + procPath + "\"";
MasterOpExecutor masterOpExecutor = new MasterOpExecutor(showProcStmt, ConnectContext.get(),
RedirectStatus.FORWARD_NO_SYNC);
MasterOpExecutor masterOpExecutor = new MasterOpExecutor(new OriginStatement(showProcStmt, 0),
ConnectContext.get(), RedirectStatus.FORWARD_NO_SYNC);
try {
masterOpExecutor.execute();
} catch (Exception e) {

View File

@ -31,6 +31,7 @@ import org.apache.doris.http.IllegalArgException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.MasterOpExecutor;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.system.SystemInfoService;
@ -77,7 +78,7 @@ public class ShowProcAction extends RestBaseAction {
context.setCluster(SystemInfoService.DEFAULT_CLUSTER);
context.setQualifiedUser(ConnectContext.get().getQualifiedUser());
context.setRemoteIP(ConnectContext.get().getRemoteIP());
MasterOpExecutor masterOpExecutor = new MasterOpExecutor(showProcStmt, context,
MasterOpExecutor masterOpExecutor = new MasterOpExecutor(new OriginStatement(showProcStmt, 0), context,
RedirectStatus.FORWARD_NO_SYNC);
LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId());

View File

@ -37,6 +37,7 @@ import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
@ -183,7 +184,7 @@ public class TableQueryPlanAction extends RestBaseAction {
private void handleQuery(ConnectContext context, String requestDb, String requestTable, String sql,
Map<String, Object> result) throws DorisHttpException {
// use SE to resolve sql
StmtExecutor stmtExecutor = new StmtExecutor(context, sql, false);
StmtExecutor stmtExecutor = new StmtExecutor(context, new OriginStatement(sql, 0), false);
try {
TQueryOptions tQueryOptions = context.getSessionVariable().toThrift();
// Conduct Planner create SingleNodePlan#createPlanFragments

View File

@ -39,12 +39,14 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.service.FrontendOptions;
@ -86,7 +88,7 @@ public class BrokerLoadJob extends LoadJob {
// this param is used to persist the expr of columns
// the origin stmt is persisted instead of columns expr
// the expr of columns will be reanalyze when the log is replayed
private String originStmt = "";
private OriginStatement originStmt;
// include broker desc and data desc
private BrokerFileGroupAggInfo fileGroupAggInfo = new BrokerFileGroupAggInfo();
@ -102,12 +104,12 @@ public class BrokerLoadJob extends LoadJob {
this.jobType = EtlJobType.BROKER;
}
private BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, String originStmt)
private BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, OriginStatement originStmt)
throws MetaNotFoundException {
super(dbId, label);
this.timeoutSecond = Config.broker_load_default_timeout_second;
this.brokerDesc = brokerDesc;
this.originStmt = Strings.nullToEmpty(originStmt);
this.originStmt = originStmt;
this.jobType = EtlJobType.BROKER;
this.authorizationInfo = gatherAuthInfo();
@ -119,7 +121,7 @@ public class BrokerLoadJob extends LoadJob {
}
}
public static BrokerLoadJob fromLoadStmt(LoadStmt stmt, String originStmt) throws DdlException {
public static BrokerLoadJob fromLoadStmt(LoadStmt stmt, OriginStatement originStmt) throws DdlException {
// get db id
String dbName = stmt.getLabel().getDbName();
Database db = Catalog.getCurrentCatalog().getDb(stmt.getLabel().getDbName());
@ -277,16 +279,16 @@ public class BrokerLoadJob extends LoadJob {
*/
@Override
public void analyze() {
if (Strings.isNullOrEmpty(originStmt)) {
if (originStmt == null || Strings.isNullOrEmpty(originStmt.originStmt)) {
return;
}
// Reset dataSourceInfo, it will be re-created in analyze
fileGroupAggInfo = new BrokerFileGroupAggInfo();
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(originStmt),
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(originStmt.originStmt),
Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
LoadStmt stmt = null;
try {
stmt = (LoadStmt) parser.parse().value;
stmt = (LoadStmt) SqlParserUtils.getStmt(parser, originStmt.idx);
for (DataDescription dataDescription : stmt.getDataDescriptions()) {
dataDescription.analyzeWithoutCheckPriv();
}
@ -455,6 +457,7 @@ public class BrokerLoadJob extends LoadJob {
.add("error_msg", "db has been deleted when job is loading")
.build(), e);
cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
return;
}
db.writeLock();
try {
@ -521,7 +524,7 @@ public class BrokerLoadJob extends LoadJob {
public void write(DataOutput out) throws IOException {
super.write(out);
brokerDesc.write(out);
Text.writeString(out, originStmt);
originStmt.write(out);
out.writeInt(sessionVariables.size());
for (Map.Entry<String, String> entry : sessionVariables.entrySet()) {
@ -538,9 +541,14 @@ public class BrokerLoadJob extends LoadJob {
}
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_58) {
originStmt = Text.readString(in);
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_76) {
String stmt = Text.readString(in);
originStmt = new OriginStatement(stmt, 0);
} else {
originStmt = OriginStatement.read(in);
}
} else {
originStmt = "";
originStmt = new OriginStatement("", 0);
}
// The origin stmt does not be analyzed in here.
// The reason is that it will thrown MetaNotFoundException when the tableId could not be found by tableName.

View File

@ -38,6 +38,7 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.Load;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TMiniLoadBeginRequest;
import org.apache.doris.thrift.TMiniLoadRequest;
@ -96,7 +97,7 @@ public class LoadManager implements Writable{
* @param stmt
* @throws DdlException
*/
public void createLoadJobFromStmt(LoadStmt stmt, String originStmt) throws DdlException {
public void createLoadJobFromStmt(LoadStmt stmt, OriginStatement originStmt) throws DdlException {
Database database = checkDb(stmt.getLabel().getDbName());
long dbId = database.getId();
LoadJob loadJob = null;

View File

@ -43,12 +43,14 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.task.StreamLoadTask;
@ -214,7 +216,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
// this is the origin stmt of CreateRoutineLoadStmt, we use it to persist the RoutineLoadJob,
// because we can not serialize the Expressions contained in job.
protected String origStmt;
protected OriginStatement origStmt;
protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
// TODO(ml): error sample
@ -1067,7 +1069,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
return false;
}
public void setOrigStmt(String origStmt) {
public void setOrigStmt(OriginStatement origStmt) {
this.origStmt = origStmt;
}
@ -1233,7 +1235,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
out.writeLong(committedTaskNum);
out.writeLong(abortedTaskNum);
Text.writeString(out, origStmt);
origStmt.write(out);
out.writeInt(jobProperties.size());
for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
Text.writeString(out, entry.getKey());
@ -1289,7 +1291,12 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
committedTaskNum = in.readLong();
abortedTaskNum = in.readLong();
origStmt = Text.readString(in);
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_76) {
String stmt = Text.readString(in);
origStmt = new OriginStatement(stmt, 0);
} else {
origStmt = OriginStatement.read(in);
}
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_59) {
int size = in.readInt();
@ -1316,11 +1323,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
// parse the origin stmt to get routine load desc
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt),
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt),
Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
CreateRoutineLoadStmt stmt = null;
try {
stmt = (CreateRoutineLoadStmt) parser.parse().value;
stmt = (CreateRoutineLoadStmt) SqlParserUtils.getStmt(parser, origStmt.idx);
stmt.checkLoadProperties();
setRoutineLoadDesc(stmt.getRoutineLoadDesc());
} catch (Exception e) {

View File

@ -37,6 +37,7 @@ import org.apache.doris.common.util.LogKey;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -116,7 +117,7 @@ public class RoutineLoadManager implements Writable {
}
public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt, String origStmt)
public void createRoutineLoadJob(CreateRoutineLoadStmt createRoutineLoadStmt, OriginStatement origStmt)
throws UserException {
// check load auth
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),

View File

@ -23,9 +23,12 @@ import org.apache.doris.qe.QueryState;
public class MysqlEofPacket extends MysqlPacket {
private static final int EOF_INDICATOR = 0XFE;
private static final int WARNINGS = 0;
private static final int STATUS_FLAGS = 0;
// private static final int STATUS_FLAGS = 0;
private int serverStatus = 0;
public MysqlEofPacket(QueryState state) {
this.serverStatus = state.serverStatus;
}
@Override
@ -35,7 +38,7 @@ public class MysqlEofPacket extends MysqlPacket {
serializer.writeInt1(EOF_INDICATOR);
if (capability.isProtocol41()) {
serializer.writeInt2(WARNINGS);
serializer.writeInt2(STATUS_FLAGS);
serializer.writeInt2(serverStatus);
}
}
}

View File

@ -26,15 +26,16 @@ public class MysqlOkPacket extends MysqlPacket {
private static final int PACKET_OK_INDICATOR = 0X00;
// TODO(zhaochun): following are not used in palo
private static final long LAST_INSERT_ID = 0;
private static final int STATUS_FLAGS = 0;
private final String infoMessage;
private long affectedRows = 0;
private int warningRows = 0;
private int serverStatus = 0;
public MysqlOkPacket(QueryState state) {
infoMessage = state.getInfoMessage();
affectedRows = state.getAffectedRows();
warningRows = state.getWarningRows();
serverStatus = state.serverStatus;
}
@Override
@ -46,10 +47,10 @@ public class MysqlOkPacket extends MysqlPacket {
serializer.writeVInt(affectedRows);
serializer.writeVInt(LAST_INSERT_ID);
if (capability.isProtocol41()) {
serializer.writeInt2(STATUS_FLAGS);
serializer.writeInt2(serverStatus);
serializer.writeInt2(warningRows);
} else if (capability.isTransactions()) {
serializer.writeInt2(STATUS_FLAGS);
serializer.writeInt2(serverStatus);
}
if (capability.isSessionTrack()) {

View File

@ -17,7 +17,8 @@
package org.apache.doris.mysql;
// MySQL server status flag, doesn't used now.
// MySQL server status flag.
// Only SERVER_MORE_RESULTS_EXISTS is used now.
public class MysqlServerStatusFlag {
public static final int SERVER_STATUS_IN_TRANS = 0x0001;
public static final int SERVER_STATUS_AUTOCOMMIT = 0x0002;

View File

@ -240,6 +240,10 @@ public class ConnectContext {
return returnRows;
}
public void resetRetureRows() {
returnRows = 0;
}
public MysqlSerializer getSerializer() {
return serializer;
}

View File

@ -17,6 +17,9 @@
package org.apache.doris.qe;
import org.apache.doris.analysis.KillStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
@ -24,29 +27,33 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuditLog;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlPacket;
import org.apache.doris.mysql.MysqlProto;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.MysqlServerStatusFlag;
import org.apache.doris.proto.PQueryStatistics;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
@ -102,9 +109,8 @@ public class ConnectProcessor {
// query state log
ctx.getAuditBuilder().put("State", ctx.getState());
ctx.getAuditBuilder().put("Time", elapseMs);
Preconditions.checkNotNull(statistics);
ctx.getAuditBuilder().put("ScanBytes", statistics.scan_bytes);
ctx.getAuditBuilder().put("ScanRows", statistics.scan_rows);
ctx.getAuditBuilder().put("ScanBytes", statistics == null ? 0 : statistics.scan_bytes);
ctx.getAuditBuilder().put("ScanRows", statistics == null ? 0 : statistics.scan_rows);
ctx.getAuditBuilder().put("ReturnRows", ctx.getReturnRows());
ctx.getAuditBuilder().put("StmtId", ctx.getStmtId());
ctx.getAuditBuilder().put("QueryId", ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId()));
@ -143,14 +149,14 @@ public class ConnectProcessor {
private void handleQuery() {
MetricRepo.COUNTER_REQUEST_ALL.increase(1L);
// convert statement to Java string
String stmt = null;
String originStmt = null;
try {
byte[] bytes = packetBuf.array();
int ending = packetBuf.limit() - 1;
while (ending >= 1 && bytes[ending] == '\0') {
ending--;
}
stmt = new String(bytes, 1, ending, "UTF-8");
originStmt = new String(bytes, 1, ending, "UTF-8");
} catch (UnsupportedEncodingException e) {
// impossible
LOG.error("UTF8 is not supported in this environment.");
@ -163,30 +169,78 @@ public class ConnectProcessor {
ctx.getAuditBuilder().put("Db", ctx.getDatabase());
// execute this query.
StatementBase parsedStmt = null;
try {
executor = new StmtExecutor(ctx, stmt);
ctx.setExecutor(executor);
executor.execute();
// set if this is a QueryStmt
ctx.getState().setQuery(executor.isQueryStmt());
} catch (DdlException e) {
LOG.warn("Process one query failed because DdlException.", e);
ctx.getState().setError(e.getMessage());
List<StatementBase> stmts = analyze(originStmt);
for (int i = 0; i < stmts.size(); ++i) {
ctx.getState().reset();
if (i > 0) {
ctx.resetRetureRows();
}
parsedStmt = stmts.get(i);
executor = new StmtExecutor(ctx, parsedStmt, new OriginStatement(originStmt, i));
executor.execute();
if (i != stmts.size() - 1) {
ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
finalizeCommand();
}
}
} catch (IOException e) {
// Client failed.
LOG.warn("Process one query failed because IOException: ", e);
ctx.getState().setError("Palo process failed");
ctx.getState().setError("Doris process failed");
} catch (UserException e) {
LOG.warn("Process one query failed because.", e);
ctx.getState().setError(e.getMessage());
// set is as ANALYSIS_ERR so that it won't be treated as a query failure.
ctx.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
} catch (Throwable e) {
// Catch all throwable.
// If reach here, maybe palo bug.
LOG.warn("Process one query failed because unknown reason: ", e);
ctx.getState().setError("Unexpected exception: " + e.getMessage());
if (parsedStmt instanceof KillStmt) {
// ignore kill stmt execute err(not monitor it)
ctx.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
}
}
// audit after exec
// replace '\n' to '\\n' to make string in one line
auditAfterExec(stmt.replace("\n", " \\n"), executor.getParsedStmt(),
executor.getQueryStatisticsForAuditLog());
// TODO(cmy): when user send multi-statement, the executor is the last statement's executor.
// We may need to find some way to resolve this.
if (executor != null) {
auditAfterExec(originStmt.replace("\n", " \\n"), executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog());
} else {
// executor can be null if we encounter analysis error.
auditAfterExec(originStmt.replace("\n", " \\n"), null, null);
}
}
// analyze the origin stmt and return multi-statements
private List<StatementBase> analyze(String originStmt) throws AnalysisException {
LOG.debug("the originStmts are: {}", originStmt);
// Parse statement with parser generated by CUP&FLEX
SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
try {
return SqlParserUtils.getMultiStmts(parser);
} catch (Error e) {
throw new AnalysisException("Please check your sql, we meet an error when parsing.", e);
} catch (AnalysisException e) {
LOG.warn("origin_stmt: " + originStmt + "; Analyze error message: " + parser.getErrorMsg(originStmt), e);
String errorMessage = parser.getErrorMsg(originStmt);
if (errorMessage == null) {
throw e;
} else {
throw new AnalysisException(errorMessage, e);
}
} catch (Exception e) {
// TODO(lingbin): we catch 'Exception' to prevent unexpected error,
// should be removed this try-catch clause future.
throw new AnalysisException("Internal Error, maybe this is a bug, please contact with Palo RD.");
}
}
// Get the column definitions of a table
@ -371,7 +425,9 @@ public class ConnectProcessor {
StmtExecutor executor = null;
try {
executor = new StmtExecutor(ctx, request.getSql(), true);
// 0 for compatibility.
int idx = request.isSetStmtIdx() ? request.getStmtIdx() : 0;
executor = new StmtExecutor(ctx, new OriginStatement(request.getSql(), idx), true);
executor.execute();
} catch (IOException e) {
// Client failed.

View File

@ -76,7 +76,8 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.Load;
public class DdlExecutor {
public static void execute(Catalog catalog, DdlStmt ddlStmt, String origStmt) throws DdlException, Exception {
public static void execute(Catalog catalog, DdlStmt ddlStmt, OriginStatement origStmt)
throws DdlException, Exception {
if (ddlStmt instanceof CreateClusterStmt) {
CreateClusterStmt stmt = (CreateClusterStmt) ddlStmt;
catalog.createCluster(stmt);

View File

@ -33,7 +33,7 @@ import java.nio.ByteBuffer;
public class MasterOpExecutor {
private static final Logger LOG = LogManager.getLogger(MasterOpExecutor.class);
private final String originStmt;
private final OriginStatement originStmt;
private final ConnectContext ctx;
private TMasterOpResult result;
@ -41,7 +41,7 @@ public class MasterOpExecutor {
// the total time of thrift connectTime add readTime and writeTime
private int thriftTimeoutMs;
public MasterOpExecutor(String originStmt, ConnectContext ctx, RedirectStatus status) {
public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, RedirectStatus status) {
this.originStmt = originStmt;
this.ctx = ctx;
if (status.isNeedToWaitJournalSync()) {
@ -73,7 +73,8 @@ public class MasterOpExecutor {
}
TMasterOpRequest params = new TMasterOpRequest();
params.setCluster(ctx.getClusterName());
params.setSql(originStmt);
params.setSql(originStmt.originStmt);
params.setStmtIdx(originStmt.idx);
params.setUser(ctx.getQualifiedUser());
params.setDb(ctx.getDatabase());
params.setSqlMode(ctx.getSessionVariable().getSqlMode());

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/*
* This class represents an origin statement
* in multiple statements.
*/
public class OriginStatement implements Writable {
// the origin stmt from client. this may includes more than one statement.
// eg: "select 1; select 2; select 3"
@SerializedName(value = "originStmt")
public final String originStmt;
// the idx of the specified statement in "originStmt", start from 0.
@SerializedName(value = "idx")
public final int idx;
public OriginStatement(String originStmt, int idx) {
this.originStmt = originStmt;
this.idx = idx;
}
public static OriginStatement read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, OriginStatement.class);
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
}

View File

@ -45,6 +45,8 @@ public class QueryState {
private boolean isQuery = false;
private long affectedRows = 0;
private int warningRows = 0;
// make it public for easy to use
public int serverStatus = 0;
public QueryState() {
}
@ -53,6 +55,7 @@ public class QueryState {
stateType = MysqlStateType.OK;
errorCode = null;
infoMessage = null;
serverStatus = 0;
}
public MysqlStateType getStateType() {

View File

@ -54,6 +54,7 @@ import org.apache.doris.common.Version;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.mysql.MysqlChannel;
@ -99,7 +100,7 @@ public class StmtExecutor {
private ConnectContext context;
private MysqlSerializer serializer;
private String originStmt;
private OriginStatement originStmt;
private StatementBase parsedStmt;
private Analyzer analyzer;
private RuntimeProfile profile;
@ -112,15 +113,26 @@ public class StmtExecutor {
private ShowResultSet proxyResultSet = null;
private PQueryStatistics statisticsForAuditLog;
public StmtExecutor(ConnectContext context, String stmt, boolean isProxy) {
// this constructor is mainly for proxy
public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
this.context = context;
this.originStmt = stmt;
this.originStmt = originStmt;
this.serializer = context.getSerializer();
this.isProxy = isProxy;
}
// this constructor is only for test now.
public StmtExecutor(ConnectContext context, String stmt) {
this(context, stmt, false);
this(context, new OriginStatement(stmt, 0), false);
}
// constructor for receiving parsed stmt from connect processor
public StmtExecutor(ConnectContext ctx, StatementBase parsedStmt, OriginStatement originStmt) {
this.context = ctx;
this.originStmt = originStmt;
this.serializer = context.getSerializer();
this.isProxy = false;
this.parsedStmt = parsedStmt;
}
// At the end of query execution, we begin to add up profile
@ -140,7 +152,7 @@ public class StmtExecutor {
summaryProfile.addInfoString("Doris Version", Version.DORIS_BUILD_VERSION);
summaryProfile.addInfoString(ProfileManager.USER, context.getQualifiedUser());
summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, context.getDatabase());
summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, originStmt);
summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, originStmt.originStmt);
profile.addChild(summaryProfile);
if (coord != null) {
coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond));
@ -216,7 +228,7 @@ public class StmtExecutor {
if (isForwardToMaster()) {
forwardToMaster();
return;
} else {
} else {
LOG.debug("no need to transfer to Master. stmt: {}", context.getStmtId());
}
@ -351,31 +363,35 @@ public class StmtExecutor {
NotImplementedException {
LOG.info("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(), context.getForwardedStmtId());
// Parse statement with parser generated by CUP&FLEX
SqlScanner input = new SqlScanner(new StringReader(originStmt), context.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
try {
parsedStmt = (StatementBase) parser.parse().value;
redirectStatus = parsedStmt.getRedirectStatus();
} catch (Error e) {
LOG.info("error happened when parsing stmt {}, id: {}", originStmt, context.getStmtId(), e);
throw new AnalysisException("sql parsing error, please check your sql");
} catch (AnalysisException e) {
LOG.info("analysis exception happened when parsing stmt {}, id: {}, error: {}",
originStmt, context.getStmtId(), parser.getErrorMsg(originStmt), e);
String errorMessage = parser.getErrorMsg(originStmt);
if (errorMessage == null) {
throw e;
} else {
throw new AnalysisException(errorMessage, e);
// parsedStmt may already by set when constructing this StmtExecutor();
if (parsedStmt == null) {
// Parse statement with parser generated by CUP&FLEX
SqlScanner input = new SqlScanner(new StringReader(originStmt.originStmt), context.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
try {
parsedStmt = SqlParserUtils.getStmt(parser, originStmt.idx);
} catch (Error e) {
LOG.info("error happened when parsing stmt {}, id: {}", originStmt, context.getStmtId(), e);
throw new AnalysisException("sql parsing error, please check your sql");
} catch (AnalysisException e) {
String syntaxError = parser.getErrorMsg(originStmt.originStmt);
LOG.info("analysis exception happened when parsing stmt {}, id: {}, error: {}",
originStmt, context.getStmtId(), syntaxError, e);
if (syntaxError == null) {
throw e;
} else {
throw new AnalysisException(syntaxError, e);
}
} catch (Exception e) {
// TODO(lingbin): we catch 'Exception' to prevent unexpected error,
// should be removed this try-catch clause future.
LOG.info("unexpected exception happened when parsing stmt {}, id: {}, error: {}",
originStmt, context.getStmtId(), parser.getErrorMsg(originStmt.originStmt), e);
throw new AnalysisException("Unexpected exception: " + e.getMessage());
}
} catch (Exception e) {
// TODO(lingbin): we catch 'Exception' to prevent unexpected error,
// should be removed this try-catch clause future.
LOG.info("unexpected exception happened when parsing stmt {}, id: {}, error: {}",
originStmt, context.getStmtId(), parser.getErrorMsg(originStmt), e);
throw new AnalysisException("Unexpected exception: " + e.getMessage());
}
redirectStatus = parsedStmt.getRedirectStatus();
// yiguolei: insertstmt's grammer analysis will write editlog, so that we check if the stmt should be forward to master here
// if the stmt should be forward to master, then just return here and the master will do analysis again
@ -549,7 +565,7 @@ public class StmtExecutor {
coord = new Coordinator(context, analyzer, planner);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt, coord));
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
coord.exec();
@ -854,7 +870,7 @@ public class StmtExecutor {
context.getState().setError(e.getMessage());
} catch (Exception e) {
// Maybe our bug
LOG.warn("DDL statement(" + originStmt + ") process failed.", e);
LOG.warn("DDL statement(" + originStmt.originStmt + ") process failed.", e);
context.getState().setError("Unexpected exception: " + e.getMessage());
}
}

View File

@ -27,6 +27,7 @@ import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.thrift.TFileFormatType;
@ -195,7 +196,7 @@ public class StreamLoadTask {
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(columnsSQL)));
ImportColumnsStmt columnsStmt;
try {
columnsStmt = (ImportColumnsStmt) parser.parse().value;
columnsStmt = (ImportColumnsStmt) SqlParserUtils.getFirstStmt(parser);
} catch (Error e) {
LOG.warn("error happens when parsing columns, sql={}", columnsSQL, e);
throw new AnalysisException("failed to parsing columns' header, maybe contain unsupported character");
@ -223,7 +224,7 @@ public class StreamLoadTask {
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(whereSQL)));
ImportWhereStmt whereStmt;
try {
whereStmt = (ImportWhereStmt) parser.parse().value;
whereStmt = (ImportWhereStmt) SqlParserUtils.getFirstStmt(parser);
} catch (Error e) {
LOG.warn("error happens when parsing where header, sql={}", whereSQL, e);
throw new AnalysisException("failed to parsing where header, maybe contain unsupported character");

View File

@ -17,6 +17,8 @@
package org.apache.doris.analysis;
import org.apache.doris.common.util.SqlParserUtils;
import org.junit.Assert;
import org.junit.Test;
@ -56,7 +58,7 @@ public class AdminShowReplicaTest {
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(stmt)));
AdminShowReplicaStatusStmt showStmt = null;
try {
showStmt = (AdminShowReplicaStatusStmt) parser.parse().value;
showStmt = (AdminShowReplicaStatusStmt) SqlParserUtils.getFirstStmt(parser);
} catch (Error e) {
Assert.fail(e.getMessage());
} catch (Exception e) {

View File

@ -17,11 +17,6 @@
package org.apache.doris.analysis;
import com.google.common.collect.Lists;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@ -32,12 +27,16 @@ import org.apache.doris.catalog.SinglePartitionInfo;
import org.apache.doris.catalog.View;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.CreateTableInfo;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -46,6 +45,11 @@ import java.io.StringReader;
import java.util.LinkedList;
import java.util.List;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
public class AlterViewStmtTest {
private Analyzer analyzer;
@ -155,7 +159,7 @@ public class AlterViewStmtTest {
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(alterStmt)));
QueryStmt alterQueryStmt = null;
try {
alterQueryStmt = (QueryStmt) parser.parse().value;
alterQueryStmt = (QueryStmt) SqlParserUtils.getFirstStmt(parser);
} catch (Error e) {
Assert.fail(e.getMessage());
} catch (Exception e) {

View File

@ -104,5 +104,4 @@ public class LoadStmtTest {
Assert.fail("No exception throws.");
}
}

View File

@ -1,15 +1,16 @@
package org.apache.doris.analysis;
import java.io.StringReader;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.mysql.privilege.MockedAuth;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.qe.ConnectContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.StringReader;
import mockit.Mocked;
public class SetOperationStmtTest {
@ -31,28 +32,28 @@ public class SetOperationStmtTest {
String sql = "select k1,k2 from t where k1='a' union select k1,k2 from t where k1='b';";
SqlScanner input = new SqlScanner(new StringReader(sql));
SqlParser parser = new SqlParser(input);
SetOperationStmt stmt = (SetOperationStmt) parser.parse().value;
SetOperationStmt stmt = (SetOperationStmt) SqlParserUtils.getFirstStmt(parser);
Assert.assertEquals(SetOperationStmt.Operation.UNION, stmt.getOperands().get(1).getOperation());
sql = "select k1,k2 from t where k1='a' intersect select k1,k2 from t where k1='b';";
input = new SqlScanner(new StringReader(sql));
parser = new SqlParser(input);
stmt = (SetOperationStmt) parser.parse().value;
stmt = (SetOperationStmt) SqlParserUtils.getFirstStmt(parser);
Assert.assertEquals(SetOperationStmt.Operation.INTERSECT, stmt.getOperands().get(1).getOperation());
sql = "select k1,k2 from t where k1='a' except select k1,k2 from t where k1='b';";
input = new SqlScanner(new StringReader(sql));
parser = new SqlParser(input);
stmt = (SetOperationStmt) parser.parse().value;
stmt = (SetOperationStmt) SqlParserUtils.getFirstStmt(parser);
Assert.assertEquals(SetOperationStmt.Operation.EXCEPT, stmt.getOperands().get(1).getOperation());
sql = "select k1,k2 from t where k1='a' minus select k1,k2 from t where k1='b';";
input = new SqlScanner(new StringReader(sql));
parser = new SqlParser(input);
stmt = (SetOperationStmt) parser.parse().value;
stmt = (SetOperationStmt) SqlParserUtils.getFirstStmt(parser);
Assert.assertEquals(SetOperationStmt.Operation.EXCEPT, stmt.getOperands().get(1).getOperation());
sql = "select k1,k2 from t where k1='a' union select k1,k2 from t where k1='b' intersect select k1,k2 from t "
+ "where k1='c' except select k1,k2 from t where k1='d';";
input = new SqlScanner(new StringReader(sql));
parser = new SqlParser(input);
stmt = (SetOperationStmt) parser.parse().value;
stmt = (SetOperationStmt) SqlParserUtils.getFirstStmt(parser);
Assert.assertEquals(SetOperationStmt.Operation.UNION, stmt.getOperands().get(1).getOperation());
Assert.assertEquals(SetOperationStmt.Operation.INTERSECT, stmt.getOperands().get(2).getOperation());
Assert.assertEquals(SetOperationStmt.Operation.EXCEPT, stmt.getOperands().get(3).getOperation());

View File

@ -18,8 +18,10 @@
package org.apache.doris.analysis;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.rewrite.ExprRewriter;
import org.junit.Assert;
import org.junit.Test;
@ -35,7 +37,7 @@ public class SqlModeTest {
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(stmt)));
SelectStmt selectStmt = null;
try {
selectStmt = (SelectStmt) parser.parse().value;
selectStmt = (SelectStmt) SqlParserUtils.getFirstStmt(parser);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
@ -43,7 +45,7 @@ public class SqlModeTest {
parser = new SqlParser(new SqlScanner(new StringReader(stmt), SqlModeHelper.MODE_DEFAULT));
try {
selectStmt = (SelectStmt) parser.parse().value;
selectStmt = (SelectStmt) SqlParserUtils.getFirstStmt(parser);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
@ -57,7 +59,7 @@ public class SqlModeTest {
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(stmt), SqlModeHelper.MODE_PIPES_AS_CONCAT));
SelectStmt selectStmt = null;
try {
selectStmt = (SelectStmt) parser.parse().value;
selectStmt = (SelectStmt) SqlParserUtils.getFirstStmt(parser);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
@ -70,7 +72,7 @@ public class SqlModeTest {
// Mode DeActive
parser = new SqlParser(new SqlScanner(new StringReader(stmt), SqlModeHelper.MODE_DEFAULT));
try {
selectStmt = (SelectStmt) parser.parse().value;
selectStmt = (SelectStmt) SqlParserUtils.getFirstStmt(parser);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
@ -88,7 +90,7 @@ public class SqlModeTest {
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(stmt), SqlModeHelper.MODE_PIPES_AS_CONCAT));
SelectStmt parsedStmt = null;
try {
parsedStmt = (SelectStmt) parser.parse().value;
parsedStmt = (SelectStmt) SqlParserUtils.getFirstStmt(parser);
} catch (Exception e) {
Assert.fail(e.getMessage());
}

View File

@ -35,6 +35,7 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.Load;
import org.apache.doris.load.Source;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.transaction.TransactionState;
@ -96,7 +97,7 @@ public class BrokerLoadJobTest {
};
try {
BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, originStmt);
BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, new OriginStatement(originStmt, 0));
Assert.fail();
} catch (DdlException e) {
System.out.println("could not find table named " + tableName);
@ -161,7 +162,7 @@ public class BrokerLoadJobTest {
};
try {
BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, originStmt);
BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, new OriginStatement(originStmt, 0));
Assert.assertEquals(Long.valueOf(dbId), Deencapsulation.getField(brokerLoadJob, "dbId"));
Assert.assertEquals(label, Deencapsulation.getField(brokerLoadJob, "label"));
Assert.assertEquals(JobState.PENDING, Deencapsulation.getField(brokerLoadJob, "state"));

View File

@ -40,6 +40,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.EditLog;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TResourceInfo;
@ -115,7 +116,7 @@ public class RoutineLoadManagerTest {
}
};
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt, "dummy");
routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt, new OriginStatement("dummy", 0));
Map<String, RoutineLoadJob> idToRoutineLoadJob =
Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob");
@ -175,7 +176,7 @@ public class RoutineLoadManagerTest {
};
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
try {
routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt, "dummy");
routineLoadManager.createRoutineLoadJob(createRoutineLoadStmt, new OriginStatement("dummy", 0));
Assert.fail();
} catch (LoadException | DdlException e) {
Assert.fail();

View File

@ -17,6 +17,8 @@
package org.apache.doris.mysql;
import org.apache.doris.qe.QueryState;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -33,7 +35,7 @@ public class MysqlEofPacketTest {
@Test
public void testWrite() {
MysqlEofPacket packet = new MysqlEofPacket(null);
MysqlEofPacket packet = new MysqlEofPacket(new QueryState());
MysqlSerializer serializer = MysqlSerializer.newInstance(capability);
packet.writeTo(serializer);

View File

@ -20,6 +20,7 @@ package org.apache.doris.planner;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Type;
import org.apache.doris.qe.ConnectContext;
@ -32,6 +33,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.util.List;
import java.util.UUID;
public class QueryPlanTest {
@ -288,6 +290,25 @@ public class QueryPlanTest {
String sql = "select * from test.baseall a where k1 in (select k1 from test.bigtable b where k2 > 0 and k1 = 1);";
UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
Assert.assertEquals(MysqlStateType.EOF, connectContext.getState().getStateType());
sql = "SHOW VARIABLES LIKE 'lower_case_%'; SHOW VARIABLES LIKE 'sql_mode'";
List<StatementBase> stmts = UtFrameUtils.parseAndAnalyzeStmts(sql, connectContext);
Assert.assertEquals(2, stmts.size());
}
@Test
public void testMultiStmts() throws Exception {
String sql = "SHOW VARIABLES LIKE 'lower_case_%'; SHOW VARIABLES LIKE 'sql_mode'";
List<StatementBase>stmts = UtFrameUtils.parseAndAnalyzeStmts(sql, connectContext);
Assert.assertEquals(2, stmts.size());
sql = "SHOW VARIABLES LIKE 'lower_case_%';;;";
stmts = UtFrameUtils.parseAndAnalyzeStmts(sql, connectContext);
Assert.assertEquals(4, stmts.size());
sql = "SHOW VARIABLES LIKE 'lower_case_%'";
stmts = UtFrameUtils.parseAndAnalyzeStmts(sql, connectContext);
Assert.assertEquals(1, stmts.size());
}
@Test

View File

@ -17,13 +17,18 @@
package org.apache.doris.planner;
import mockit.Expectations;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.ImportColumnsStmt;
import org.apache.doris.analysis.ImportWhereStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
@ -32,10 +37,13 @@ import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import java.io.StringReader;
import java.util.List;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
@ -83,4 +91,17 @@ public class StreamLoadPlannerTest {
StreamLoadPlanner planner = new StreamLoadPlanner(db, destTable, streamLoadTask);
planner.plan(streamLoadTask.getId());
}
@Test
public void testParseStmt() throws Exception {
String sql = new String("COLUMNS (k1, k2, k3=abc(), k4=default_value())");
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(sql)));
ImportColumnsStmt columnsStmt = (ImportColumnsStmt) SqlParserUtils.getFirstStmt(parser);
Assert.assertEquals(4, columnsStmt.getColumns().size());
sql = new String("WHERE k1 > 2 and k3 < 4");
parser = new SqlParser(new SqlScanner(new StringReader(sql)));
ImportWhereStmt whereStmt = (ImportWhereStmt) SqlParserUtils.getFirstStmt(parser);
Assert.assertTrue(whereStmt.getExpr() instanceof CompoundPredicate);
}
}

View File

@ -17,8 +17,6 @@
package org.apache.doris.qe;
import mockit.Expectations;
import mockit.Mocked;
import org.apache.doris.analysis.AccessTestUtil;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DdlStmt;
@ -60,6 +58,8 @@ import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java_cup.runtime.Symbol;
import mockit.Expectations;
import mockit.Mocked;
public class StmtExecutorTest {
private ConnectContext ctx;
@ -205,7 +205,7 @@ public class StmtExecutorTest {
queryStmt.rewriteExprs((ExprRewriter) any);
minTimes = 0;
Symbol symbol = new Symbol(0, queryStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(queryStmt));
parser.parse();
minTimes = 0;
result = symbol;
@ -259,7 +259,7 @@ public class StmtExecutorTest {
minTimes = 0;
result = null;
Symbol symbol = new Symbol(0, showStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(showStmt));
parser.parse();
minTimes = 0;
result = symbol;
@ -294,7 +294,7 @@ public class StmtExecutorTest {
minTimes = 0;
result = null;
Symbol symbol = new Symbol(0, showStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(showStmt));
parser.parse();
minTimes = 0;
result = symbol;
@ -329,7 +329,7 @@ public class StmtExecutorTest {
minTimes = 0;
result = RedirectStatus.NO_FORWARD;
Symbol symbol = new Symbol(0, killStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(killStmt));
parser.parse();
minTimes = 0;
result = symbol;
@ -371,7 +371,7 @@ public class StmtExecutorTest {
minTimes = 0;
result = RedirectStatus.NO_FORWARD;
Symbol symbol = new Symbol(0, killStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(killStmt));
parser.parse();
minTimes = 0;
result = symbol;
@ -427,7 +427,7 @@ public class StmtExecutorTest {
minTimes = 0;
result = RedirectStatus.NO_FORWARD;
Symbol symbol = new Symbol(0, killStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(killStmt));
parser.parse();
minTimes = 0;
result = symbol;
@ -478,7 +478,7 @@ public class StmtExecutorTest {
minTimes = 0;
result = RedirectStatus.NO_FORWARD;
Symbol symbol = new Symbol(0, killStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(killStmt));
parser.parse();
minTimes = 0;
result = symbol;
@ -509,7 +509,7 @@ public class StmtExecutorTest {
minTimes = 0;
result = RedirectStatus.NO_FORWARD;
Symbol symbol = new Symbol(0, setStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(setStmt));
parser.parse();
minTimes = 0;
result = symbol;
@ -537,7 +537,7 @@ public class StmtExecutorTest {
minTimes = 0;
result = RedirectStatus.NO_FORWARD;
Symbol symbol = new Symbol(0, setStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(setStmt));
parser.parse();
minTimes = 0;
result = symbol;
@ -566,7 +566,7 @@ public class StmtExecutorTest {
minTimes = 0;
result = RedirectStatus.NO_FORWARD;
Symbol symbol = new Symbol(0, ddlStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(ddlStmt));
parser.parse();
minTimes = 0;
result = symbol;
@ -577,7 +577,7 @@ public class StmtExecutorTest {
new Expectations(ddlExecutor) {
{
// Mock ddl
DdlExecutor.execute((Catalog) any, (DdlStmt) any, anyString);
DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any);
minTimes = 0;
}
};
@ -599,7 +599,7 @@ public class StmtExecutorTest {
minTimes = 0;
result = RedirectStatus.NO_FORWARD;
Symbol symbol = new Symbol(0, ddlStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(ddlStmt));
parser.parse();
minTimes = 0;
result = symbol;
@ -610,7 +610,7 @@ public class StmtExecutorTest {
new Expectations(ddlExecutor) {
{
// Mock ddl
DdlExecutor.execute((Catalog) any, (DdlStmt) any, anyString);
DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any);
minTimes = 0;
result = new DdlException("ddl fail");
}
@ -633,7 +633,7 @@ public class StmtExecutorTest {
minTimes = 0;
result = RedirectStatus.NO_FORWARD;
Symbol symbol = new Symbol(0, ddlStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(ddlStmt));
parser.parse();
minTimes = 0;
result = symbol;
@ -644,7 +644,7 @@ public class StmtExecutorTest {
new Expectations(ddlExecutor) {
{
// Mock ddl
DdlExecutor.execute((Catalog) any, (DdlStmt) any, anyString);
DdlExecutor.execute((Catalog) any, (DdlStmt) any, (OriginStatement) any);
minTimes = 0;
result = new Exception("bug");
}
@ -675,7 +675,7 @@ public class StmtExecutorTest {
minTimes = 0;
result = "testCluster";
Symbol symbol = new Symbol(0, useStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(useStmt));
parser.parse();
minTimes = 0;
result = symbol;
@ -707,7 +707,7 @@ public class StmtExecutorTest {
minTimes = 0;
result = "testCluster";
Symbol symbol = new Symbol(0, useStmt);
Symbol symbol = new Symbol(0, Lists.newArrayList(useStmt));
parser.parse();
minTimes = 0;
result = symbol;

View File

@ -17,13 +17,13 @@
package org.apache.doris.utframe;
import org.apache.commons.io.FileUtils;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
@ -47,7 +47,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
@ -78,6 +77,7 @@ public class AnotherDemoTest {
@BeforeClass
public static void beforeClass() throws EnvVarNotSetException, IOException,
FeStartException, NotInitException, DdlException, InterruptedException {
FeConstants.default_scheduler_interval_millisecond = 10;
// get DORIS_HOME
String dorisHome = System.getenv("DORIS_HOME");
if (Strings.isNullOrEmpty(dorisHome)) {

View File

@ -64,7 +64,7 @@ public class DemoTest {
@BeforeClass
public static void beforeClass() throws EnvVarNotSetException, IOException,
FeStartException, NotInitException, DdlException, InterruptedException {
FeConstants.default_scheduler_interval_millisecond = 1000;
FeConstants.default_scheduler_interval_millisecond = 10;
UtFrameUtils.createMinDorisCluster(runningDir);
}

View File

@ -26,6 +26,7 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.planner.Planner;
import org.apache.doris.qe.ConnectContext;
@ -80,7 +81,7 @@ public class UtFrameUtils {
Analyzer analyzer = new Analyzer(ctx.getCatalog(), ctx);
StatementBase statementBase = null;
try {
statementBase = (StatementBase) parser.parse().value;
statementBase = SqlParserUtils.getFirstStmt(parser);
} catch (AnalysisException e) {
String errorMessage = parser.getErrorMsg(originStmt);
System.err.println("parse failed: " + errorMessage);
@ -94,6 +95,30 @@ public class UtFrameUtils {
return statementBase;
}
// for analyzing multi statements
public static List<StatementBase> parseAndAnalyzeStmts(String originStmt, ConnectContext ctx) throws Exception {
System.out.println("begin to parse stmts: " + originStmt);
SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
Analyzer analyzer = new Analyzer(ctx.getCatalog(), ctx);
List<StatementBase> statementBases = null;
try {
statementBases = SqlParserUtils.getMultiStmts(parser);
} catch (AnalysisException e) {
String errorMessage = parser.getErrorMsg(originStmt);
System.err.println("parse failed: " + errorMessage);
if (errorMessage == null) {
throw e;
} else {
throw new AnalysisException(errorMessage, e);
}
}
for (StatementBase stmt : statementBases) {
stmt.analyze(analyzer);
}
return statementBases;
}
public static int startFEServer(String runningDir) throws EnvVarNotSetException, IOException,
FeStartException, NotInitException, DdlException, InterruptedException {
// get DORIS_HOME

View File

@ -421,6 +421,7 @@ struct TMasterOpRequest {
13: optional bool enableStrictMode
// this can replace the "user" field
14: optional Types.TUserIdentity current_user_ident
15: optional i32 stmtIdx // the idx of the sql in multi statements
}
struct TColumnDefinition {