From 900a7d67ddcb49962782e6b5997cc05ab7370fd9 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Sun, 9 Oct 2016 18:48:22 +0800 Subject: [PATCH] server: add comments for readability. (#1798) --- server/conn.go | 62 ++++++++++++++++++++++++++++++---------------- server/packetio.go | 1 + server/server.go | 18 ++++++-------- 3 files changed, 50 insertions(+), 31 deletions(-) diff --git a/server/conn.go b/server/conn.go index 705f9df341..b71ee8c51b 100644 --- a/server/conn.go +++ b/server/conn.go @@ -59,29 +59,34 @@ var defaultCapability = mysql.ClientLongPassword | mysql.ClientLongFlag | mysql.ClientMultiStatements | mysql.ClientMultiResults | mysql.ClientConnectAtts +// clientConn represents a connection between server and client, it maintains connection specific state, +// handles client query. type clientConn struct { - pkg *packetIO + pkt *packetIO // a helper to read and write data in packet format. conn net.Conn - server *Server - capability uint32 - connectionID uint32 - collation uint8 - charset string - user string - dbname string - salt []byte - alloc arena.Allocator - lastCmd string - ctx IContext - attrs map[string]string + server *Server // a reference of server instance. + capability uint32 // client capability affects the way server handles client request. + connectionID uint32 // atomically allocated by a global variable, unique in process scope. + collation uint8 // collation used by client, may be different from the collation used by database. + user string // user of the client. + dbname string // default database name. + salt []byte // random bytes used for authentication. + alloc arena.Allocator // an memory allocator for reducing memory allocation. + lastCmd string // latest sql query string, currently used for logging error. + ctx IContext // an interface to execute sql statements. + attrs map[string]string // attributes parsed from client handshake response, not used for now. } func (cc *clientConn) String() string { - return fmt.Sprintf("conn: %s, status: %d, charset: %s, user: %s, lastInsertId: %d", - cc.conn.RemoteAddr(), cc.ctx.Status(), cc.charset, cc.user, cc.ctx.LastInsertID(), + collationStr := mysql.Collations[uint8(cc.connectionID)] + return fmt.Sprintf("conn: %s, status: %d, collation: %s, user: %s, lastInsertId: %d", + cc.conn.RemoteAddr(), cc.ctx.Status(), collationStr, cc.user, cc.ctx.LastInsertID(), ) } +// handshake works like TCP handshake, but in a higher level, it first writes initial packet to client, +// during handshake, client and server negotiate compatible features and do authentication. +// After handshake, client can send sql query to server. func (cc *clientConn) handshake() error { if err := cc.writeInitialHandshake(); err != nil { return errors.Trace(err) @@ -99,7 +104,7 @@ func (cc *clientConn) handshake() error { } err := cc.writePacket(data) - cc.pkg.sequence = 0 + cc.pkt.sequence = 0 if err != nil { return errors.Trace(err) } @@ -120,6 +125,8 @@ func (cc *clientConn) Close() error { return nil } +// writeInitialHandshake sends server version, connection ID, server capability, collation, server status +// and auth salt to the client. func (cc *clientConn) writeInitialHandshake() error { data := make([]byte, 4, 128) @@ -159,11 +166,11 @@ func (cc *clientConn) writeInitialHandshake() error { } func (cc *clientConn) readPacket() ([]byte, error) { - return cc.pkg.readPacket() + return cc.pkt.readPacket() } func (cc *clientConn) writePacket(data []byte) error { - return cc.pkg.writePacket(data) + return cc.pkt.writePacket(data) } type handshakeResponse41 struct { @@ -299,6 +306,9 @@ func (cc *clientConn) readHandshakeResponse() error { return nil } +// Run reads client query and writes query result to client in for loop, if there is a panic during query handling, +// it will be recovered and log the panic error. +// This function returns and the connection is closed if there is an IO error or there is a panic. func (cc *clientConn) Run() { defer func() { r := recover() @@ -330,10 +340,13 @@ func (cc *clientConn) Run() { cc.writeError(err) } - cc.pkg.sequence = 0 + cc.pkt.sequence = 0 } } +// dispatch handles client request based on command which is the first byte of the data. +// It also gets a token from server which is used to limit the concurrently handling clients. +// The most frequently used command is ComQuery. func (cc *clientConn) dispatch(data []byte) error { cmd := data[0] data = data[1:] @@ -355,7 +368,7 @@ func (cc *clientConn) dispatch(data []byte) error { return nil case mysql.ComQuit: return io.EOF - case mysql.ComQuery: + case mysql.ComQuery: // Most frequently used command. return cc.handleQuery(hack.String(data)) case mysql.ComPing: return cc.writeOK() @@ -394,7 +407,7 @@ func (cc *clientConn) useDB(db string) (err error) { } func (cc *clientConn) flush() error { - return cc.pkg.flush() + return cc.pkt.flush() } func (cc *clientConn) writeOK() error { @@ -480,6 +493,8 @@ func (cc *clientConn) writeReq(filePath string) error { return errors.Trace(cc.flush()) } +// handleLoadData does the additional work after processing the 'load data' query. +// It sends client a file path, then reads the file content from client, inserts data into database. func (cc *clientConn) handleLoadData(loadDataInfo *executor.LoadDataInfo) error { var err error defer func() { @@ -534,6 +549,9 @@ func (cc *clientConn) handleLoadData(loadDataInfo *executor.LoadDataInfo) error return nil } +// handleQuery executes the sql query string and writes result set or result ok to the client. +// As the execution time of this function represents the performance of TiDB, we do time log and metrics here. +// There is a special query `load data` that does not return result, which is handled differently. func (cc *clientConn) handleQuery(sql string) (err error) { startTS := time.Now() defer func() { @@ -577,6 +595,8 @@ func (cc *clientConn) handleQuery(sql string) (err error) { return errors.Trace(err) } +// handleFieldList returns the field list for a table. +// The sql string is composed of a table name and a terminating character \x00. func (cc *clientConn) handleFieldList(sql string) (err error) { parts := strings.Split(sql, "\x00") columns, err := cc.ctx.FieldList(parts[0]) diff --git a/server/packetio.go b/server/packetio.go index af6dd4eadf..d55f1e8558 100644 --- a/server/packetio.go +++ b/server/packetio.go @@ -48,6 +48,7 @@ const ( defaultWriterSize = 16 * 1024 ) +// packetIO is a helper to read and write data in packet format. type packetIO struct { rb *bufio.Reader wb *bufio.Writer diff --git a/server/server.go b/server/server.go index cae5d8ed62..83c13d38e9 100644 --- a/server/server.go +++ b/server/server.go @@ -99,19 +99,20 @@ func randomBuf(size int) []byte { return buf } -func (s *Server) newConn(conn net.Conn) (cc *clientConn, err error) { +// newConn creates a new *clientConn from a net.Conn. +// It allocates a connection ID and random salt data for authentication. +func (s *Server) newConn(conn net.Conn) *clientConn { log.Info("newConn", conn.RemoteAddr().String()) - cc = &clientConn{ + cc := &clientConn{ conn: conn, - pkg: newPacketIO(conn), + pkt: newPacketIO(conn), server: s, connectionID: atomic.AddUint32(&baseConnID, 1), collation: mysql.DefaultCollationID, - charset: mysql.DefaultCharset, alloc: arena.NewAllocator(32 * 1024), } cc.salt = randomBuf(20) - return + return cc } func (s *Server) skipAuth() bool { @@ -180,12 +181,9 @@ func (s *Server) Close() { } } +// onConn runs in its own goroutine, handles queries from this connection. func (s *Server) onConn(c net.Conn) { - conn, err := s.newConn(c) - if err != nil { - log.Errorf("newConn error %s", errors.ErrorStack(err)) - return - } + conn := s.newConn(c) if err := conn.handshake(); err != nil { log.Errorf("handshake error %s", errors.ErrorStack(err)) c.Close()