[Bug][Socket Leak] Fix bug that Mysql NIO server is leaking sockets (#4192)
When using mysql nio server, if the mysql handshake protocol fails, we need to actively close the channel to prevent socket leakage.
This commit is contained in:
22
.github/PULL_REQUEST_TEMPLATE.md
vendored
22
.github/PULL_REQUEST_TEMPLATE.md
vendored
@ -7,22 +7,22 @@ Describe the big picture of your changes here to communicate to the maintainers
|
||||
What types of changes does your code introduce to Doris?
|
||||
_Put an `x` in the boxes that apply_
|
||||
|
||||
- [ ] Bugfix (non-breaking change which fixes an issue)
|
||||
- [ ] New feature (non-breaking change which adds functionality)
|
||||
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
|
||||
- [ ] Documentation Update (if none of the other choices apply)
|
||||
- [] Bugfix (non-breaking change which fixes an issue)
|
||||
- [] New feature (non-breaking change which adds functionality)
|
||||
- [] Breaking change (fix or feature that would cause existing functionality to not work as expected)
|
||||
- [] Documentation Update (if none of the other choices apply)
|
||||
- [] Code refactor (Modify the code structure, format the code, etc...)
|
||||
|
||||
## Checklist
|
||||
|
||||
_Put an `x` in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code._
|
||||
|
||||
- [ ] I have create an issue on [Doris's issues](https://github.com/apache/incubator-doris/issues), and have described the bug/feature there in detail
|
||||
- [ ] Commit messages in my PR start with the related issues ID, like "#4071 Add pull request template to doris project"
|
||||
- [ ] Compiling and unit tests pass locally with my changes
|
||||
- [ ] I have added tests that prove my fix is effective or that my feature works
|
||||
- [ ] If this change need a document change, I have updated the document
|
||||
- [ ] Any dependent changes have been merged
|
||||
- [] I have create an issue on #ISSUE, and have described the bug/feature there in detail
|
||||
- [] Compiling and unit tests pass locally with my changes
|
||||
- [] I have added tests that prove my fix is effective or that my feature works
|
||||
- [] If this change need a document change, I have updated the document
|
||||
- [] Any dependent changes have been merged
|
||||
|
||||
## Further comments
|
||||
|
||||
If this is a relatively large or complex change, kick off the discussion at dev@doris.apache.org by explaining why you chose the solution you did and what alternatives you considered, etc...
|
||||
If this is a relatively large or complex change, kick off the discussion at dev@doris.apache.org by explaining why you chose the solution you did and what alternatives you considered, etc...
|
||||
|
||||
@ -147,7 +147,7 @@ public class MysqlChannel {
|
||||
readLen = readAll(headerByteBuffer);
|
||||
if (readLen != PACKET_HEADER_LEN) {
|
||||
// remote has close this channel
|
||||
LOG.info("Receive packet header failed, remote may close the channel.");
|
||||
LOG.debug("Receive packet header failed, remote may close the channel.");
|
||||
return null;
|
||||
}
|
||||
if (packetId() != sequenceId) {
|
||||
|
||||
@ -48,6 +48,8 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo
|
||||
return;
|
||||
}
|
||||
LOG.info("Connection established. remote={}", connection.getPeerAddress());
|
||||
// connection has been established, so need to call context.cleanup()
|
||||
// if exception happens.
|
||||
NConnectContext context = new NConnectContext(connection);
|
||||
context.setCatalog(Catalog.getCurrentCatalog());
|
||||
connectScheduler.submit(context);
|
||||
@ -59,7 +61,7 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo
|
||||
context.setConnectScheduler(connectScheduler);
|
||||
// authenticate check failed.
|
||||
if (!MysqlProto.negotiate(context)) {
|
||||
return;
|
||||
throw new AfterConnectedException("mysql negotiate failed");
|
||||
}
|
||||
if (connectScheduler.registerConnection(context)) {
|
||||
MysqlProto.sendResponsePacket(context);
|
||||
@ -67,12 +69,17 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo
|
||||
} else {
|
||||
context.getState().setError("Reach limit of connections");
|
||||
MysqlProto.sendResponsePacket(context);
|
||||
return;
|
||||
throw new AfterConnectedException("Reach limit of connections");
|
||||
}
|
||||
context.setStartTime();
|
||||
ConnectProcessor processor = new ConnectProcessor(context);
|
||||
context.startAcceptQuery(processor);
|
||||
} catch (AfterConnectedException e) {
|
||||
// do not need to print log for this kind of exception.
|
||||
// just clean up the context;
|
||||
context.cleanup();
|
||||
} catch (Exception e) {
|
||||
// should be unexpected exception, so print warn log
|
||||
LOG.warn("connect processor exception because ", e);
|
||||
context.cleanup();
|
||||
} finally {
|
||||
@ -83,4 +90,12 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo
|
||||
LOG.warn("Connection accept failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
// this exception is only used for some expected exception after connection established.
|
||||
// so that we can catch these kind of exceptions and close the channel without printing warning logs.
|
||||
private static class AfterConnectedException extends Exception {
|
||||
public AfterConnectedException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user