[fix](mysql) fix mysql channel infinite blocking (#28808)
Call the Channels blocking method with timeout instead. Using session variables net_write_timeout and net_read_timeout as the timeout parameter.
This commit is contained in:
@ -31,6 +31,7 @@ import org.xnio.channels.Channels;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.net.ssl.SSLEngine;
|
||||
import javax.net.ssl.SSLEngineResult;
|
||||
import javax.net.ssl.SSLException;
|
||||
@ -79,6 +80,8 @@ public class MysqlChannel {
|
||||
// mysql flag CLIENT_DEPRECATE_EOF
|
||||
private boolean clientDeprecatedEOF;
|
||||
|
||||
private ConnectContext context;
|
||||
|
||||
protected MysqlChannel() {
|
||||
// For DummyMysqlChannel
|
||||
}
|
||||
@ -91,7 +94,7 @@ public class MysqlChannel {
|
||||
return clientDeprecatedEOF;
|
||||
}
|
||||
|
||||
public MysqlChannel(StreamConnection connection) {
|
||||
public MysqlChannel(StreamConnection connection, ConnectContext context) {
|
||||
Preconditions.checkNotNull(connection);
|
||||
this.sequenceId = 0;
|
||||
this.isSend = false;
|
||||
@ -113,6 +116,7 @@ public class MysqlChannel {
|
||||
this.defaultBuffer = ByteBuffer.allocate(16 * 1024);
|
||||
this.headerByteBuffer = ByteBuffer.allocate(PACKET_HEADER_LEN);
|
||||
this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024);
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
public void initSslBuffer() {
|
||||
@ -195,7 +199,8 @@ public class MysqlChannel {
|
||||
}
|
||||
try {
|
||||
while (dstBuf.remaining() != 0) {
|
||||
int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf);
|
||||
int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf, context.getNetReadTimeout(),
|
||||
TimeUnit.SECONDS);
|
||||
// return -1 when remote peer close the channel
|
||||
if (ret == -1) {
|
||||
decryptData(dstBuf, isHeader);
|
||||
@ -365,12 +370,13 @@ public class MysqlChannel {
|
||||
protected void realNetSend(ByteBuffer buffer) throws IOException {
|
||||
buffer = encryptData(buffer);
|
||||
long bufLen = buffer.remaining();
|
||||
long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer);
|
||||
long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer, context.getNetWriteTimeout(),
|
||||
TimeUnit.SECONDS);
|
||||
if (bufLen != writeLen) {
|
||||
throw new IOException("Write mysql packet failed.[write=" + writeLen
|
||||
+ ", needToWrite=" + bufLen + "]");
|
||||
}
|
||||
Channels.flushBlocking(conn.getSinkChannel());
|
||||
Channels.flushBlocking(conn.getSinkChannel(), context.getNetWriteTimeout(), TimeUnit.SECONDS);
|
||||
isSend = true;
|
||||
}
|
||||
|
||||
|
||||
@ -327,7 +327,7 @@ public class ConnectContext {
|
||||
connectType = ConnectType.MYSQL;
|
||||
serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
|
||||
if (connection != null) {
|
||||
mysqlChannel = new MysqlChannel(connection);
|
||||
mysqlChannel = new MysqlChannel(connection, this);
|
||||
} else {
|
||||
mysqlChannel = new DummyMysqlChannel();
|
||||
}
|
||||
@ -1033,5 +1033,13 @@ public class ConnectContext {
|
||||
public void setSkipAuth(boolean skipAuth) {
|
||||
this.skipAuth = skipAuth;
|
||||
}
|
||||
|
||||
public int getNetReadTimeout() {
|
||||
return this.sessionVariable.getNetReadTimeout();
|
||||
}
|
||||
|
||||
public int getNetWriteTimeout() {
|
||||
return this.sessionVariable.getNetWriteTimeout();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user