[improvement](mysql) merge connect context and mysql channel and reduce send buffer memory (#17125)

This commit is contained in:
Mingyu Chen
2023-02-25 21:07:23 +08:00
committed by GitHub
parent e7f9819168
commit 50b423e09b
42 changed files with 269 additions and 1335 deletions

View File

@ -80,7 +80,7 @@ public class BaseController {
value.password = authInfo.password;
addSession(request, response, value);
ConnectContext ctx = new ConnectContext(null);
ConnectContext ctx = new ConnectContext();
ctx.setQualifiedUser(authInfo.fullUserName);
ctx.setRemoteIP(authInfo.remoteIp);
ctx.setCurrentUserIdentity(currentUser);
@ -133,7 +133,7 @@ public class BaseController {
updateCookieAge(request, PALO_SESSION_ID, PALO_SESSION_EXPIRED_TIME, response);
ConnectContext ctx = new ConnectContext(null);
ConnectContext ctx = new ConnectContext();
ctx.setQualifiedUser(sessionValue.currentUser.getQualifiedUser());
ctx.setRemoteIP(request.getRemoteHost());
ctx.setCurrentUserIdentity(sessionValue.currentUser);

View File

@ -56,7 +56,7 @@ public class RestBaseController extends BaseController {
ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
// check password
UserIdentity currentUser = checkPassword(authInfo);
ConnectContext ctx = new ConnectContext(null);
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setQualifiedUser(authInfo.fullUserName);
ctx.setRemoteIP(authInfo.remoteIp);

View File

@ -27,6 +27,7 @@ import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.ByteBufferNetworkInputStream;
import org.apache.doris.load.LoadJobRowResult;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
@ -129,10 +130,11 @@ public class MysqlLoadManager {
}
private void replyClientForReadFile(ConnectContext context, String path) throws IOException {
context.getSerializer().reset();
context.getSerializer().writeByte((byte) 0xfb);
context.getSerializer().writeEofString(path);
context.getMysqlChannel().sendAndFlush(context.getSerializer().toByteBuffer());
MysqlSerializer serializer = context.getMysqlChannel().getSerializer();
serializer.reset();
serializer.writeByte((byte) 0xfb);
serializer.writeEofString(path);
context.getMysqlChannel().sendAndFlush(serializer.toByteBuffer());
}
private void fillByteBufferAsync(ConnectContext context, ByteBufferNetworkInputStream inputStream) {

View File

@ -26,7 +26,6 @@ import java.util.Map;
public class MTMVTaskContext {
ConnectContext ctx;
String query;
String remoteIp;
Map<String, String> properties;
MTMVTask task;
MTMVJob job;
@ -63,14 +62,6 @@ public class MTMVTaskContext {
this.query = query;
}
public String getRemoteIp() {
return remoteIp;
}
public void setRemoteIp(String remoteIp) {
this.remoteIp = remoteIp;
}
public Map<String, String> getProperties() {
return properties;
}

View File

@ -108,7 +108,6 @@ public class MTMVTaskExecutor implements Comparable<MTMVTaskExecutor> {
ctx.setQueryId(queryId);
taskContext.setCtx(ctx);
taskContext.setRemoteIp(ctx.getRemoteIp());
taskContext.setTask(task);
taskContext.setJob(job);

View File

@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mysql.nio;
package org.apache.doris.mysql;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.mysql.MysqlProto;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.ConnectScheduler;
@ -55,7 +54,7 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo
LOG.debug("Connection established. remote={}", connection.getPeerAddress());
// connection has been established, so need to call context.cleanup()
// if exception happens.
NConnectContext context = new NConnectContext(connection);
ConnectContext context = new ConnectContext(connection);
context.setEnv(Env.getCurrentEnv());
connectScheduler.submit(context);

View File

@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mysql;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* DummyMysqlChannel do nothing, just for making unit test happy.
* So that in unit test, we don't need to check if serializer is null.
* And don't need to allocate a real ByteBuffer.
*/
public class DummyMysqlChannel extends MysqlChannel {
public DummyMysqlChannel() {
this.serializer = MysqlSerializer.newInstance();
}
public void setSequenceId(int sequenceId) {
this.sequenceId = sequenceId;
}
@Override
public String getRemoteIp() {
return "";
}
@Override
public String getRemoteHostPortString() {
return "";
}
@Override
public void close() {
}
@Override
protected int readAll(ByteBuffer dstBuf) throws IOException {
return 0;
}
@Override
public ByteBuffer fetchOnePacket() throws IOException {
return ByteBuffer.allocate(0);
}
@Override
public void flush() throws IOException {
}
@Override
public void sendOnePacket(ByteBuffer packet) throws IOException {
}
@Override
public void sendAndFlush(ByteBuffer packet) throws IOException {
}
@Override
public void reset() {
}
public MysqlSerializer getSerializer() {
return serializer;
}
}

View File

@ -17,13 +17,18 @@
package org.apache.doris.mysql;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.xnio.StreamConnection;
import org.xnio.channels.Channels;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* This class used to read/write MySQL logical packet.
@ -31,59 +36,54 @@ import java.nio.channels.SocketChannel;
* http://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
*/
public class MysqlChannel {
// logger for this class
private static final Logger LOG = LogManager.getLogger(MysqlChannel.class);
// max length which one MySQL physical can hold, if one logical packet is bigger than this,
// one packet will split to many packets
public static final int MAX_PHYSICAL_PACKET_LENGTH = 0xffffff;
// MySQL packet header length
protected static final int PACKET_HEADER_LEN = 4;
// logger for this class
protected static final Logger LOG = LogManager.getLogger(MysqlChannel.class);
// next sequence id to receive or send
protected int sequenceId;
// channel connected with client
protected SocketChannel channel;
private StreamConnection conn;
// used to receive/send header, avoiding new this many time.
protected ByteBuffer headerByteBuffer = ByteBuffer.allocate(PACKET_HEADER_LEN);
protected ByteBuffer headerByteBuffer;
protected ByteBuffer defaultBuffer;
// default packet byte buffer for most packet
protected ByteBuffer defaultBuffer = ByteBuffer.allocate(16 * 1024);
protected ByteBuffer sendBuffer;
// for log and show
protected String remoteHostPortString;
protected String remoteIp;
protected boolean isSend;
// Serializer used to pack MySQL packet.
protected volatile MysqlSerializer serializer;
protected MysqlChannel() {
this.sequenceId = 0;
this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024);
this.isSend = false;
this.remoteHostPortString = "";
this.remoteIp = "";
// For DummyMysqlChannel
}
public MysqlChannel(SocketChannel channel) {
public MysqlChannel(StreamConnection connection) {
Preconditions.checkNotNull(connection);
this.sequenceId = 0;
this.channel = channel;
this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024);
this.isSend = false;
this.remoteHostPortString = "";
this.remoteIp = "";
if (channel != null) {
try {
if (channel.getRemoteAddress() instanceof InetSocketAddress) {
InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress();
// avoid calling getHostName() which may trigger a name service reverse lookup
remoteHostPortString = address.getHostString() + ":" + address.getPort();
remoteIp = address.getAddress().getHostAddress();
} else if (channel.getRemoteAddress() != null) {
// Reach here, what's it?
remoteHostPortString = channel.getRemoteAddress().toString();
remoteIp = channel.getRemoteAddress().toString();
}
} catch (Exception e) {
LOG.warn("get remote host string failed: ", e);
}
this.conn = connection;
if (connection.getPeerAddress() instanceof InetSocketAddress) {
InetSocketAddress address = (InetSocketAddress) connection.getPeerAddress();
remoteHostPortString = address.getHostString() + ":" + address.getPort();
remoteIp = address.getAddress().getHostAddress();
} else {
// Reach here, what's it?
remoteHostPortString = connection.getPeerAddress().toString();
remoteIp = connection.getPeerAddress().toString();
}
// The serializer and buffers should only be created if this is a real MysqlChannel
this.serializer = MysqlSerializer.newInstance();
this.defaultBuffer = ByteBuffer.allocate(16 * 1024);
this.headerByteBuffer = ByteBuffer.allocate(PACKET_HEADER_LEN);
this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024);
}
public void setSequenceId(int sequenceId) {
@ -114,7 +114,7 @@ public class MysqlChannel {
// Close channel
public void close() {
try {
channel.close();
conn.close();
} catch (IOException e) {
LOG.warn("Close channel exception, ignore.");
}
@ -122,13 +122,18 @@ public class MysqlChannel {
protected int readAll(ByteBuffer dstBuf) throws IOException {
int readLen = 0;
while (dstBuf.remaining() != 0) {
int ret = channel.read(dstBuf);
// return -1 when remote peer close the channel
if (ret == -1) {
return readLen;
try {
while (dstBuf.remaining() != 0) {
int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf);
// return -1 when remote peer close the channel
if (ret == -1) {
return readLen;
}
readLen += ret;
}
readLen += ret;
} catch (IOException e) {
LOG.debug("Read channel exception, ignore.", e);
return 0;
}
return readLen;
}
@ -188,12 +193,12 @@ public class MysqlChannel {
protected void realNetSend(ByteBuffer buffer) throws IOException {
long bufLen = buffer.remaining();
long writeLen = channel.write(buffer);
long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer);
if (bufLen != writeLen) {
throw new IOException("Write mysql packet failed.[write=" + writeLen
+ ", needToWrite=" + bufLen + "]");
}
channel.write(buffer);
Channels.flushBlocking(conn.getSinkChannel());
isSend = true;
}
@ -280,4 +285,25 @@ public class MysqlChannel {
public String getRemoteHostPortString() {
return remoteHostPortString;
}
public void startAcceptQuery(ConnectContext connectContext, ConnectProcessor connectProcessor) {
conn.getSourceChannel().setReadListener(new ReadListener(connectContext, connectProcessor));
conn.getSourceChannel().resumeReads();
}
public void suspendAcceptQuery() {
conn.getSourceChannel().suspendReads();
}
public void resumeAcceptQuery() {
conn.getSourceChannel().resumeReads();
}
public void stopAcceptQuery() throws IOException {
conn.getSourceChannel().shutdownReads();
}
public MysqlSerializer getSerializer() {
return serializer;
}
}

View File

@ -122,8 +122,8 @@ public class MysqlProto {
// send response packet(OK/EOF/ERR).
// before call this function, should set information in state of ConnectContext
public static void sendResponsePacket(ConnectContext context) throws IOException {
MysqlSerializer serializer = context.getSerializer();
MysqlChannel channel = context.getMysqlChannel();
MysqlSerializer serializer = channel.getSerializer();
MysqlPacket packet = context.getState().toResponsePacket();
// send response packet to client
@ -156,8 +156,8 @@ public class MysqlProto {
* IOException:
*/
public static boolean negotiate(ConnectContext context) throws IOException {
MysqlSerializer serializer = context.getSerializer();
MysqlChannel channel = context.getMysqlChannel();
MysqlSerializer serializer = channel.getSerializer();
context.getState().setOk();
// Server send handshake packet to client.

View File

@ -31,23 +31,19 @@ public class MysqlSerializer {
private ByteArrayOutputStream out;
private MysqlCapability capability;
public MysqlSerializer(ByteArrayOutputStream out) {
this(out, MysqlCapability.DEFAULT_CAPABILITY);
}
public MysqlSerializer(ByteArrayOutputStream out, MysqlCapability capability) {
this.out = out;
this.capability = capability;
}
public static MysqlSerializer newInstance() {
return new MysqlSerializer(new ByteArrayOutputStream());
return new MysqlSerializer(new ByteArrayOutputStream(), MysqlCapability.DEFAULT_CAPABILITY);
}
public static MysqlSerializer newInstance(MysqlCapability capability) {
return new MysqlSerializer(new ByteArrayOutputStream(), capability);
}
private MysqlSerializer(ByteArrayOutputStream out, MysqlCapability capability) {
this.out = out;
this.capability = capability;
}
// used after success handshake
public void setCapability(MysqlCapability capability) {
this.capability = capability;

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mysql.nio;
package org.apache.doris.mysql;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
@ -38,8 +38,8 @@ import java.util.concurrent.ExecutorService;
/**
* mysql protocol implementation based on nio.
*/
public class NMysqlServer {
private static final Logger LOG = LogManager.getLogger(NMysqlServer.class);
public class MysqlServer {
private static final Logger LOG = LogManager.getLogger(MysqlServer.class);
private int port;
private volatile boolean running;
@ -54,7 +54,7 @@ public class NMysqlServer {
private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool(
Config.max_mysql_service_task_threads_num, "mysql-nio-pool", true);
public NMysqlServer(int port, ConnectScheduler connectScheduler) {
public MysqlServer(int port, ConnectScheduler connectScheduler) {
this.port = port;
this.xnioWorker = Xnio.getInstance().createWorkerBuilder()
.setWorkerName("doris-mysql-nio")

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mysql.nio;
package org.apache.doris.mysql;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
@ -31,11 +31,11 @@ import org.xnio.conduits.ConduitStreamSourceChannel;
*/
public class ReadListener implements ChannelListener<ConduitStreamSourceChannel> {
private static final Logger LOG = LogManager.getLogger(ReadListener.class);
private NConnectContext ctx;
private ConnectContext ctx;
private ConnectProcessor connectProcessor;
public ReadListener(NConnectContext nConnectContext, ConnectProcessor connectProcessor) {
this.ctx = nConnectContext;
public ReadListener(ConnectContext connectContext, ConnectProcessor connectProcessor) {
this.ctx = connectContext;
this.connectProcessor = connectProcessor;
}

View File

@ -1,69 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mysql.nio;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
import org.xnio.StreamConnection;
import java.io.IOException;
/**
* connect context based on nio.
*/
public class NConnectContext extends ConnectContext {
protected NMysqlChannel mysqlChannel;
public NConnectContext(StreamConnection connection) {
super();
mysqlChannel = new NMysqlChannel(connection);
}
@Override
public void cleanup() {
mysqlChannel.close();
returnRows = 0;
}
@Override
public NMysqlChannel getMysqlChannel() {
return mysqlChannel;
}
public void startAcceptQuery(ConnectProcessor connectProcessor) {
mysqlChannel.startAcceptQuery(this, connectProcessor);
}
public void suspendAcceptQuery() {
mysqlChannel.suspendAcceptQuery();
}
public void resumeAcceptQuery() {
mysqlChannel.resumeAcceptQuery();
}
public void stopAcceptQuery() throws IOException {
mysqlChannel.stopAcceptQuery();
}
@Override
public String toString() {
return "[remote ip: " + mysqlChannel.getRemoteIp() + "]";
}
}

View File

@ -1,122 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mysql.nio;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.xnio.StreamConnection;
import org.xnio.channels.Channels;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
/**
* mysql Channel based on nio.
*/
public class NMysqlChannel extends MysqlChannel {
protected static final Logger LOG = LogManager.getLogger(NMysqlChannel.class);
private StreamConnection conn;
public NMysqlChannel(StreamConnection connection) {
super();
this.conn = connection;
if (connection.getPeerAddress() instanceof InetSocketAddress) {
InetSocketAddress address = (InetSocketAddress) connection.getPeerAddress();
remoteHostPortString = address.getHostString() + ":" + address.getPort();
remoteIp = address.getAddress().getHostAddress();
} else {
// Reach here, what's it?
remoteHostPortString = connection.getPeerAddress().toString();
remoteIp = connection.getPeerAddress().toString();
}
}
/**
* read packet until whole dstBuf is filled, unless block.
* Todo: find a better way to avoid block read here.
*
* @param dstBuf
* @return
*/
@Override
protected int readAll(ByteBuffer dstBuf) {
int readLen = 0;
try {
while (dstBuf.remaining() != 0) {
int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf);
// return -1 when remote peer close the channel
if (ret == -1) {
return readLen;
}
readLen += ret;
}
} catch (IOException e) {
LOG.debug("Read channel exception, ignore.", e);
return 0;
}
return readLen;
}
/**
* write packet until no data is remained, unless block.
*
* @param buffer
* @throws IOException
*/
@Override
protected void realNetSend(ByteBuffer buffer) throws IOException {
long bufLen = buffer.remaining();
long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer);
if (bufLen != writeLen) {
throw new IOException("Write mysql packet failed.[write=" + writeLen
+ ", needToWrite=" + bufLen + "]");
}
Channels.flushBlocking(conn.getSinkChannel());
isSend = true;
}
@Override
public void close() {
try {
conn.close();
} catch (IOException e) {
LOG.warn("Close channel exception, ignore.");
}
}
public void startAcceptQuery(NConnectContext nConnectContext, ConnectProcessor connectProcessor) {
conn.getSourceChannel().setReadListener(new ReadListener(nConnectContext, connectProcessor));
conn.getSourceChannel().resumeReads();
}
public void suspendAcceptQuery() {
conn.getSourceChannel().suspendReads();
}
public void resumeAcceptQuery() {
conn.getSourceChannel().resumeReads();
}
public void stopAcceptQuery() throws IOException {
conn.getSourceChannel().shutdownReads();
}
}

View File

@ -28,10 +28,10 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.mysql.DummyMysqlChannel;
import org.apache.doris.mysql.MysqlCapability;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.resource.Tag;
@ -47,8 +47,9 @@ import com.google.common.collect.Sets;
import io.opentelemetry.api.trace.Tracer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.xnio.StreamConnection;
import java.nio.channels.SocketChannel;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -60,7 +61,7 @@ import java.util.Set;
// Use `volatile` to make the reference change atomic.
public class ConnectContext {
private static final Logger LOG = LogManager.getLogger(ConnectContext.class);
protected static ThreadLocal<ConnectContext> threadLocalInfo = new ThreadLocal<ConnectContext>();
protected static ThreadLocal<ConnectContext> threadLocalInfo = new ThreadLocal<>();
// set this id before analyze
protected volatile long stmtId;
@ -98,8 +99,6 @@ public class ConnectContext {
// In other word, currentUserIdentity is the entry that matched in Doris auth table.
// This account determines user's access privileges.
protected volatile UserIdentity currentUserIdentity;
// Serializer used to pack MySQL packet.
protected volatile MysqlSerializer serializer;
// Variables belong to this session.
protected volatile SessionVariable sessionVariable;
// Scheduler this connection belongs to
@ -128,7 +127,7 @@ public class ConnectContext {
// This is used to statistic the current query details.
// This property will only be set when the query starts to execute.
// So in the query planning stage, do not use any value in this attribute.
protected QueryDetail queryDetail;
protected QueryDetail queryDetail = null;
// If set to true, the nondeterministic function will not be rewrote to constant.
private boolean notEvalNondeterministicFunction = false;
@ -157,7 +156,7 @@ public class ConnectContext {
* <p>
* when a connection is established, exec_timeout is set by query_timeout, when the statement is an insert stmt,
* then it is set to max(query_timeout, insert_timeout) with {@link #resetExecTimeout()} in
* {@link ConnectProcessor#handleQuery()} after the StmtExecutor is specified.
* after the StmtExecutor is specified.
*/
private int executionTimeoutS;
@ -213,19 +212,18 @@ public class ConnectContext {
this(null);
}
public ConnectContext(SocketChannel channel) {
public ConnectContext(StreamConnection connection) {
state = new QueryState();
returnRows = 0;
serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
isKilled = false;
mysqlChannel = new MysqlChannel(channel);
serializer = MysqlSerializer.newInstance();
if (connection != null) {
mysqlChannel = new MysqlChannel(connection);
} else {
mysqlChannel = new DummyMysqlChannel();
}
sessionVariable = VariableMgr.newSessionVariable();
command = MysqlCommand.COM_SLEEP;
if (channel != null) {
remoteIP = mysqlChannel.getRemoteIp();
}
queryDetail = null;
if (Config.use_fuzzy_session_variable) {
sessionVariable.initFuzzyModeVariables();
}
@ -403,10 +401,6 @@ public class ConnectContext {
returnRows = 0;
}
public MysqlSerializer getSerializer() {
return serializer;
}
public int getConnectionId() {
return connectionId;
}
@ -484,7 +478,9 @@ public class ConnectContext {
}
public void cleanup() {
mysqlChannel.close();
if (mysqlChannel != null) {
mysqlChannel.close();
}
threadLocalInfo.remove();
returnRows = 0;
}
@ -644,10 +640,6 @@ public class ConnectContext {
return currentConnectedFEIp;
}
public String getRemoteIp() {
return mysqlChannel == null ? "" : mysqlChannel.getRemoteIp();
}
public void resetExecTimeout() {
if (executor != null && executor.isInsertStmt()) {
// particular timeout for insert stmt, we can make other particular timeout in the same way.
@ -686,6 +678,23 @@ public class ConnectContext {
}
}
public void startAcceptQuery(ConnectProcessor connectProcessor) {
mysqlChannel.startAcceptQuery(this, connectProcessor);
}
public void suspendAcceptQuery() {
mysqlChannel.suspendAcceptQuery();
}
public void resumeAcceptQuery() {
mysqlChannel.resumeAcceptQuery();
}
public void stopAcceptQuery() throws IOException {
mysqlChannel.stopAcceptQuery();
}
public String getQueryIdentifier() {
return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]";
}

View File

@ -510,8 +510,8 @@ public class ConnectProcessor {
table.readLock();
try {
MysqlSerializer serializer = ctx.getSerializer();
MysqlChannel channel = ctx.getMysqlChannel();
MysqlSerializer serializer = channel.getSerializer();
// Send fields
// NOTE: Field list doesn't send number of fields
@ -594,7 +594,7 @@ public class ConnectProcessor {
return null;
}
MysqlSerializer serializer = ctx.getSerializer();
MysqlSerializer serializer = ctx.getMysqlChannel().getSerializer();
serializer.reset();
packet.writeTo(serializer);
return serializer.toByteBuffer();

View File

@ -19,11 +19,8 @@ package org.apache.doris.qe;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mysql.MysqlProto;
import org.apache.doris.mysql.nio.NConnectContext;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.thrift.TUniqueId;
@ -87,13 +84,7 @@ public class ConnectScheduler {
if (context == null) {
return false;
}
context.setConnectionId(nextConnectionId.getAndAdd(1));
// no necessary for nio.
if (context instanceof NConnectContext) {
return true;
}
executor.submit(new LoopHandler(context));
return true;
}
@ -166,48 +157,4 @@ public class ConnectScheduler {
TUniqueId queryId = traceId2QueryId.get(traceId);
return queryId == null ? "" : DebugUtil.printId(queryId);
}
private class LoopHandler implements Runnable {
ConnectContext context;
LoopHandler(ConnectContext context) {
this.context = context;
}
@Override
public void run() {
try {
// Set thread local info
context.setThreadLocalInfo();
context.setConnectScheduler(ConnectScheduler.this);
// authenticate check failed.
if (!MysqlProto.negotiate(context)) {
return;
}
if (registerConnection(context)) {
MysqlProto.sendResponsePacket(context);
} else {
context.getState().setError(ErrorCode.ERR_USER_LIMIT_REACHED, "Reach limit of connections");
MysqlProto.sendResponsePacket(context);
return;
}
context.setUserQueryTimeout(context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()));
context.setStartTime();
ConnectProcessor processor = new ConnectProcessor(context);
processor.loop();
} catch (Exception e) {
// for unauthorized access such lvs probe request, may cause exception, just log it in debug level
if (context.getCurrentUserIdentity() != null) {
LOG.warn("connect processor exception because ", e);
} else {
LOG.debug("connect processor exception because ", e);
}
} finally {
unregisterConnection(context);
context.cleanup();
}
}
}
}

View File

@ -17,7 +17,7 @@
package org.apache.doris.qe;
import org.apache.doris.mysql.nio.NMysqlServer;
import org.apache.doris.mysql.MysqlServer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -31,7 +31,7 @@ public class QeService {
private int port;
// MySQL protocol service
private NMysqlServer mysqlServer;
private MysqlServer mysqlServer;
@Deprecated
public QeService(int port) {
@ -40,7 +40,7 @@ public class QeService {
public QeService(int port, ConnectScheduler scheduler) {
this.port = port;
this.mysqlServer = new NMysqlServer(port, scheduler);
this.mysqlServer = new MysqlServer(port, scheduler);
}
public void start() throws Exception {

View File

@ -202,7 +202,7 @@ public class StmtExecutor implements ProfileWriter {
public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
this.context = context;
this.originStmt = originStmt;
this.serializer = context.getSerializer();
this.serializer = context.getMysqlChannel().getSerializer();
this.isProxy = isProxy;
this.statementContext = new StatementContext(context, originStmt);
this.context.setStatementContext(statementContext);
@ -218,7 +218,7 @@ public class StmtExecutor implements ProfileWriter {
this.context = ctx;
this.parsedStmt = parsedStmt;
this.originStmt = parsedStmt.getOrigStmt();
this.serializer = context.getSerializer();
this.serializer = context.getMysqlChannel().getSerializer();
this.isProxy = false;
if (parsedStmt instanceof LogicalPlanAdapter) {
this.statementContext = ((LogicalPlanAdapter) parsedStmt).getStatementContext();

View File

@ -776,7 +776,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// add this log so that we can track this stmt
LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), clientAddr.getHostname());
ConnectContext context = new ConnectContext(null);
ConnectContext context = new ConnectContext();
// Set current connected FE to the client address, so that we can know where this request come from.
context.setCurrentConnectedFEIp(clientAddr.getHostname());
ConnectProcessor processor = new ConnectProcessor(context);

View File

@ -439,7 +439,7 @@ public class AccessTestUtil {
public static Analyzer fetchAdminAnalyzer(boolean withCluster) {
final String prefix = "testCluster:";
Analyzer analyzer = new Analyzer(fetchAdminCatalog(), new ConnectContext(null));
Analyzer analyzer = new Analyzer(fetchAdminCatalog(), new ConnectContext());
new Expectations(analyzer) {
{
analyzer.getDefaultCatalog();
@ -475,7 +475,7 @@ public class AccessTestUtil {
}
public static Analyzer fetchBlockAnalyzer() throws AnalysisException {
Analyzer analyzer = new Analyzer(fetchBlockCatalog(), new ConnectContext(null));
Analyzer analyzer = new Analyzer(fetchBlockCatalog(), new ConnectContext());
new Expectations(analyzer) {
{
analyzer.getDefaultCatalog();
@ -499,7 +499,7 @@ public class AccessTestUtil {
}
public static Analyzer fetchEmptyDbAnalyzer() {
Analyzer analyzer = new Analyzer(fetchBlockCatalog(), new ConnectContext(null));
Analyzer analyzer = new Analyzer(fetchBlockCatalog(), new ConnectContext());
new Expectations(analyzer) {
{
analyzer.getDefaultCatalog();
@ -612,7 +612,7 @@ public class AccessTestUtil {
};
Env env = fetchBlockCatalog();
Analyzer analyzer = new Analyzer(env, new ConnectContext(null));
Analyzer analyzer = new Analyzer(env, new ConnectContext());
new Expectations(analyzer) {
{
analyzer.getDefaultCatalog();
@ -649,7 +649,7 @@ public class AccessTestUtil {
analyzer.getContext();
minTimes = 0;
result = new ConnectContext(null);
result = new ConnectContext();
}
};

View File

@ -45,7 +45,7 @@ public class CancelAlterStmtTest {
@Before
public void setUp() {
env = AccessTestUtil.fetchAdminCatalog();
ctx = new ConnectContext(null);
ctx = new ConnectContext();
ctx.setQualifiedUser("root");
ctx.setRemoteIP("192.168.1.1");

View File

@ -47,7 +47,7 @@ public class ColumnDefTest {
floatCol = new TypeDef(ScalarType.createType(PrimitiveType.FLOAT));
booleanCol = new TypeDef(ScalarType.createType(PrimitiveType.BOOLEAN));
ctx = new ConnectContext(null);
ctx = new ConnectContext();
new MockUp<ConnectContext>() {
@Mock
public ConnectContext get() {

View File

@ -35,7 +35,7 @@ public class CreateUserStmtTest {
@Before
public void setUp() {
ConnectContext ctx = new ConnectContext(null);
ConnectContext ctx = new ConnectContext();
ctx.setQualifiedUser("root");
ctx.setRemoteIP("192.168.1.1");
UserIdentity currentUserIdentity = new UserIdentity("root", "192.168.1.1");

View File

@ -44,7 +44,7 @@ public class ShowAlterStmtTest {
FakeEnv.setEnv(env);
analyzer = new Analyzer(env, new ConnectContext(null));
analyzer = new Analyzer(env, new ConnectContext());
new Expectations(analyzer) {
{
analyzer.getDefaultDb();

View File

@ -136,7 +136,7 @@ public class SystemInfoServiceTest {
}
};
analyzer = new Analyzer(env, new ConnectContext(null));
analyzer = new Analyzer(env, new ConnectContext());
}
public void mkdir(String dirString) {

View File

@ -142,7 +142,7 @@ public class LdapAuthenticateTest {
}
private ConnectContext getContext() {
ConnectContext context = new ConnectContext(null);
ConnectContext context = new ConnectContext();
context.setEnv(env);
context.setThreadLocalInfo();
return context;

View File

@ -1,299 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mysql;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class MysqlChannelTest {
int packetId = 0;
int readIdx = 0;
@Mocked
private SocketChannel channel;
@Before
public void setUp() throws IOException {
packetId = 0;
readIdx = 0;
new Expectations() {
{
channel.getRemoteAddress();
minTimes = 0;
result = new InetSocketAddress(1024);
}
};
}
@Test
public void testReceive() throws IOException {
// mock
new Expectations() {
{
channel.read((ByteBuffer) any);
minTimes = 0;
result = new Delegate() {
int fakeRead(ByteBuffer buffer) {
MysqlSerializer serializer = MysqlSerializer.newInstance();
if (readIdx == 0) {
readIdx++;
serializer.writeInt3(10);
serializer.writeInt1(packetId++);
buffer.put(serializer.toArray());
return 4;
} else if (readIdx == 1) {
readIdx++;
byte[] buf = new byte[buffer.remaining()];
for (int i = 0; i < buffer.remaining(); ++i) {
buf[i] = (byte) ('a' + i);
}
buffer.put(buf);
return 10;
}
return -1;
}
};
}
};
MysqlChannel channel1 = new MysqlChannel(channel);
ByteBuffer buf = channel1.fetchOnePacket();
Assert.assertEquals(10, buf.remaining());
for (int i = 0; i < 10; ++i) {
Assert.assertEquals('a' + i, buf.get());
}
}
@Test
public void testLongPacket() throws IOException {
// mock
new Expectations() {
{
channel.read((ByteBuffer) any);
minTimes = 0;
result = new Delegate() {
int fakeRead(ByteBuffer buffer) {
int maxLen = MysqlChannel.MAX_PHYSICAL_PACKET_LENGTH;
MysqlSerializer serializer = MysqlSerializer.newInstance();
if (readIdx == 0) {
// packet
readIdx++;
serializer.writeInt3(maxLen);
serializer.writeInt1(packetId++);
buffer.put(serializer.toArray());
return 4;
} else if (readIdx == 1) {
readIdx++;
int readLen = buffer.remaining();
byte[] buf = new byte[readLen];
for (int i = 0; i < readLen; ++i) {
buf[i] = (byte) ('a' + (i % 26));
}
buffer.put(buf);
return readLen;
} else if (readIdx == 2) {
// packet
readIdx++;
serializer.writeInt3(10);
serializer.writeInt1(packetId++);
buffer.put(serializer.toArray());
return 4;
} else if (readIdx == 3) {
readIdx++;
int readLen = buffer.remaining();
byte[] buf = new byte[readLen];
for (int i = 0; i < readLen; ++i) {
buf[i] = (byte) ('a' + (maxLen + i) % 26);
}
buffer.put(buf);
return readLen;
}
return 0;
}
};
}
};
MysqlChannel channel1 = new MysqlChannel(channel);
ByteBuffer buf = channel1.fetchOnePacket();
Assert.assertEquals(MysqlChannel.MAX_PHYSICAL_PACKET_LENGTH + 10, buf.remaining());
for (int i = 0; i < MysqlChannel.MAX_PHYSICAL_PACKET_LENGTH + 10; ++i) {
Assert.assertEquals('a' + (i % 26), buf.get());
}
}
@Test(expected = IOException.class)
public void testBadSeq() throws IOException {
// mock
new Expectations() {
{
channel.read((ByteBuffer) any);
minTimes = 0;
result = new Delegate() {
int fakeRead(ByteBuffer buffer) {
int maxLen = MysqlChannel.MAX_PHYSICAL_PACKET_LENGTH;
MysqlSerializer serializer = MysqlSerializer.newInstance();
if (readIdx == 0) {
// packet
readIdx++;
serializer.writeInt3(maxLen);
serializer.writeInt1(packetId++);
buffer.put(serializer.toArray());
return 4;
} else if (readIdx == 1) {
readIdx++;
int readLen = buffer.remaining();
byte[] buf = new byte[readLen];
for (int i = 0; i < readLen; ++i) {
buf[i] = (byte) ('a' + (i % 26));
}
buffer.put(buf);
return readLen;
} else if (readIdx == 2) {
// packet
readIdx++;
serializer.writeInt3(10);
// NOTE: Bad packet seq
serializer.writeInt1(0);
buffer.put(serializer.toArray());
return 4;
} else if (readIdx == 3) {
readIdx++;
byte[] buf = new byte[buffer.remaining()];
for (int i = 0; i < buffer.remaining(); ++i) {
buf[i] = (byte) ('a' + (i % 26));
}
buffer.put(buf);
return buffer.remaining();
}
return 0;
}
};
}
};
MysqlChannel channel1 = new MysqlChannel(channel);
channel1.fetchOnePacket();
}
@Test(expected = IOException.class)
public void testException() throws IOException {
// mock
new Expectations() {
{
channel.read((ByteBuffer) any);
minTimes = 0;
result = new IOException();
}
};
MysqlChannel channel1 = new MysqlChannel(channel);
channel1.fetchOnePacket();
Assert.fail("No Exception throws.");
}
@Test
public void testSend() throws IOException {
// mock
new Expectations() {
{
channel.write((ByteBuffer) any);
minTimes = 0;
result = new Delegate() {
int fakeWrite(ByteBuffer buffer) {
int writeLen = 0;
writeLen += buffer.remaining();
buffer.position(buffer.limit());
return writeLen;
}
};
}
};
MysqlChannel channel1 = new MysqlChannel(channel);
ByteBuffer buf = ByteBuffer.allocate(1000);
channel1.sendOnePacket(buf);
buf = ByteBuffer.allocate(0xffffff0);
channel1.sendOnePacket(buf);
}
@Test(expected = IOException.class)
public void testSendException() throws IOException {
// mock
new Expectations() {
{
channel.write((ByteBuffer) any);
minTimes = 0;
result = new IOException();
}
};
MysqlChannel channel1 = new MysqlChannel(channel);
ByteBuffer buf = ByteBuffer.allocate(1000);
channel1.sendOnePacket(buf);
buf = ByteBuffer.allocate(0xffffff0);
channel1.sendAndFlush(buf);
}
@Test(expected = IOException.class)
public void testSendFail() throws IOException {
// mock
new Expectations() {
{
channel.write((ByteBuffer) any);
minTimes = 0;
result = new Delegate() {
int fakeWrite(ByteBuffer buffer) {
int writeLen = 0;
writeLen += buffer.remaining();
buffer.position(buffer.limit());
return writeLen - 1;
}
};
}
};
MysqlChannel channel1 = new MysqlChannel(channel);
ByteBuffer buf = ByteBuffer.allocate(1000);
channel1.sendAndFlush(buf);
Assert.fail("No Exception throws.");
}
}

View File

@ -39,6 +39,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.xnio.StreamConnection;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@ -67,6 +68,8 @@ public class MysqlProtoTest {
private LdapAuthenticate ldapAuthenticate;
@Mocked
private MysqlClearTextPacket clearTextPacket;
@Mocked
StreamConnection streamConnection;
@Before
public void setUp() throws DdlException, AuthenticationException {
@ -242,7 +245,7 @@ public class MysqlProtoTest {
mockChannel("user", true);
mockPassword(true);
mockAccess();
ConnectContext context = new ConnectContext(null);
ConnectContext context = new ConnectContext(streamConnection);
context.setEnv(env);
context.setThreadLocalInfo();
Assert.assertTrue(MysqlProto.negotiate(context));
@ -253,7 +256,7 @@ public class MysqlProtoTest {
mockChannel("user", false);
mockPassword(true);
mockAccess();
ConnectContext context = new ConnectContext(null);
ConnectContext context = new ConnectContext(streamConnection);
MysqlProto.negotiate(context);
Assert.assertFalse(MysqlProto.negotiate(context));
}
@ -263,7 +266,7 @@ public class MysqlProtoTest {
mockChannel("user", true);
mockPassword(false);
mockAccess();
ConnectContext context = new ConnectContext(null);
ConnectContext context = new ConnectContext(streamConnection);
Assert.assertTrue(MysqlProto.negotiate(context));
}
@ -272,7 +275,7 @@ public class MysqlProtoTest {
mockChannel("", true);
mockPassword(true);
mockAccess();
ConnectContext context = new ConnectContext(null);
ConnectContext context = new ConnectContext(streamConnection);
Assert.assertFalse(MysqlProto.negotiate(context));
}
@ -283,7 +286,7 @@ public class MysqlProtoTest {
mockAccess();
mockMysqlClearTextPacket(PASSWORD_CLEAR_TEXT);
mockLdap("user", true);
ConnectContext context = new ConnectContext(null);
ConnectContext context = new ConnectContext(streamConnection);
context.setEnv(env);
context.setThreadLocalInfo();
Assert.assertTrue(MysqlProto.negotiate(context));
@ -297,7 +300,7 @@ public class MysqlProtoTest {
mockAccess();
mockMysqlClearTextPacket("654321");
mockLdap("user", true);
ConnectContext context = new ConnectContext(null);
ConnectContext context = new ConnectContext(streamConnection);
context.setEnv(env);
context.setThreadLocalInfo();
Assert.assertFalse(MysqlProto.negotiate(context));
@ -311,7 +314,7 @@ public class MysqlProtoTest {
mockAccess();
mockLdap("root", false);
mockMysqlClearTextPacket("654321");
ConnectContext context = new ConnectContext(null);
ConnectContext context = new ConnectContext(streamConnection);
context.setEnv(env);
context.setThreadLocalInfo();
Assert.assertTrue(MysqlProto.negotiate(context));

View File

@ -17,7 +17,6 @@
package org.apache.doris.mysql;
import org.apache.doris.mysql.nio.NMysqlServer;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectScheduler;
@ -84,7 +83,7 @@ public class MysqlServerTest {
int port = socket.getLocalPort();
socket.close();
NMysqlServer server = new NMysqlServer(port, scheduler);
MysqlServer server = new MysqlServer(port, scheduler);
Assert.assertTrue(server.start());
// submit
@ -112,9 +111,9 @@ public class MysqlServerTest {
ServerSocket socket = new ServerSocket(0);
int port = socket.getLocalPort();
socket.close();
NMysqlServer server = new NMysqlServer(port, scheduler);
MysqlServer server = new MysqlServer(port, scheduler);
Assert.assertTrue(server.start());
NMysqlServer server1 = new NMysqlServer(port, scheduler);
MysqlServer server1 = new MysqlServer(port, scheduler);
Assert.assertFalse(server1.start());
server.stop();
@ -125,7 +124,7 @@ public class MysqlServerTest {
ServerSocket socket = new ServerSocket(0);
int port = socket.getLocalPort();
socket.close();
NMysqlServer server = new NMysqlServer(port, badScheduler);
MysqlServer server = new MysqlServer(port, badScheduler);
Assert.assertTrue(server.start());
// submit

View File

@ -85,7 +85,7 @@ public class SetPasswordTest {
CreateUserStmt stmt = new CreateUserStmt(new UserDesc(userIdentity));
auth.createUser(stmt);
ConnectContext ctx = new ConnectContext(null);
ConnectContext ctx = new ConnectContext();
// set password for 'cmy'@'%'
UserIdentity currentUser1 = new UserIdentity("default_cluster:cmy", "%");
currentUser1.setIsAnalyzed();

View File

@ -33,7 +33,6 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.nio.channels.SocketChannel;
/**
* MemoUtils.
@ -103,8 +102,7 @@ public class MemoTestUtils {
*/
public static ConnectContext createCtx(UserIdentity user, String host) {
try {
SocketChannel channel = SocketChannel.open();
ConnectContext ctx = new ConnectContext(channel);
ConnectContext ctx = new ConnectContext();
ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
ctx.setCurrentUserIdentity(user);
ctx.setQualifiedUser(user.getQualifiedUser());

View File

@ -19,12 +19,10 @@ package org.apache.doris.qe;
import org.apache.doris.catalog.Env;
import org.apache.doris.mysql.MysqlCapability;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.thrift.TUniqueId;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
@ -34,8 +32,6 @@ import java.nio.channels.SocketChannel;
import java.util.List;
public class ConnectContextTest {
@Mocked
private MysqlChannel channel;
@Mocked
private StmtExecutor executor;
@Mocked
@ -51,28 +47,11 @@ public class ConnectContextTest {
@Before
public void setUp() throws Exception {
new Expectations() {
{
channel.getRemoteHostPortString();
minTimes = 0;
result = "127.0.0.1:12345";
channel.close();
minTimes = 0;
channel.getRemoteIp();
minTimes = 0;
result = "192.168.1.1";
executor.cancel();
minTimes = 0;
}
};
}
@Test
public void testNormal() {
ConnectContext ctx = new ConnectContext(socketChannel);
ConnectContext ctx = new ConnectContext();
// State
Assert.assertNotNull(ctx.getState());
@ -102,7 +81,7 @@ public class ConnectContextTest {
Assert.assertEquals("testCluster:testUser", ctx.getQualifiedUser());
// Serializer
Assert.assertNotNull(ctx.getSerializer());
Assert.assertNotNull(ctx.getMysqlChannel().getSerializer());
// Session variable
Assert.assertNotNull(ctx.getSessionVariable());
@ -126,7 +105,7 @@ public class ConnectContextTest {
Assert.assertEquals(9, row.size());
Assert.assertEquals("101", row.get(0));
Assert.assertEquals("testUser", row.get(1));
Assert.assertEquals("127.0.0.1:12345", row.get(2));
Assert.assertEquals("", row.get(2));
Assert.assertEquals("testCluster", row.get(3));
Assert.assertEquals("testDb", row.get(4));
Assert.assertEquals("Ping", row.get(5));
@ -154,7 +133,7 @@ public class ConnectContextTest {
@Test
public void testSleepTimeout() {
ConnectContext ctx = new ConnectContext(socketChannel);
ConnectContext ctx = new ConnectContext();
ctx.setCommand(MysqlCommand.COM_SLEEP);
// sleep no time out
@ -191,7 +170,7 @@ public class ConnectContextTest {
@Test
public void testOtherTimeout() {
ConnectContext ctx = new ConnectContext(socketChannel);
ConnectContext ctx = new ConnectContext();
ctx.setCommand(MysqlCommand.COM_QUERY);
// sleep no time out
@ -219,7 +198,7 @@ public class ConnectContextTest {
@Test
public void testThreadLocal() {
ConnectContext ctx = new ConnectContext(socketChannel);
ConnectContext ctx = new ConnectContext();
Assert.assertNull(ConnectContext.get());
ctx.setThreadLocalInfo();
Assert.assertNotNull(ConnectContext.get());

View File

@ -1,574 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.analysis.AccessTestUtil;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlEofPacket;
import org.apache.doris.mysql.MysqlErrPacket;
import org.apache.doris.mysql.MysqlOkPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.proto.Data;
import org.apache.doris.thrift.TUniqueId;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class ConnectProcessorTest {
private static ByteBuffer initDbPacket;
private static ByteBuffer pingPacket;
private static ByteBuffer quitPacket;
private static ByteBuffer queryPacket;
private static ByteBuffer multiQueryPacket;
private static ByteBuffer fieldListPacket;
private static AuditEventBuilder auditBuilder = new AuditEventBuilder();
private static ConnectContext myContext;
@Mocked
private static SocketChannel socketChannel;
private static Data.PQueryStatistics statistics = Data.PQueryStatistics.newBuilder().build();
@BeforeClass
public static void setUpClass() {
// Init Database packet
{ // CHECKSTYLE IGNORE THIS LINE
MysqlSerializer serializer = MysqlSerializer.newInstance();
serializer.writeInt1(2);
serializer.writeEofString("testCluster:testDb");
initDbPacket = serializer.toByteBuffer();
} // CHECKSTYLE IGNORE THIS LINE
// Ping packet
{ // CHECKSTYLE IGNORE THIS LINE
MysqlSerializer serializer = MysqlSerializer.newInstance();
serializer.writeInt1(14);
pingPacket = serializer.toByteBuffer();
} // CHECKSTYLE IGNORE THIS LINE
// Quit packet
{ // CHECKSTYLE IGNORE THIS LINE
MysqlSerializer serializer = MysqlSerializer.newInstance();
serializer.writeInt1(1);
quitPacket = serializer.toByteBuffer();
} // CHECKSTYLE IGNORE THIS LINE
// Query packet
{ // CHECKSTYLE IGNORE THIS LINE
MysqlSerializer serializer = MysqlSerializer.newInstance();
serializer.writeInt1(3);
serializer.writeEofString("select * from a");
queryPacket = serializer.toByteBuffer();
} // CHECKSTYLE IGNORE THIS LINE
// Multi query packet
{ // CHECKSTYLE IGNORE THIS LINE
MysqlSerializer serializer = MysqlSerializer.newInstance();
serializer.writeInt1(3);
serializer.writeEofString("select * from a;select * from b;drop table a");
multiQueryPacket = serializer.toByteBuffer();
} // CHECKSTYLE IGNORE THIS LINE
// Field list packet
{ // CHECKSTYLE IGNORE THIS LINE
MysqlSerializer serializer = MysqlSerializer.newInstance();
serializer.writeInt1(4);
serializer.writeNulTerminateString("testTbl");
serializer.writeEofString("");
fieldListPacket = serializer.toByteBuffer();
} // CHECKSTYLE IGNORE THIS LINE
statistics = statistics.toBuilder().setCpuMs(0L).setScanRows(0).setScanBytes(0).build();
MetricRepo.init();
}
@Before
public void setUp() throws Exception {
initDbPacket.clear();
pingPacket.clear();
quitPacket.clear();
queryPacket.clear();
multiQueryPacket.clear();
fieldListPacket.clear();
// Mock
MysqlChannel channel = new MysqlChannel(socketChannel);
new Expectations(channel) {
{
channel.getRemoteHostPortString();
minTimes = 0;
result = "127.0.0.1:12345";
}
};
myContext = new ConnectContext(socketChannel);
Deencapsulation.setField(myContext, "mysqlChannel", channel);
}
private static MysqlChannel mockChannel(ByteBuffer packet) {
try {
MysqlChannel channel = new MysqlChannel(socketChannel);
new Expectations(channel) {
{
// Mock receive
channel.fetchOnePacket();
minTimes = 0;
result = packet;
// Mock reset
channel.setSequenceId(0);
times = 1;
// Mock send
// channel.sendOnePacket((ByteBuffer) any);
// minTimes = 0;
channel.sendAndFlush((ByteBuffer) any);
minTimes = 0;
channel.getRemoteHostPortString();
minTimes = 0;
result = "127.0.0.1:12345";
}
};
return channel;
} catch (IOException e) {
return null;
}
}
private static ConnectContext initMockContext(MysqlChannel channel, Env env) {
ConnectContext context = new ConnectContext(socketChannel) {
private boolean firstTimeToSetCommand = true;
@Override
public void setKilled() {
myContext.setKilled();
}
@Override
public MysqlSerializer getSerializer() {
return myContext.getSerializer();
}
@Override
public QueryState getState() {
return myContext.getState();
}
@Override
public void setStartTime() {
myContext.setStartTime();
}
@Override
public String getDatabase() {
return myContext.getDatabase();
}
@Override
public void setCommand(MysqlCommand command) {
if (firstTimeToSetCommand) {
myContext.setCommand(command);
firstTimeToSetCommand = false;
} else {
super.setCommand(command);
}
}
};
CatalogIf catalog = env.getCurrentCatalog();
new Expectations(context) {
{
context.getMysqlChannel();
minTimes = 0;
result = channel;
context.isKilled();
minTimes = 0;
maxTimes = 3;
returns(false, true, false);
context.getEnv();
minTimes = 0;
result = env;
context.getAuditEventBuilder();
minTimes = 0;
result = auditBuilder;
context.getQualifiedUser();
minTimes = 0;
result = "testCluster:user";
context.getClusterName();
minTimes = 0;
result = "testCluster";
context.getStartTime();
minTimes = 0;
result = 0L;
context.getReturnRows();
minTimes = 0;
result = 1L;
context.setStmtId(anyLong);
minTimes = 0;
context.getStmtId();
minTimes = 0;
result = 1L;
context.queryId();
minTimes = 0;
result = new TUniqueId();
context.getCurrentCatalog();
minTimes = 0;
result = catalog;
}
};
return context;
}
@Test
public void testQuit() throws IOException {
ConnectContext ctx = initMockContext(mockChannel(quitPacket), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_QUIT, myContext.getCommand());
Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlOkPacket);
Assert.assertTrue(myContext.isKilled());
}
@Test
public void testInitDb() throws IOException {
ConnectContext ctx = initMockContext(mockChannel(initDbPacket), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_INIT_DB, myContext.getCommand());
Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlOkPacket);
}
@Test
public void testInitDbFail() throws IOException {
ConnectContext ctx = initMockContext(mockChannel(initDbPacket), AccessTestUtil.fetchBlockCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_INIT_DB, myContext.getCommand());
Assert.assertFalse(myContext.getState().toResponsePacket() instanceof MysqlOkPacket);
}
@Test
public void testPing() throws IOException {
ConnectContext ctx = initMockContext(mockChannel(pingPacket), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_PING, myContext.getCommand());
Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlOkPacket);
Assert.assertFalse(myContext.isKilled());
}
@Test
public void testPingLoop() throws IOException {
ConnectContext ctx = initMockContext(mockChannel(pingPacket), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
processor.loop();
Assert.assertEquals(MysqlCommand.COM_PING, myContext.getCommand());
Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlOkPacket);
Assert.assertFalse(myContext.isKilled());
}
@Test
public void testQuery(@Mocked StmtExecutor executor) throws Exception {
ConnectContext ctx = initMockContext(mockChannel(queryPacket), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
// Mock statement executor
new Expectations() {
{
executor.getQueryStatisticsForAuditLog();
minTimes = 0;
result = statistics;
}
};
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand());
}
@Test
public void testQueryWithUserInfo(@Mocked StmtExecutor executor) throws Exception {
ConnectContext ctx = initMockContext(mockChannel(queryPacket), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
// Mock statement executor
new Expectations() {
{
executor.getQueryStatisticsForAuditLog();
minTimes = 0;
result = statistics;
}
};
processor.processOnce();
StmtExecutor er = Deencapsulation.getField(processor, "executor");
Assert.assertTrue(er.getParsedStmt().getUserInfo() != null);
}
@Test
public void testQueryFail(@Mocked StmtExecutor executor) throws Exception {
ConnectContext ctx = initMockContext(mockChannel(queryPacket), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
// Mock statement executor
new Expectations() {
{
executor.execute();
minTimes = 0;
result = new IOException("Fail");
executor.getQueryStatisticsForAuditLog();
minTimes = 0;
result = statistics;
}
};
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand());
}
@Test
public void testQueryFail2(@Mocked StmtExecutor executor) throws Exception {
ConnectContext ctx = initMockContext(mockChannel(queryPacket), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
// Mock statement executor
new Expectations() {
{
executor.execute();
minTimes = 0;
result = new NullPointerException("Fail");
executor.getQueryStatisticsForAuditLog();
minTimes = 0;
result = statistics;
}
};
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand());
Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlErrPacket);
}
@Test
public void testMultiQuery(@Mocked StmtExecutor executor) throws Exception {
ConnectContext ctx = initMockContext(mockChannel(multiQueryPacket), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
// Mock statement executor
new Expectations() {
{
executor.getQueryStatisticsForAuditLog();
minTimes = 0;
result = statistics;
executor.execute();
times = 3;
}
};
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand());
Assert.assertEquals(myContext.getState().getStateType(), QueryState.MysqlStateType.OK);
Assert.assertEquals("drop table a", ctx.getAuditEventBuilder().build().stmt);
}
@Test
public void testMultiQueryFail1(@Mocked StmtExecutor executor) throws Exception {
ConnectContext ctx = initMockContext(mockChannel(multiQueryPacket), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
// Mock statement executor
new Expectations() {
{
executor.getQueryStatisticsForAuditLog();
minTimes = 0;
result = statistics;
executor.execute();
times = 1;
result = new IOException("Fail");
}
};
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand());
Assert.assertEquals(myContext.getState().getStateType(), QueryState.MysqlStateType.ERR);
Assert.assertEquals(QueryState.MysqlStateType.ERR.name(), ctx.getAuditEventBuilder().build().state);
Assert.assertEquals("select * from a", ctx.getAuditEventBuilder().build().stmt);
}
@Test
public void testMultiQueryFail2(@Mocked StmtExecutor executor) throws Exception {
ConnectContext ctx = initMockContext(mockChannel(multiQueryPacket), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
// Mock statement executor
new Expectations() {
{
executor.getQueryStatisticsForAuditLog();
minTimes = 0;
result = statistics;
executor.execute();
result = null;
result = new IOException("Fail");
}
};
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand());
Assert.assertEquals(myContext.getState().getStateType(), QueryState.MysqlStateType.ERR);
Assert.assertEquals(QueryState.MysqlStateType.ERR.name(), ctx.getAuditEventBuilder().build().state);
Assert.assertEquals("select * from b", ctx.getAuditEventBuilder().build().stmt);
}
@Test
public void testFieldList() throws Exception {
ConnectContext ctx = initMockContext(mockChannel(fieldListPacket), AccessTestUtil.fetchAdminCatalog());
myContext.setDatabase("testCluster:testDb");
ConnectProcessor processor = new ConnectProcessor(ctx);
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_FIELD_LIST, myContext.getCommand());
Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlEofPacket);
}
@Test
public void testFieldListFailEmptyTable() throws Exception {
MysqlSerializer serializer = MysqlSerializer.newInstance();
serializer.writeInt1(4);
serializer.writeNulTerminateString("");
serializer.writeEofString("");
ConnectContext ctx = initMockContext(mockChannel(serializer.toByteBuffer()),
AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_FIELD_LIST, myContext.getCommand());
Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlErrPacket);
Assert.assertEquals("Empty tableName", myContext.getState().getErrorMessage());
}
@Test
public void testFieldListFailNoDb() throws Exception {
MysqlSerializer serializer = MysqlSerializer.newInstance();
serializer.writeInt1(4);
serializer.writeNulTerminateString("testTable");
serializer.writeEofString("");
myContext.setDatabase("testCluster:emptyDb");
ConnectContext ctx = initMockContext(mockChannel(serializer.toByteBuffer()),
AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_FIELD_LIST, myContext.getCommand());
Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlErrPacket);
Assert.assertEquals("Unknown database(testCluster:emptyDb)", myContext.getState().getErrorMessage());
}
@Test
public void testFieldListFailNoTable() throws Exception {
MysqlSerializer serializer = MysqlSerializer.newInstance();
serializer.writeInt1(4);
serializer.writeNulTerminateString("emptyTable");
serializer.writeEofString("");
myContext.setDatabase("testCluster:testDb");
ConnectContext ctx = initMockContext(mockChannel(serializer.toByteBuffer()),
AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_FIELD_LIST, myContext.getCommand());
Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlErrPacket);
Assert.assertEquals("Unknown table(emptyTable)", myContext.getState().getErrorMessage());
}
@Test
public void testUnsupportedCommand() throws Exception {
MysqlSerializer serializer = MysqlSerializer.newInstance();
serializer.writeInt1(5);
ByteBuffer packet = serializer.toByteBuffer();
ConnectContext ctx = initMockContext(mockChannel(packet), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_CREATE_DB, myContext.getCommand());
Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlErrPacket);
Assert.assertFalse(myContext.isKilled());
}
@Test
public void testUnknownCommand() throws Exception {
MysqlSerializer serializer = MysqlSerializer.newInstance();
serializer.writeInt1(101);
ByteBuffer packet = serializer.toByteBuffer();
ConnectContext ctx = initMockContext(mockChannel(packet), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
processor.processOnce();
Assert.assertEquals(MysqlCommand.COM_SLEEP, myContext.getCommand());
Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlErrPacket);
Assert.assertFalse(myContext.isKilled());
}
@Test
public void testNullPacket() throws Exception {
ConnectContext ctx = initMockContext(mockChannel(null), AccessTestUtil.fetchAdminCatalog());
ConnectProcessor processor = new ConnectProcessor(ctx);
processor.loop();
Assert.assertTrue(myContext.isKilled());
}
}

View File

@ -21,7 +21,6 @@ import org.apache.doris.analysis.AccessTestUtil;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlProto;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
@ -65,28 +64,9 @@ public class ConnectSchedulerTest {
@Test
public void testSubmit(@Mocked ConnectProcessor processor) throws Exception {
// mock new processor
new Expectations() {
{
processor.loop();
result = new Delegate() {
void fakeLoop() {
LOG.warn("starts loop");
// Make cancel thread to work
succSubmit.incrementAndGet();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
LOG.warn("sleep exception");
}
}
};
}
};
ConnectScheduler scheduler = new ConnectScheduler(10);
for (int i = 0; i < 2; ++i) {
ConnectContext context = new ConnectContext(socketChannel);
ConnectContext context = new ConnectContext();
if (i == 1) {
context.setEnv(AccessTestUtil.fetchBlockCatalog());
} else {
@ -100,16 +80,9 @@ public class ConnectSchedulerTest {
@Test
public void testProcessException(@Mocked ConnectProcessor processor) throws Exception {
new Expectations() {
{
processor.loop();
result = new RuntimeException("failed");
}
};
ConnectScheduler scheduler = new ConnectScheduler(10);
ConnectContext context = new ConnectContext(socketChannel);
ConnectContext context = new ConnectContext();
context.setEnv(AccessTestUtil.fetchAdminCatalog());
context.setQualifiedUser("root");
Assert.assertTrue(scheduler.submit(context));
@ -128,7 +101,7 @@ public class ConnectSchedulerTest {
@Test
public void testSubmitTooMany() throws InterruptedException {
ConnectScheduler scheduler = new ConnectScheduler(0);
ConnectContext context = new ConnectContext(socketChannel);
ConnectContext context = new ConnectContext();
Assert.assertTrue(scheduler.submit(context));
}
}

View File

@ -73,7 +73,7 @@ public class CoordinatorTest extends Coordinator {
@Mocked
static FrontendOptions frontendOptions;
static ConnectContext context = new ConnectContext(null);
static ConnectContext context = new ConnectContext();
static Analyzer analyzer = new Analyzer(env, context);
static OriginalPlanner originalPlanner = new OriginalPlanner(analyzer);

View File

@ -128,7 +128,7 @@ public class PartitionCacheTest {
MetricRepo.init();
try {
FrontendOptions.init();
context = new ConnectContext(null);
context = new ConnectContext();
Config.cache_enable_sql_mode = true;
Config.cache_enable_partition_mode = true;
context.getSessionVariable().setEnableSqlCache(true);
@ -235,6 +235,14 @@ public class PartitionCacheTest {
QueryState state = new QueryState();
channel.reset();
new Expectations(channel) {
{
channel.getSerializer();
minTimes = 0;
result = MysqlSerializer.newInstance();
}
};
new Expectations(ctx) {
{
ctx.getMysqlChannel();
@ -245,10 +253,6 @@ public class PartitionCacheTest {
minTimes = 0;
result = clusterName;
ctx.getSerializer();
minTimes = 0;
result = MysqlSerializer.newInstance();
ctx.getEnv();
minTimes = 0;
result = env;

View File

@ -52,7 +52,7 @@ public class SetExecutorTest {
@Before
public void setUp() throws DdlException {
analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
ctx = new ConnectContext(null);
ctx = new ConnectContext();
ctx.setEnv(AccessTestUtil.fetchAdminCatalog());
ctx.setQualifiedUser("root");
ctx.setRemoteIP("192.168.1.1");

View File

@ -85,7 +85,7 @@ public class ShowExecutorTest {
@Before
public void setUp() throws Exception {
ctx = new ConnectContext(null);
ctx = new ConnectContext();
ctx.setCommand(MysqlCommand.COM_SLEEP);
Column column1 = new Column("col1", PrimitiveType.BIGINT);

View File

@ -57,7 +57,6 @@ import org.junit.Test;
import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicBoolean;
@ -66,10 +65,8 @@ public class StmtExecutorTest {
private ConnectContext ctx;
private QueryState state;
private ConnectScheduler scheduler;
private MysqlChannel channel = null;
@Mocked
SocketChannel socketChannel;
private MysqlChannel channel = null;
@BeforeClass
public static void start() {
@ -86,13 +83,12 @@ public class StmtExecutorTest {
public void setUp() throws IOException {
state = new QueryState();
scheduler = new ConnectScheduler(10);
ctx = new ConnectContext(socketChannel);
ctx = new ConnectContext();
SessionVariable sessionVariable = new SessionVariable();
MysqlSerializer serializer = MysqlSerializer.newInstance();
Env env = AccessTestUtil.fetchAdminCatalog();
channel = new MysqlChannel(socketChannel);
new Expectations(channel) {
{
channel.sendOnePacket((ByteBuffer) any);
@ -100,6 +96,10 @@ public class StmtExecutorTest {
channel.reset();
minTimes = 0;
channel.getSerializer();
minTimes = 0;
result = serializer;
}
};
@ -109,10 +109,6 @@ public class StmtExecutorTest {
minTimes = 0;
result = channel;
ctx.getSerializer();
minTimes = 0;
result = serializer;
ctx.getEnv();
minTimes = 0;
result = env;

View File

@ -92,7 +92,6 @@ import java.io.StringReader;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.SocketException;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -190,8 +189,7 @@ public abstract class TestWithFeService {
}
protected ConnectContext createCtx(UserIdentity user, String host) throws IOException {
SocketChannel channel = SocketChannel.open();
ConnectContext ctx = new ConnectContext(channel);
ConnectContext ctx = new ConnectContext();
ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
ctx.setCurrentUserIdentity(user);
ctx.setQualifiedUser(user.getQualifiedUser());

View File

@ -68,7 +68,6 @@ import java.io.StringReader;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.SocketException;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.util.ConcurrentModificationException;
import java.util.List;
@ -84,8 +83,7 @@ public class UtFrameUtils {
// Help to create a mocked ConnectContext.
public static ConnectContext createDefaultCtx(UserIdentity userIdentity, String remoteIp) throws IOException {
SocketChannel channel = SocketChannel.open();
ConnectContext ctx = new ConnectContext(channel);
ConnectContext ctx = new ConnectContext();
ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
ctx.setCurrentUserIdentity(userIdentity);
ctx.setQualifiedUser(userIdentity.getQualifiedUser());