diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/BaseController.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/BaseController.java index 60240f20d3..cc1a208a2e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/BaseController.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/BaseController.java @@ -80,7 +80,7 @@ public class BaseController { value.password = authInfo.password; addSession(request, response, value); - ConnectContext ctx = new ConnectContext(null); + ConnectContext ctx = new ConnectContext(); ctx.setQualifiedUser(authInfo.fullUserName); ctx.setRemoteIP(authInfo.remoteIp); ctx.setCurrentUserIdentity(currentUser); @@ -133,7 +133,7 @@ public class BaseController { updateCookieAge(request, PALO_SESSION_ID, PALO_SESSION_EXPIRED_TIME, response); - ConnectContext ctx = new ConnectContext(null); + ConnectContext ctx = new ConnectContext(); ctx.setQualifiedUser(sessionValue.currentUser.getQualifiedUser()); ctx.setRemoteIP(request.getRemoteHost()); ctx.setCurrentUserIdentity(sessionValue.currentUser); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java index 26553e611b..2f56831d44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java @@ -56,7 +56,7 @@ public class RestBaseController extends BaseController { ActionAuthorizationInfo authInfo = getAuthorizationInfo(request); // check password UserIdentity currentUser = checkPassword(authInfo); - ConnectContext ctx = new ConnectContext(null); + ConnectContext ctx = new ConnectContext(); ctx.setEnv(Env.getCurrentEnv()); ctx.setQualifiedUser(authInfo.fullUserName); ctx.setRemoteIP(authInfo.remoteIp); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java index 26eaa572e9..c0bef652f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java @@ -27,6 +27,7 @@ import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.io.ByteBufferNetworkInputStream; import org.apache.doris.load.LoadJobRowResult; +import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.Backend; import org.apache.doris.system.BeSelectionPolicy; @@ -129,10 +130,11 @@ public class MysqlLoadManager { } private void replyClientForReadFile(ConnectContext context, String path) throws IOException { - context.getSerializer().reset(); - context.getSerializer().writeByte((byte) 0xfb); - context.getSerializer().writeEofString(path); - context.getMysqlChannel().sendAndFlush(context.getSerializer().toByteBuffer()); + MysqlSerializer serializer = context.getMysqlChannel().getSerializer(); + serializer.reset(); + serializer.writeByte((byte) 0xfb); + serializer.writeEofString(path); + context.getMysqlChannel().sendAndFlush(serializer.toByteBuffer()); } private void fillByteBufferAsync(ConnectContext context, ByteBufferNetworkInputStream inputStream) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskContext.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskContext.java index 1a9f7e2084..af6f322761 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskContext.java @@ -26,7 +26,6 @@ import java.util.Map; public class MTMVTaskContext { ConnectContext ctx; String query; - String remoteIp; Map properties; MTMVTask task; MTMVJob job; @@ -63,14 +62,6 @@ public class MTMVTaskContext { this.query = query; } - public String getRemoteIp() { - return remoteIp; - } - - public void setRemoteIp(String remoteIp) { - this.remoteIp = remoteIp; - } - public Map getProperties() { return properties; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java index da1685f3ac..55b4949c07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskExecutor.java @@ -108,7 +108,6 @@ public class MTMVTaskExecutor implements Comparable { ctx.setQueryId(queryId); taskContext.setCtx(ctx); - taskContext.setRemoteIp(ctx.getRemoteIp()); taskContext.setTask(task); taskContext.setJob(job); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/AcceptListener.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java similarity index 97% rename from fe/fe-core/src/main/java/org/apache/doris/mysql/nio/AcceptListener.java rename to fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java index 8744a272cd..4f059c7c75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/AcceptListener.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.mysql.nio; +package org.apache.doris.mysql; import org.apache.doris.catalog.Env; import org.apache.doris.common.ErrorCode; -import org.apache.doris.mysql.MysqlProto; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ConnectProcessor; import org.apache.doris.qe.ConnectScheduler; @@ -55,7 +54,7 @@ public class AcceptListener implements ChannelListener { private static final Logger LOG = LogManager.getLogger(ReadListener.class); - private NConnectContext ctx; + private ConnectContext ctx; private ConnectProcessor connectProcessor; - public ReadListener(NConnectContext nConnectContext, ConnectProcessor connectProcessor) { - this.ctx = nConnectContext; + public ReadListener(ConnectContext connectContext, ConnectProcessor connectProcessor) { + this.ctx = connectContext; this.connectProcessor = connectProcessor; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NConnectContext.java deleted file mode 100644 index a4adf5b65c..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NConnectContext.java +++ /dev/null @@ -1,69 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.mysql.nio; - -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.ConnectProcessor; - -import org.xnio.StreamConnection; - -import java.io.IOException; - -/** - * connect context based on nio. - */ -public class NConnectContext extends ConnectContext { - protected NMysqlChannel mysqlChannel; - - public NConnectContext(StreamConnection connection) { - super(); - mysqlChannel = new NMysqlChannel(connection); - } - - @Override - public void cleanup() { - mysqlChannel.close(); - returnRows = 0; - } - - @Override - public NMysqlChannel getMysqlChannel() { - return mysqlChannel; - } - - public void startAcceptQuery(ConnectProcessor connectProcessor) { - mysqlChannel.startAcceptQuery(this, connectProcessor); - } - - public void suspendAcceptQuery() { - mysqlChannel.suspendAcceptQuery(); - } - - public void resumeAcceptQuery() { - mysqlChannel.resumeAcceptQuery(); - } - - public void stopAcceptQuery() throws IOException { - mysqlChannel.stopAcceptQuery(); - } - - @Override - public String toString() { - return "[remote ip: " + mysqlChannel.getRemoteIp() + "]"; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NMysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NMysqlChannel.java deleted file mode 100644 index d70bd1edee..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NMysqlChannel.java +++ /dev/null @@ -1,122 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.mysql.nio; - -import org.apache.doris.mysql.MysqlChannel; -import org.apache.doris.qe.ConnectProcessor; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.xnio.StreamConnection; -import org.xnio.channels.Channels; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; - -/** - * mysql Channel based on nio. - */ -public class NMysqlChannel extends MysqlChannel { - protected static final Logger LOG = LogManager.getLogger(NMysqlChannel.class); - private StreamConnection conn; - - public NMysqlChannel(StreamConnection connection) { - super(); - this.conn = connection; - if (connection.getPeerAddress() instanceof InetSocketAddress) { - InetSocketAddress address = (InetSocketAddress) connection.getPeerAddress(); - remoteHostPortString = address.getHostString() + ":" + address.getPort(); - remoteIp = address.getAddress().getHostAddress(); - } else { - // Reach here, what's it? - remoteHostPortString = connection.getPeerAddress().toString(); - remoteIp = connection.getPeerAddress().toString(); - } - } - - /** - * read packet until whole dstBuf is filled, unless block. - * Todo: find a better way to avoid block read here. - * - * @param dstBuf - * @return - */ - @Override - protected int readAll(ByteBuffer dstBuf) { - int readLen = 0; - try { - while (dstBuf.remaining() != 0) { - int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf); - // return -1 when remote peer close the channel - if (ret == -1) { - return readLen; - } - readLen += ret; - } - } catch (IOException e) { - LOG.debug("Read channel exception, ignore.", e); - return 0; - } - return readLen; - } - - /** - * write packet until no data is remained, unless block. - * - * @param buffer - * @throws IOException - */ - @Override - protected void realNetSend(ByteBuffer buffer) throws IOException { - long bufLen = buffer.remaining(); - long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer); - if (bufLen != writeLen) { - throw new IOException("Write mysql packet failed.[write=" + writeLen - + ", needToWrite=" + bufLen + "]"); - } - Channels.flushBlocking(conn.getSinkChannel()); - isSend = true; - } - - @Override - public void close() { - try { - conn.close(); - } catch (IOException e) { - LOG.warn("Close channel exception, ignore."); - } - } - - public void startAcceptQuery(NConnectContext nConnectContext, ConnectProcessor connectProcessor) { - conn.getSourceChannel().setReadListener(new ReadListener(nConnectContext, connectProcessor)); - conn.getSourceChannel().resumeReads(); - } - - public void suspendAcceptQuery() { - conn.getSourceChannel().suspendReads(); - } - - public void resumeAcceptQuery() { - conn.getSourceChannel().resumeReads(); - } - - public void stopAcceptQuery() throws IOException { - conn.getSourceChannel().shutdownReads(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 1b79fa6771..fb8c650004 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -28,10 +28,10 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.SessionContext; +import org.apache.doris.mysql.DummyMysqlChannel; import org.apache.doris.mysql.MysqlCapability; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; -import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.nereids.StatementContext; import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; import org.apache.doris.resource.Tag; @@ -47,8 +47,9 @@ import com.google.common.collect.Sets; import io.opentelemetry.api.trace.Tracer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.xnio.StreamConnection; -import java.nio.channels.SocketChannel; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Optional; @@ -60,7 +61,7 @@ import java.util.Set; // Use `volatile` to make the reference change atomic. public class ConnectContext { private static final Logger LOG = LogManager.getLogger(ConnectContext.class); - protected static ThreadLocal threadLocalInfo = new ThreadLocal(); + protected static ThreadLocal threadLocalInfo = new ThreadLocal<>(); // set this id before analyze protected volatile long stmtId; @@ -98,8 +99,6 @@ public class ConnectContext { // In other word, currentUserIdentity is the entry that matched in Doris auth table. // This account determines user's access privileges. protected volatile UserIdentity currentUserIdentity; - // Serializer used to pack MySQL packet. - protected volatile MysqlSerializer serializer; // Variables belong to this session. protected volatile SessionVariable sessionVariable; // Scheduler this connection belongs to @@ -128,7 +127,7 @@ public class ConnectContext { // This is used to statistic the current query details. // This property will only be set when the query starts to execute. // So in the query planning stage, do not use any value in this attribute. - protected QueryDetail queryDetail; + protected QueryDetail queryDetail = null; // If set to true, the nondeterministic function will not be rewrote to constant. private boolean notEvalNondeterministicFunction = false; @@ -157,7 +156,7 @@ public class ConnectContext { *

* when a connection is established, exec_timeout is set by query_timeout, when the statement is an insert stmt, * then it is set to max(query_timeout, insert_timeout) with {@link #resetExecTimeout()} in - * {@link ConnectProcessor#handleQuery()} after the StmtExecutor is specified. + * after the StmtExecutor is specified. */ private int executionTimeoutS; @@ -213,19 +212,18 @@ public class ConnectContext { this(null); } - public ConnectContext(SocketChannel channel) { + public ConnectContext(StreamConnection connection) { state = new QueryState(); returnRows = 0; serverCapability = MysqlCapability.DEFAULT_CAPABILITY; isKilled = false; - mysqlChannel = new MysqlChannel(channel); - serializer = MysqlSerializer.newInstance(); + if (connection != null) { + mysqlChannel = new MysqlChannel(connection); + } else { + mysqlChannel = new DummyMysqlChannel(); + } sessionVariable = VariableMgr.newSessionVariable(); command = MysqlCommand.COM_SLEEP; - if (channel != null) { - remoteIP = mysqlChannel.getRemoteIp(); - } - queryDetail = null; if (Config.use_fuzzy_session_variable) { sessionVariable.initFuzzyModeVariables(); } @@ -403,10 +401,6 @@ public class ConnectContext { returnRows = 0; } - public MysqlSerializer getSerializer() { - return serializer; - } - public int getConnectionId() { return connectionId; } @@ -484,7 +478,9 @@ public class ConnectContext { } public void cleanup() { - mysqlChannel.close(); + if (mysqlChannel != null) { + mysqlChannel.close(); + } threadLocalInfo.remove(); returnRows = 0; } @@ -644,10 +640,6 @@ public class ConnectContext { return currentConnectedFEIp; } - public String getRemoteIp() { - return mysqlChannel == null ? "" : mysqlChannel.getRemoteIp(); - } - public void resetExecTimeout() { if (executor != null && executor.isInsertStmt()) { // particular timeout for insert stmt, we can make other particular timeout in the same way. @@ -686,6 +678,23 @@ public class ConnectContext { } } + + public void startAcceptQuery(ConnectProcessor connectProcessor) { + mysqlChannel.startAcceptQuery(this, connectProcessor); + } + + public void suspendAcceptQuery() { + mysqlChannel.suspendAcceptQuery(); + } + + public void resumeAcceptQuery() { + mysqlChannel.resumeAcceptQuery(); + } + + public void stopAcceptQuery() throws IOException { + mysqlChannel.stopAcceptQuery(); + } + public String getQueryIdentifier() { return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]"; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 2493ed2010..6574929d7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -510,8 +510,8 @@ public class ConnectProcessor { table.readLock(); try { - MysqlSerializer serializer = ctx.getSerializer(); MysqlChannel channel = ctx.getMysqlChannel(); + MysqlSerializer serializer = channel.getSerializer(); // Send fields // NOTE: Field list doesn't send number of fields @@ -594,7 +594,7 @@ public class ConnectProcessor { return null; } - MysqlSerializer serializer = ctx.getSerializer(); + MysqlSerializer serializer = ctx.getMysqlChannel().getSerializer(); serializer.reset(); packet.writeTo(serializer); return serializer.toByteBuffer(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java index a3dd5f39c3..8ce1615bb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -19,11 +19,8 @@ package org.apache.doris.qe; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; -import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.mysql.MysqlProto; -import org.apache.doris.mysql.nio.NConnectContext; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.thrift.TUniqueId; @@ -87,13 +84,7 @@ public class ConnectScheduler { if (context == null) { return false; } - context.setConnectionId(nextConnectionId.getAndAdd(1)); - // no necessary for nio. - if (context instanceof NConnectContext) { - return true; - } - executor.submit(new LoopHandler(context)); return true; } @@ -166,48 +157,4 @@ public class ConnectScheduler { TUniqueId queryId = traceId2QueryId.get(traceId); return queryId == null ? "" : DebugUtil.printId(queryId); } - - private class LoopHandler implements Runnable { - ConnectContext context; - - LoopHandler(ConnectContext context) { - this.context = context; - } - - @Override - public void run() { - try { - // Set thread local info - context.setThreadLocalInfo(); - context.setConnectScheduler(ConnectScheduler.this); - // authenticate check failed. - if (!MysqlProto.negotiate(context)) { - return; - } - - if (registerConnection(context)) { - MysqlProto.sendResponsePacket(context); - } else { - context.getState().setError(ErrorCode.ERR_USER_LIMIT_REACHED, "Reach limit of connections"); - MysqlProto.sendResponsePacket(context); - return; - } - - context.setUserQueryTimeout(context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser())); - context.setStartTime(); - ConnectProcessor processor = new ConnectProcessor(context); - processor.loop(); - } catch (Exception e) { - // for unauthorized access such lvs probe request, may cause exception, just log it in debug level - if (context.getCurrentUserIdentity() != null) { - LOG.warn("connect processor exception because ", e); - } else { - LOG.debug("connect processor exception because ", e); - } - } finally { - unregisterConnection(context); - context.cleanup(); - } - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java index 2ec97ded45..36eb4e8db5 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java @@ -17,7 +17,7 @@ package org.apache.doris.qe; -import org.apache.doris.mysql.nio.NMysqlServer; +import org.apache.doris.mysql.MysqlServer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -31,7 +31,7 @@ public class QeService { private int port; // MySQL protocol service - private NMysqlServer mysqlServer; + private MysqlServer mysqlServer; @Deprecated public QeService(int port) { @@ -40,7 +40,7 @@ public class QeService { public QeService(int port, ConnectScheduler scheduler) { this.port = port; - this.mysqlServer = new NMysqlServer(port, scheduler); + this.mysqlServer = new MysqlServer(port, scheduler); } public void start() throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 8302760149..d0d7d8a57e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -202,7 +202,7 @@ public class StmtExecutor implements ProfileWriter { public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) { this.context = context; this.originStmt = originStmt; - this.serializer = context.getSerializer(); + this.serializer = context.getMysqlChannel().getSerializer(); this.isProxy = isProxy; this.statementContext = new StatementContext(context, originStmt); this.context.setStatementContext(statementContext); @@ -218,7 +218,7 @@ public class StmtExecutor implements ProfileWriter { this.context = ctx; this.parsedStmt = parsedStmt; this.originStmt = parsedStmt.getOrigStmt(); - this.serializer = context.getSerializer(); + this.serializer = context.getMysqlChannel().getSerializer(); this.isProxy = false; if (parsedStmt instanceof LogicalPlanAdapter) { this.statementContext = ((LogicalPlanAdapter) parsedStmt).getStatementContext(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index dbcca2d6c0..09db8ac9ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -776,7 +776,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { // add this log so that we can track this stmt LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), clientAddr.getHostname()); - ConnectContext context = new ConnectContext(null); + ConnectContext context = new ConnectContext(); // Set current connected FE to the client address, so that we can know where this request come from. context.setCurrentConnectedFEIp(clientAddr.getHostname()); ConnectProcessor processor = new ConnectProcessor(context); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AccessTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AccessTestUtil.java index b9aab77e7e..7cb83baa11 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/AccessTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AccessTestUtil.java @@ -439,7 +439,7 @@ public class AccessTestUtil { public static Analyzer fetchAdminAnalyzer(boolean withCluster) { final String prefix = "testCluster:"; - Analyzer analyzer = new Analyzer(fetchAdminCatalog(), new ConnectContext(null)); + Analyzer analyzer = new Analyzer(fetchAdminCatalog(), new ConnectContext()); new Expectations(analyzer) { { analyzer.getDefaultCatalog(); @@ -475,7 +475,7 @@ public class AccessTestUtil { } public static Analyzer fetchBlockAnalyzer() throws AnalysisException { - Analyzer analyzer = new Analyzer(fetchBlockCatalog(), new ConnectContext(null)); + Analyzer analyzer = new Analyzer(fetchBlockCatalog(), new ConnectContext()); new Expectations(analyzer) { { analyzer.getDefaultCatalog(); @@ -499,7 +499,7 @@ public class AccessTestUtil { } public static Analyzer fetchEmptyDbAnalyzer() { - Analyzer analyzer = new Analyzer(fetchBlockCatalog(), new ConnectContext(null)); + Analyzer analyzer = new Analyzer(fetchBlockCatalog(), new ConnectContext()); new Expectations(analyzer) { { analyzer.getDefaultCatalog(); @@ -612,7 +612,7 @@ public class AccessTestUtil { }; Env env = fetchBlockCatalog(); - Analyzer analyzer = new Analyzer(env, new ConnectContext(null)); + Analyzer analyzer = new Analyzer(env, new ConnectContext()); new Expectations(analyzer) { { analyzer.getDefaultCatalog(); @@ -649,7 +649,7 @@ public class AccessTestUtil { analyzer.getContext(); minTimes = 0; - result = new ConnectContext(null); + result = new ConnectContext(); } }; diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelAlterStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelAlterStmtTest.java index 6dac199f61..fe96259bdb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelAlterStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelAlterStmtTest.java @@ -45,7 +45,7 @@ public class CancelAlterStmtTest { @Before public void setUp() { env = AccessTestUtil.fetchAdminCatalog(); - ctx = new ConnectContext(null); + ctx = new ConnectContext(); ctx.setQualifiedUser("root"); ctx.setRemoteIP("192.168.1.1"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnDefTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnDefTest.java index b998e61364..d969be2520 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnDefTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ColumnDefTest.java @@ -47,7 +47,7 @@ public class ColumnDefTest { floatCol = new TypeDef(ScalarType.createType(PrimitiveType.FLOAT)); booleanCol = new TypeDef(ScalarType.createType(PrimitiveType.BOOLEAN)); - ctx = new ConnectContext(null); + ctx = new ConnectContext(); new MockUp() { @Mock public ConnectContext get() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateUserStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateUserStmtTest.java index 21fda2e40e..391590d713 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateUserStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateUserStmtTest.java @@ -35,7 +35,7 @@ public class CreateUserStmtTest { @Before public void setUp() { - ConnectContext ctx = new ConnectContext(null); + ConnectContext ctx = new ConnectContext(); ctx.setQualifiedUser("root"); ctx.setRemoteIP("192.168.1.1"); UserIdentity currentUserIdentity = new UserIdentity("root", "192.168.1.1"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowAlterStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowAlterStmtTest.java index 15fb280f49..063a9601eb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowAlterStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowAlterStmtTest.java @@ -44,7 +44,7 @@ public class ShowAlterStmtTest { FakeEnv.setEnv(env); - analyzer = new Analyzer(env, new ConnectContext(null)); + analyzer = new Analyzer(env, new ConnectContext()); new Expectations(analyzer) { { analyzer.getDefaultDb(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java index 2553bba9f3..153dd9687a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/SystemInfoServiceTest.java @@ -136,7 +136,7 @@ public class SystemInfoServiceTest { } }; - analyzer = new Analyzer(env, new ConnectContext(null)); + analyzer = new Analyzer(env, new ConnectContext()); } public void mkdir(String dirString) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/ldap/LdapAuthenticateTest.java b/fe/fe-core/src/test/java/org/apache/doris/ldap/LdapAuthenticateTest.java index 35fd708fc7..f54045f4ab 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/ldap/LdapAuthenticateTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/ldap/LdapAuthenticateTest.java @@ -142,7 +142,7 @@ public class LdapAuthenticateTest { } private ConnectContext getContext() { - ConnectContext context = new ConnectContext(null); + ConnectContext context = new ConnectContext(); context.setEnv(env); context.setThreadLocalInfo(); return context; diff --git a/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlChannelTest.java b/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlChannelTest.java deleted file mode 100644 index c3e65b6544..0000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlChannelTest.java +++ /dev/null @@ -1,299 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.mysql; - -import mockit.Delegate; -import mockit.Expectations; -import mockit.Mocked; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -public class MysqlChannelTest { - int packetId = 0; - int readIdx = 0; - @Mocked - private SocketChannel channel; - - @Before - public void setUp() throws IOException { - packetId = 0; - readIdx = 0; - new Expectations() { - { - channel.getRemoteAddress(); - minTimes = 0; - result = new InetSocketAddress(1024); - } - }; - } - - @Test - public void testReceive() throws IOException { - // mock - new Expectations() { - { - channel.read((ByteBuffer) any); - minTimes = 0; - result = new Delegate() { - int fakeRead(ByteBuffer buffer) { - MysqlSerializer serializer = MysqlSerializer.newInstance(); - if (readIdx == 0) { - readIdx++; - serializer.writeInt3(10); - serializer.writeInt1(packetId++); - - buffer.put(serializer.toArray()); - return 4; - } else if (readIdx == 1) { - readIdx++; - byte[] buf = new byte[buffer.remaining()]; - for (int i = 0; i < buffer.remaining(); ++i) { - buf[i] = (byte) ('a' + i); - - } - buffer.put(buf); - return 10; - } - return -1; - } - }; - } - }; - - MysqlChannel channel1 = new MysqlChannel(channel); - - ByteBuffer buf = channel1.fetchOnePacket(); - Assert.assertEquals(10, buf.remaining()); - for (int i = 0; i < 10; ++i) { - Assert.assertEquals('a' + i, buf.get()); - } - } - - @Test - public void testLongPacket() throws IOException { - // mock - new Expectations() { - { - channel.read((ByteBuffer) any); - minTimes = 0; - result = new Delegate() { - int fakeRead(ByteBuffer buffer) { - int maxLen = MysqlChannel.MAX_PHYSICAL_PACKET_LENGTH; - MysqlSerializer serializer = MysqlSerializer.newInstance(); - if (readIdx == 0) { - // packet - readIdx++; - serializer.writeInt3(maxLen); - serializer.writeInt1(packetId++); - - buffer.put(serializer.toArray()); - return 4; - } else if (readIdx == 1) { - readIdx++; - int readLen = buffer.remaining(); - byte[] buf = new byte[readLen]; - for (int i = 0; i < readLen; ++i) { - buf[i] = (byte) ('a' + (i % 26)); - - } - buffer.put(buf); - return readLen; - } else if (readIdx == 2) { - // packet - readIdx++; - serializer.writeInt3(10); - serializer.writeInt1(packetId++); - - buffer.put(serializer.toArray()); - return 4; - } else if (readIdx == 3) { - readIdx++; - int readLen = buffer.remaining(); - byte[] buf = new byte[readLen]; - for (int i = 0; i < readLen; ++i) { - buf[i] = (byte) ('a' + (maxLen + i) % 26); - - } - buffer.put(buf); - return readLen; - } - return 0; - } - }; - } - }; - - MysqlChannel channel1 = new MysqlChannel(channel); - - ByteBuffer buf = channel1.fetchOnePacket(); - Assert.assertEquals(MysqlChannel.MAX_PHYSICAL_PACKET_LENGTH + 10, buf.remaining()); - for (int i = 0; i < MysqlChannel.MAX_PHYSICAL_PACKET_LENGTH + 10; ++i) { - Assert.assertEquals('a' + (i % 26), buf.get()); - } - } - - @Test(expected = IOException.class) - public void testBadSeq() throws IOException { - // mock - new Expectations() { - { - channel.read((ByteBuffer) any); - minTimes = 0; - result = new Delegate() { - int fakeRead(ByteBuffer buffer) { - int maxLen = MysqlChannel.MAX_PHYSICAL_PACKET_LENGTH; - MysqlSerializer serializer = MysqlSerializer.newInstance(); - if (readIdx == 0) { - // packet - readIdx++; - serializer.writeInt3(maxLen); - serializer.writeInt1(packetId++); - - buffer.put(serializer.toArray()); - return 4; - } else if (readIdx == 1) { - readIdx++; - int readLen = buffer.remaining(); - byte[] buf = new byte[readLen]; - for (int i = 0; i < readLen; ++i) { - buf[i] = (byte) ('a' + (i % 26)); - - } - buffer.put(buf); - return readLen; - } else if (readIdx == 2) { - // packet - readIdx++; - serializer.writeInt3(10); - // NOTE: Bad packet seq - serializer.writeInt1(0); - - buffer.put(serializer.toArray()); - return 4; - } else if (readIdx == 3) { - readIdx++; - byte[] buf = new byte[buffer.remaining()]; - for (int i = 0; i < buffer.remaining(); ++i) { - buf[i] = (byte) ('a' + (i % 26)); - - } - buffer.put(buf); - return buffer.remaining(); - } - return 0; - } - }; - } - }; - - MysqlChannel channel1 = new MysqlChannel(channel); - - channel1.fetchOnePacket(); - } - - @Test(expected = IOException.class) - public void testException() throws IOException { - // mock - new Expectations() { - { - channel.read((ByteBuffer) any); - minTimes = 0; - result = new IOException(); - } - }; - - MysqlChannel channel1 = new MysqlChannel(channel); - - channel1.fetchOnePacket(); - Assert.fail("No Exception throws."); - } - - @Test - public void testSend() throws IOException { - // mock - new Expectations() { - { - channel.write((ByteBuffer) any); - minTimes = 0; - result = new Delegate() { - int fakeWrite(ByteBuffer buffer) { - int writeLen = 0; - writeLen += buffer.remaining(); - buffer.position(buffer.limit()); - return writeLen; - } - }; - } - }; - - MysqlChannel channel1 = new MysqlChannel(channel); - ByteBuffer buf = ByteBuffer.allocate(1000); - channel1.sendOnePacket(buf); - - buf = ByteBuffer.allocate(0xffffff0); - channel1.sendOnePacket(buf); - } - - @Test(expected = IOException.class) - public void testSendException() throws IOException { - // mock - new Expectations() { - { - channel.write((ByteBuffer) any); - minTimes = 0; - result = new IOException(); - } - }; - MysqlChannel channel1 = new MysqlChannel(channel); - ByteBuffer buf = ByteBuffer.allocate(1000); - channel1.sendOnePacket(buf); - - buf = ByteBuffer.allocate(0xffffff0); - channel1.sendAndFlush(buf); - } - - @Test(expected = IOException.class) - public void testSendFail() throws IOException { - // mock - new Expectations() { - { - channel.write((ByteBuffer) any); - minTimes = 0; - result = new Delegate() { - int fakeWrite(ByteBuffer buffer) { - int writeLen = 0; - writeLen += buffer.remaining(); - buffer.position(buffer.limit()); - return writeLen - 1; - } - }; - } - }; - MysqlChannel channel1 = new MysqlChannel(channel); - ByteBuffer buf = ByteBuffer.allocate(1000); - channel1.sendAndFlush(buf); - Assert.fail("No Exception throws."); - } - -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlProtoTest.java b/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlProtoTest.java index f630e6fe13..777ab59501 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlProtoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlProtoTest.java @@ -39,6 +39,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; +import org.xnio.StreamConnection; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -67,6 +68,8 @@ public class MysqlProtoTest { private LdapAuthenticate ldapAuthenticate; @Mocked private MysqlClearTextPacket clearTextPacket; + @Mocked + StreamConnection streamConnection; @Before public void setUp() throws DdlException, AuthenticationException { @@ -242,7 +245,7 @@ public class MysqlProtoTest { mockChannel("user", true); mockPassword(true); mockAccess(); - ConnectContext context = new ConnectContext(null); + ConnectContext context = new ConnectContext(streamConnection); context.setEnv(env); context.setThreadLocalInfo(); Assert.assertTrue(MysqlProto.negotiate(context)); @@ -253,7 +256,7 @@ public class MysqlProtoTest { mockChannel("user", false); mockPassword(true); mockAccess(); - ConnectContext context = new ConnectContext(null); + ConnectContext context = new ConnectContext(streamConnection); MysqlProto.negotiate(context); Assert.assertFalse(MysqlProto.negotiate(context)); } @@ -263,7 +266,7 @@ public class MysqlProtoTest { mockChannel("user", true); mockPassword(false); mockAccess(); - ConnectContext context = new ConnectContext(null); + ConnectContext context = new ConnectContext(streamConnection); Assert.assertTrue(MysqlProto.negotiate(context)); } @@ -272,7 +275,7 @@ public class MysqlProtoTest { mockChannel("", true); mockPassword(true); mockAccess(); - ConnectContext context = new ConnectContext(null); + ConnectContext context = new ConnectContext(streamConnection); Assert.assertFalse(MysqlProto.negotiate(context)); } @@ -283,7 +286,7 @@ public class MysqlProtoTest { mockAccess(); mockMysqlClearTextPacket(PASSWORD_CLEAR_TEXT); mockLdap("user", true); - ConnectContext context = new ConnectContext(null); + ConnectContext context = new ConnectContext(streamConnection); context.setEnv(env); context.setThreadLocalInfo(); Assert.assertTrue(MysqlProto.negotiate(context)); @@ -297,7 +300,7 @@ public class MysqlProtoTest { mockAccess(); mockMysqlClearTextPacket("654321"); mockLdap("user", true); - ConnectContext context = new ConnectContext(null); + ConnectContext context = new ConnectContext(streamConnection); context.setEnv(env); context.setThreadLocalInfo(); Assert.assertFalse(MysqlProto.negotiate(context)); @@ -311,7 +314,7 @@ public class MysqlProtoTest { mockAccess(); mockLdap("root", false); mockMysqlClearTextPacket("654321"); - ConnectContext context = new ConnectContext(null); + ConnectContext context = new ConnectContext(streamConnection); context.setEnv(env); context.setThreadLocalInfo(); Assert.assertTrue(MysqlProto.negotiate(context)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlServerTest.java b/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlServerTest.java index 353b137cb6..ed62bc6504 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlServerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlServerTest.java @@ -17,7 +17,6 @@ package org.apache.doris.mysql; -import org.apache.doris.mysql.nio.NMysqlServer; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ConnectScheduler; @@ -84,7 +83,7 @@ public class MysqlServerTest { int port = socket.getLocalPort(); socket.close(); - NMysqlServer server = new NMysqlServer(port, scheduler); + MysqlServer server = new MysqlServer(port, scheduler); Assert.assertTrue(server.start()); // submit @@ -112,9 +111,9 @@ public class MysqlServerTest { ServerSocket socket = new ServerSocket(0); int port = socket.getLocalPort(); socket.close(); - NMysqlServer server = new NMysqlServer(port, scheduler); + MysqlServer server = new MysqlServer(port, scheduler); Assert.assertTrue(server.start()); - NMysqlServer server1 = new NMysqlServer(port, scheduler); + MysqlServer server1 = new MysqlServer(port, scheduler); Assert.assertFalse(server1.start()); server.stop(); @@ -125,7 +124,7 @@ public class MysqlServerTest { ServerSocket socket = new ServerSocket(0); int port = socket.getLocalPort(); socket.close(); - NMysqlServer server = new NMysqlServer(port, badScheduler); + MysqlServer server = new MysqlServer(port, badScheduler); Assert.assertTrue(server.start()); // submit diff --git a/fe/fe-core/src/test/java/org/apache/doris/mysql/privilege/SetPasswordTest.java b/fe/fe-core/src/test/java/org/apache/doris/mysql/privilege/SetPasswordTest.java index dad05d92a8..8e607e074d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mysql/privilege/SetPasswordTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mysql/privilege/SetPasswordTest.java @@ -85,7 +85,7 @@ public class SetPasswordTest { CreateUserStmt stmt = new CreateUserStmt(new UserDesc(userIdentity)); auth.createUser(stmt); - ConnectContext ctx = new ConnectContext(null); + ConnectContext ctx = new ConnectContext(); // set password for 'cmy'@'%' UserIdentity currentUser1 = new UserIdentity("default_cluster:cmy", "%"); currentUser1.setIsAnalyzed(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/MemoTestUtils.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/MemoTestUtils.java index f80fc7b0c4..f63b0842fc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/MemoTestUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/MemoTestUtils.java @@ -33,7 +33,6 @@ import org.apache.doris.system.SystemInfoService; import org.apache.commons.lang3.StringUtils; import java.io.IOException; -import java.nio.channels.SocketChannel; /** * MemoUtils. @@ -103,8 +102,7 @@ public class MemoTestUtils { */ public static ConnectContext createCtx(UserIdentity user, String host) { try { - SocketChannel channel = SocketChannel.open(); - ConnectContext ctx = new ConnectContext(channel); + ConnectContext ctx = new ConnectContext(); ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER); ctx.setCurrentUserIdentity(user); ctx.setQualifiedUser(user.getQualifiedUser()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java index fdee7a7a37..a0f3a942a4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java @@ -19,12 +19,10 @@ package org.apache.doris.qe; import org.apache.doris.catalog.Env; import org.apache.doris.mysql.MysqlCapability; -import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.thrift.TUniqueId; -import mockit.Expectations; import mockit.Mocked; import org.junit.Assert; import org.junit.Before; @@ -34,8 +32,6 @@ import java.nio.channels.SocketChannel; import java.util.List; public class ConnectContextTest { - @Mocked - private MysqlChannel channel; @Mocked private StmtExecutor executor; @Mocked @@ -51,28 +47,11 @@ public class ConnectContextTest { @Before public void setUp() throws Exception { - new Expectations() { - { - channel.getRemoteHostPortString(); - minTimes = 0; - result = "127.0.0.1:12345"; - - channel.close(); - minTimes = 0; - - channel.getRemoteIp(); - minTimes = 0; - result = "192.168.1.1"; - - executor.cancel(); - minTimes = 0; - } - }; } @Test public void testNormal() { - ConnectContext ctx = new ConnectContext(socketChannel); + ConnectContext ctx = new ConnectContext(); // State Assert.assertNotNull(ctx.getState()); @@ -102,7 +81,7 @@ public class ConnectContextTest { Assert.assertEquals("testCluster:testUser", ctx.getQualifiedUser()); // Serializer - Assert.assertNotNull(ctx.getSerializer()); + Assert.assertNotNull(ctx.getMysqlChannel().getSerializer()); // Session variable Assert.assertNotNull(ctx.getSessionVariable()); @@ -126,7 +105,7 @@ public class ConnectContextTest { Assert.assertEquals(9, row.size()); Assert.assertEquals("101", row.get(0)); Assert.assertEquals("testUser", row.get(1)); - Assert.assertEquals("127.0.0.1:12345", row.get(2)); + Assert.assertEquals("", row.get(2)); Assert.assertEquals("testCluster", row.get(3)); Assert.assertEquals("testDb", row.get(4)); Assert.assertEquals("Ping", row.get(5)); @@ -154,7 +133,7 @@ public class ConnectContextTest { @Test public void testSleepTimeout() { - ConnectContext ctx = new ConnectContext(socketChannel); + ConnectContext ctx = new ConnectContext(); ctx.setCommand(MysqlCommand.COM_SLEEP); // sleep no time out @@ -191,7 +170,7 @@ public class ConnectContextTest { @Test public void testOtherTimeout() { - ConnectContext ctx = new ConnectContext(socketChannel); + ConnectContext ctx = new ConnectContext(); ctx.setCommand(MysqlCommand.COM_QUERY); // sleep no time out @@ -219,7 +198,7 @@ public class ConnectContextTest { @Test public void testThreadLocal() { - ConnectContext ctx = new ConnectContext(socketChannel); + ConnectContext ctx = new ConnectContext(); Assert.assertNull(ConnectContext.get()); ctx.setThreadLocalInfo(); Assert.assertNotNull(ConnectContext.get()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java deleted file mode 100644 index 0b62673a3b..0000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java +++ /dev/null @@ -1,574 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.qe; - -import org.apache.doris.analysis.AccessTestUtil; -import org.apache.doris.catalog.Env; -import org.apache.doris.common.jmockit.Deencapsulation; -import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.metric.MetricRepo; -import org.apache.doris.mysql.MysqlChannel; -import org.apache.doris.mysql.MysqlCommand; -import org.apache.doris.mysql.MysqlEofPacket; -import org.apache.doris.mysql.MysqlErrPacket; -import org.apache.doris.mysql.MysqlOkPacket; -import org.apache.doris.mysql.MysqlSerializer; -import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; -import org.apache.doris.proto.Data; -import org.apache.doris.thrift.TUniqueId; - -import mockit.Expectations; -import mockit.Mocked; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -public class ConnectProcessorTest { - private static ByteBuffer initDbPacket; - private static ByteBuffer pingPacket; - private static ByteBuffer quitPacket; - private static ByteBuffer queryPacket; - private static ByteBuffer multiQueryPacket; - private static ByteBuffer fieldListPacket; - private static AuditEventBuilder auditBuilder = new AuditEventBuilder(); - private static ConnectContext myContext; - - @Mocked - private static SocketChannel socketChannel; - - private static Data.PQueryStatistics statistics = Data.PQueryStatistics.newBuilder().build(); - - @BeforeClass - public static void setUpClass() { - // Init Database packet - { // CHECKSTYLE IGNORE THIS LINE - MysqlSerializer serializer = MysqlSerializer.newInstance(); - serializer.writeInt1(2); - serializer.writeEofString("testCluster:testDb"); - initDbPacket = serializer.toByteBuffer(); - } // CHECKSTYLE IGNORE THIS LINE - - // Ping packet - { // CHECKSTYLE IGNORE THIS LINE - MysqlSerializer serializer = MysqlSerializer.newInstance(); - serializer.writeInt1(14); - pingPacket = serializer.toByteBuffer(); - } // CHECKSTYLE IGNORE THIS LINE - - // Quit packet - { // CHECKSTYLE IGNORE THIS LINE - MysqlSerializer serializer = MysqlSerializer.newInstance(); - serializer.writeInt1(1); - quitPacket = serializer.toByteBuffer(); - } // CHECKSTYLE IGNORE THIS LINE - - // Query packet - { // CHECKSTYLE IGNORE THIS LINE - MysqlSerializer serializer = MysqlSerializer.newInstance(); - serializer.writeInt1(3); - serializer.writeEofString("select * from a"); - queryPacket = serializer.toByteBuffer(); - } // CHECKSTYLE IGNORE THIS LINE - - // Multi query packet - { // CHECKSTYLE IGNORE THIS LINE - MysqlSerializer serializer = MysqlSerializer.newInstance(); - serializer.writeInt1(3); - serializer.writeEofString("select * from a;select * from b;drop table a"); - multiQueryPacket = serializer.toByteBuffer(); - } // CHECKSTYLE IGNORE THIS LINE - - // Field list packet - { // CHECKSTYLE IGNORE THIS LINE - MysqlSerializer serializer = MysqlSerializer.newInstance(); - serializer.writeInt1(4); - serializer.writeNulTerminateString("testTbl"); - serializer.writeEofString(""); - fieldListPacket = serializer.toByteBuffer(); - } // CHECKSTYLE IGNORE THIS LINE - statistics = statistics.toBuilder().setCpuMs(0L).setScanRows(0).setScanBytes(0).build(); - - MetricRepo.init(); - } - - @Before - public void setUp() throws Exception { - initDbPacket.clear(); - pingPacket.clear(); - quitPacket.clear(); - queryPacket.clear(); - multiQueryPacket.clear(); - fieldListPacket.clear(); - // Mock - MysqlChannel channel = new MysqlChannel(socketChannel); - new Expectations(channel) { - { - channel.getRemoteHostPortString(); - minTimes = 0; - result = "127.0.0.1:12345"; - } - }; - myContext = new ConnectContext(socketChannel); - Deencapsulation.setField(myContext, "mysqlChannel", channel); - } - - private static MysqlChannel mockChannel(ByteBuffer packet) { - try { - MysqlChannel channel = new MysqlChannel(socketChannel); - new Expectations(channel) { - { - // Mock receive - channel.fetchOnePacket(); - minTimes = 0; - result = packet; - - // Mock reset - channel.setSequenceId(0); - times = 1; - - // Mock send - // channel.sendOnePacket((ByteBuffer) any); - // minTimes = 0; - channel.sendAndFlush((ByteBuffer) any); - minTimes = 0; - - channel.getRemoteHostPortString(); - minTimes = 0; - result = "127.0.0.1:12345"; - } - }; - return channel; - } catch (IOException e) { - return null; - } - } - - private static ConnectContext initMockContext(MysqlChannel channel, Env env) { - ConnectContext context = new ConnectContext(socketChannel) { - private boolean firstTimeToSetCommand = true; - @Override - public void setKilled() { - myContext.setKilled(); - } - - @Override - public MysqlSerializer getSerializer() { - return myContext.getSerializer(); - } - - @Override - public QueryState getState() { - return myContext.getState(); - } - - @Override - public void setStartTime() { - myContext.setStartTime(); - } - - @Override - public String getDatabase() { - return myContext.getDatabase(); - } - - @Override - public void setCommand(MysqlCommand command) { - if (firstTimeToSetCommand) { - myContext.setCommand(command); - firstTimeToSetCommand = false; - } else { - super.setCommand(command); - } - } - }; - - CatalogIf catalog = env.getCurrentCatalog(); - - new Expectations(context) { - { - context.getMysqlChannel(); - minTimes = 0; - result = channel; - - context.isKilled(); - minTimes = 0; - maxTimes = 3; - returns(false, true, false); - - context.getEnv(); - minTimes = 0; - result = env; - - context.getAuditEventBuilder(); - minTimes = 0; - result = auditBuilder; - - context.getQualifiedUser(); - minTimes = 0; - result = "testCluster:user"; - - context.getClusterName(); - minTimes = 0; - result = "testCluster"; - - context.getStartTime(); - minTimes = 0; - result = 0L; - - context.getReturnRows(); - minTimes = 0; - result = 1L; - - context.setStmtId(anyLong); - minTimes = 0; - - context.getStmtId(); - minTimes = 0; - result = 1L; - - context.queryId(); - minTimes = 0; - result = new TUniqueId(); - - context.getCurrentCatalog(); - minTimes = 0; - result = catalog; - } - }; - - return context; - } - - @Test - public void testQuit() throws IOException { - ConnectContext ctx = initMockContext(mockChannel(quitPacket), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_QUIT, myContext.getCommand()); - Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlOkPacket); - Assert.assertTrue(myContext.isKilled()); - } - - @Test - public void testInitDb() throws IOException { - ConnectContext ctx = initMockContext(mockChannel(initDbPacket), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_INIT_DB, myContext.getCommand()); - Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlOkPacket); - } - - @Test - public void testInitDbFail() throws IOException { - ConnectContext ctx = initMockContext(mockChannel(initDbPacket), AccessTestUtil.fetchBlockCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_INIT_DB, myContext.getCommand()); - Assert.assertFalse(myContext.getState().toResponsePacket() instanceof MysqlOkPacket); - } - - @Test - public void testPing() throws IOException { - ConnectContext ctx = initMockContext(mockChannel(pingPacket), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_PING, myContext.getCommand()); - Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlOkPacket); - Assert.assertFalse(myContext.isKilled()); - } - - @Test - public void testPingLoop() throws IOException { - ConnectContext ctx = initMockContext(mockChannel(pingPacket), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - processor.loop(); - Assert.assertEquals(MysqlCommand.COM_PING, myContext.getCommand()); - Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlOkPacket); - Assert.assertFalse(myContext.isKilled()); - } - - @Test - public void testQuery(@Mocked StmtExecutor executor) throws Exception { - ConnectContext ctx = initMockContext(mockChannel(queryPacket), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - - // Mock statement executor - new Expectations() { - { - executor.getQueryStatisticsForAuditLog(); - minTimes = 0; - result = statistics; - } - }; - - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand()); - } - - @Test - public void testQueryWithUserInfo(@Mocked StmtExecutor executor) throws Exception { - ConnectContext ctx = initMockContext(mockChannel(queryPacket), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - - // Mock statement executor - new Expectations() { - { - executor.getQueryStatisticsForAuditLog(); - minTimes = 0; - result = statistics; - } - }; - processor.processOnce(); - StmtExecutor er = Deencapsulation.getField(processor, "executor"); - Assert.assertTrue(er.getParsedStmt().getUserInfo() != null); - } - - @Test - public void testQueryFail(@Mocked StmtExecutor executor) throws Exception { - ConnectContext ctx = initMockContext(mockChannel(queryPacket), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - - // Mock statement executor - new Expectations() { - { - executor.execute(); - minTimes = 0; - result = new IOException("Fail"); - - executor.getQueryStatisticsForAuditLog(); - minTimes = 0; - result = statistics; - } - }; - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand()); - } - - @Test - public void testQueryFail2(@Mocked StmtExecutor executor) throws Exception { - ConnectContext ctx = initMockContext(mockChannel(queryPacket), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - - // Mock statement executor - new Expectations() { - { - executor.execute(); - minTimes = 0; - result = new NullPointerException("Fail"); - - executor.getQueryStatisticsForAuditLog(); - minTimes = 0; - result = statistics; - } - }; - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand()); - Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlErrPacket); - } - - @Test - public void testMultiQuery(@Mocked StmtExecutor executor) throws Exception { - ConnectContext ctx = initMockContext(mockChannel(multiQueryPacket), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - - // Mock statement executor - new Expectations() { - { - executor.getQueryStatisticsForAuditLog(); - minTimes = 0; - result = statistics; - - executor.execute(); - times = 3; - } - }; - - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand()); - Assert.assertEquals(myContext.getState().getStateType(), QueryState.MysqlStateType.OK); - Assert.assertEquals("drop table a", ctx.getAuditEventBuilder().build().stmt); - } - - @Test - public void testMultiQueryFail1(@Mocked StmtExecutor executor) throws Exception { - ConnectContext ctx = initMockContext(mockChannel(multiQueryPacket), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - - // Mock statement executor - new Expectations() { - { - executor.getQueryStatisticsForAuditLog(); - minTimes = 0; - result = statistics; - - executor.execute(); - times = 1; - result = new IOException("Fail"); - } - }; - - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand()); - Assert.assertEquals(myContext.getState().getStateType(), QueryState.MysqlStateType.ERR); - Assert.assertEquals(QueryState.MysqlStateType.ERR.name(), ctx.getAuditEventBuilder().build().state); - Assert.assertEquals("select * from a", ctx.getAuditEventBuilder().build().stmt); - } - - @Test - public void testMultiQueryFail2(@Mocked StmtExecutor executor) throws Exception { - ConnectContext ctx = initMockContext(mockChannel(multiQueryPacket), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - - // Mock statement executor - new Expectations() { - { - executor.getQueryStatisticsForAuditLog(); - minTimes = 0; - result = statistics; - - executor.execute(); - result = null; - result = new IOException("Fail"); - } - }; - - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand()); - Assert.assertEquals(myContext.getState().getStateType(), QueryState.MysqlStateType.ERR); - Assert.assertEquals(QueryState.MysqlStateType.ERR.name(), ctx.getAuditEventBuilder().build().state); - Assert.assertEquals("select * from b", ctx.getAuditEventBuilder().build().stmt); - } - - @Test - public void testFieldList() throws Exception { - ConnectContext ctx = initMockContext(mockChannel(fieldListPacket), AccessTestUtil.fetchAdminCatalog()); - - myContext.setDatabase("testCluster:testDb"); - ConnectProcessor processor = new ConnectProcessor(ctx); - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_FIELD_LIST, myContext.getCommand()); - Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlEofPacket); - } - - @Test - public void testFieldListFailEmptyTable() throws Exception { - MysqlSerializer serializer = MysqlSerializer.newInstance(); - serializer.writeInt1(4); - serializer.writeNulTerminateString(""); - serializer.writeEofString(""); - - ConnectContext ctx = initMockContext(mockChannel(serializer.toByteBuffer()), - AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_FIELD_LIST, myContext.getCommand()); - Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlErrPacket); - Assert.assertEquals("Empty tableName", myContext.getState().getErrorMessage()); - } - - @Test - public void testFieldListFailNoDb() throws Exception { - MysqlSerializer serializer = MysqlSerializer.newInstance(); - serializer.writeInt1(4); - serializer.writeNulTerminateString("testTable"); - serializer.writeEofString(""); - - myContext.setDatabase("testCluster:emptyDb"); - ConnectContext ctx = initMockContext(mockChannel(serializer.toByteBuffer()), - AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_FIELD_LIST, myContext.getCommand()); - Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlErrPacket); - Assert.assertEquals("Unknown database(testCluster:emptyDb)", myContext.getState().getErrorMessage()); - } - - @Test - public void testFieldListFailNoTable() throws Exception { - MysqlSerializer serializer = MysqlSerializer.newInstance(); - serializer.writeInt1(4); - serializer.writeNulTerminateString("emptyTable"); - serializer.writeEofString(""); - - myContext.setDatabase("testCluster:testDb"); - ConnectContext ctx = initMockContext(mockChannel(serializer.toByteBuffer()), - AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_FIELD_LIST, myContext.getCommand()); - Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlErrPacket); - Assert.assertEquals("Unknown table(emptyTable)", myContext.getState().getErrorMessage()); - } - - @Test - public void testUnsupportedCommand() throws Exception { - MysqlSerializer serializer = MysqlSerializer.newInstance(); - serializer.writeInt1(5); - ByteBuffer packet = serializer.toByteBuffer(); - ConnectContext ctx = initMockContext(mockChannel(packet), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_CREATE_DB, myContext.getCommand()); - Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlErrPacket); - Assert.assertFalse(myContext.isKilled()); - } - - @Test - public void testUnknownCommand() throws Exception { - MysqlSerializer serializer = MysqlSerializer.newInstance(); - serializer.writeInt1(101); - ByteBuffer packet = serializer.toByteBuffer(); - ConnectContext ctx = initMockContext(mockChannel(packet), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - processor.processOnce(); - Assert.assertEquals(MysqlCommand.COM_SLEEP, myContext.getCommand()); - Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlErrPacket); - Assert.assertFalse(myContext.isKilled()); - } - - @Test - public void testNullPacket() throws Exception { - ConnectContext ctx = initMockContext(mockChannel(null), AccessTestUtil.fetchAdminCatalog()); - - ConnectProcessor processor = new ConnectProcessor(ctx); - processor.loop(); - Assert.assertTrue(myContext.isKilled()); - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java index 4187fd6156..c5980bd5ea 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectSchedulerTest.java @@ -21,7 +21,6 @@ import org.apache.doris.analysis.AccessTestUtil; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlProto; -import mockit.Delegate; import mockit.Expectations; import mockit.Mocked; import org.junit.Assert; @@ -65,28 +64,9 @@ public class ConnectSchedulerTest { @Test public void testSubmit(@Mocked ConnectProcessor processor) throws Exception { - // mock new processor - new Expectations() { - { - processor.loop(); - result = new Delegate() { - void fakeLoop() { - LOG.warn("starts loop"); - // Make cancel thread to work - succSubmit.incrementAndGet(); - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - LOG.warn("sleep exception"); - } - } - }; - } - }; - ConnectScheduler scheduler = new ConnectScheduler(10); for (int i = 0; i < 2; ++i) { - ConnectContext context = new ConnectContext(socketChannel); + ConnectContext context = new ConnectContext(); if (i == 1) { context.setEnv(AccessTestUtil.fetchBlockCatalog()); } else { @@ -100,16 +80,9 @@ public class ConnectSchedulerTest { @Test public void testProcessException(@Mocked ConnectProcessor processor) throws Exception { - new Expectations() { - { - processor.loop(); - result = new RuntimeException("failed"); - } - }; - ConnectScheduler scheduler = new ConnectScheduler(10); - ConnectContext context = new ConnectContext(socketChannel); + ConnectContext context = new ConnectContext(); context.setEnv(AccessTestUtil.fetchAdminCatalog()); context.setQualifiedUser("root"); Assert.assertTrue(scheduler.submit(context)); @@ -128,7 +101,7 @@ public class ConnectSchedulerTest { @Test public void testSubmitTooMany() throws InterruptedException { ConnectScheduler scheduler = new ConnectScheduler(0); - ConnectContext context = new ConnectContext(socketChannel); + ConnectContext context = new ConnectContext(); Assert.assertTrue(scheduler.submit(context)); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index 42fa87e28a..941a587a6d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -73,7 +73,7 @@ public class CoordinatorTest extends Coordinator { @Mocked static FrontendOptions frontendOptions; - static ConnectContext context = new ConnectContext(null); + static ConnectContext context = new ConnectContext(); static Analyzer analyzer = new Analyzer(env, context); static OriginalPlanner originalPlanner = new OriginalPlanner(analyzer); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java index 59d9dea337..c4c103e786 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java @@ -128,7 +128,7 @@ public class PartitionCacheTest { MetricRepo.init(); try { FrontendOptions.init(); - context = new ConnectContext(null); + context = new ConnectContext(); Config.cache_enable_sql_mode = true; Config.cache_enable_partition_mode = true; context.getSessionVariable().setEnableSqlCache(true); @@ -235,6 +235,14 @@ public class PartitionCacheTest { QueryState state = new QueryState(); channel.reset(); + new Expectations(channel) { + { + channel.getSerializer(); + minTimes = 0; + result = MysqlSerializer.newInstance(); + } + }; + new Expectations(ctx) { { ctx.getMysqlChannel(); @@ -245,10 +253,6 @@ public class PartitionCacheTest { minTimes = 0; result = clusterName; - ctx.getSerializer(); - minTimes = 0; - result = MysqlSerializer.newInstance(); - ctx.getEnv(); minTimes = 0; result = env; diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SetExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SetExecutorTest.java index 0c99f7665a..00b9170f69 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/SetExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SetExecutorTest.java @@ -52,7 +52,7 @@ public class SetExecutorTest { @Before public void setUp() throws DdlException { analyzer = AccessTestUtil.fetchAdminAnalyzer(false); - ctx = new ConnectContext(null); + ctx = new ConnectContext(); ctx.setEnv(AccessTestUtil.fetchAdminCatalog()); ctx.setQualifiedUser("root"); ctx.setRemoteIP("192.168.1.1"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ShowExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ShowExecutorTest.java index 98934f3138..da1af7702e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ShowExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ShowExecutorTest.java @@ -85,7 +85,7 @@ public class ShowExecutorTest { @Before public void setUp() throws Exception { - ctx = new ConnectContext(null); + ctx = new ConnectContext(); ctx.setCommand(MysqlCommand.COM_SLEEP); Column column1 = new Column("col1", PrimitiveType.BIGINT); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java index 39603385b4..24d46ce1ca 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java @@ -57,7 +57,6 @@ import org.junit.Test; import java.io.IOException; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; import java.util.List; import java.util.SortedMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -66,10 +65,8 @@ public class StmtExecutorTest { private ConnectContext ctx; private QueryState state; private ConnectScheduler scheduler; - private MysqlChannel channel = null; - @Mocked - SocketChannel socketChannel; + private MysqlChannel channel = null; @BeforeClass public static void start() { @@ -86,13 +83,12 @@ public class StmtExecutorTest { public void setUp() throws IOException { state = new QueryState(); scheduler = new ConnectScheduler(10); - ctx = new ConnectContext(socketChannel); + ctx = new ConnectContext(); SessionVariable sessionVariable = new SessionVariable(); MysqlSerializer serializer = MysqlSerializer.newInstance(); Env env = AccessTestUtil.fetchAdminCatalog(); - channel = new MysqlChannel(socketChannel); new Expectations(channel) { { channel.sendOnePacket((ByteBuffer) any); @@ -100,6 +96,10 @@ public class StmtExecutorTest { channel.reset(); minTimes = 0; + + channel.getSerializer(); + minTimes = 0; + result = serializer; } }; @@ -109,10 +109,6 @@ public class StmtExecutorTest { minTimes = 0; result = channel; - ctx.getSerializer(); - minTimes = 0; - result = serializer; - ctx.getEnv(); minTimes = 0; result = env; diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index d9c44e8c77..35f730726a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -92,7 +92,6 @@ import java.io.StringReader; import java.net.DatagramSocket; import java.net.ServerSocket; import java.net.SocketException; -import java.nio.channels.SocketChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -190,8 +189,7 @@ public abstract class TestWithFeService { } protected ConnectContext createCtx(UserIdentity user, String host) throws IOException { - SocketChannel channel = SocketChannel.open(); - ConnectContext ctx = new ConnectContext(channel); + ConnectContext ctx = new ConnectContext(); ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER); ctx.setCurrentUserIdentity(user); ctx.setQualifiedUser(user.getQualifiedUser()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index dd7a6647f0..a14e5319e6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -68,7 +68,6 @@ import java.io.StringReader; import java.net.DatagramSocket; import java.net.ServerSocket; import java.net.SocketException; -import java.nio.channels.SocketChannel; import java.nio.file.Files; import java.util.ConcurrentModificationException; import java.util.List; @@ -84,8 +83,7 @@ public class UtFrameUtils { // Help to create a mocked ConnectContext. public static ConnectContext createDefaultCtx(UserIdentity userIdentity, String remoteIp) throws IOException { - SocketChannel channel = SocketChannel.open(); - ConnectContext ctx = new ConnectContext(channel); + ConnectContext ctx = new ConnectContext(); ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER); ctx.setCurrentUserIdentity(userIdentity); ctx.setQualifiedUser(userIdentity.getQualifiedUser());