Reset persistent MySQL connections
When a persistent connection is taken from the pool, the state is reset with a COM_CHANGE_USER on the next write. This allows reuse of persistent connections without having to worry about the state of the MySQL session.
This commit is contained in:
@ -276,6 +276,7 @@ typedef struct dcb
|
|||||||
bool ssl_write_want_read; /*< Flag */
|
bool ssl_write_want_read; /*< Flag */
|
||||||
bool ssl_write_want_write; /*< Flag */
|
bool ssl_write_want_write; /*< Flag */
|
||||||
int dcb_port; /**< port of target server */
|
int dcb_port; /**< port of target server */
|
||||||
|
bool was_persistent; /**< Whether this DCB was in the persistent pool */
|
||||||
skygw_chk_t dcb_chk_tail;
|
skygw_chk_t dcb_chk_tail;
|
||||||
} DCB;
|
} DCB;
|
||||||
|
|
||||||
|
|||||||
@ -265,23 +265,20 @@ typedef struct
|
|||||||
skygw_chk_t protocol_chk_top;
|
skygw_chk_t protocol_chk_top;
|
||||||
#endif
|
#endif
|
||||||
int fd; /*< The socket descriptor */
|
int fd; /*< The socket descriptor */
|
||||||
struct dcb *owner_dcb; /*< The DCB of the socket
|
struct dcb* owner_dcb; /*< The DCB of the socket we are running on */
|
||||||
* we are running on */
|
SPINLOCK protocol_lock; /*< Protocol lock */
|
||||||
SPINLOCK protocol_lock;
|
mysql_server_cmd_t current_command; /*< Current command being executed */
|
||||||
mysql_server_cmd_t current_command; /**< Current command being executed */
|
|
||||||
server_command_t protocol_command; /*< session command list */
|
server_command_t protocol_command; /*< session command list */
|
||||||
server_command_t* protocol_cmd_history; /*< session command history */
|
server_command_t* protocol_cmd_history; /*< session command history */
|
||||||
mxs_auth_state_t protocol_auth_state; /*< Authentication status */
|
mxs_auth_state_t protocol_auth_state; /*< Authentication status */
|
||||||
mysql_protocol_state_t protocol_state; /*< Protocol struct status */
|
mysql_protocol_state_t protocol_state; /*< Protocol struct status */
|
||||||
uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /*< server scramble,
|
uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /*< server scramble, created or received */
|
||||||
* created or received */
|
uint32_t server_capabilities; /*< server capabilities, created or received */
|
||||||
uint32_t server_capabilities; /*< server capabilities,
|
uint32_t client_capabilities; /*< client capabilities, created or received */
|
||||||
* created or received */
|
unsigned long tid; /*< MySQL Thread ID, in handshake */
|
||||||
uint32_t client_capabilities; /*< client capabilities,
|
|
||||||
* created or received */
|
|
||||||
unsigned long tid; /*< MySQL Thread ID, in
|
|
||||||
* handshake */
|
|
||||||
unsigned int charset; /*< MySQL character set at connect time */
|
unsigned int charset; /*< MySQL character set at connect time */
|
||||||
|
bool ignore_reply; /*< If the reply should be discarded */
|
||||||
|
GWBUF* stored_query; /*< Temporarily stored queries */
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
skygw_chk_t protocol_chk_tail;
|
skygw_chk_t protocol_chk_tail;
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -772,6 +772,7 @@ dcb_connect(SERVER *server, SESSION *session, const char *protocol)
|
|||||||
MXS_DEBUG("%lu [dcb_connect] Reusing a persistent connection, dcb %p\n",
|
MXS_DEBUG("%lu [dcb_connect] Reusing a persistent connection, dcb %p\n",
|
||||||
pthread_self(), dcb);
|
pthread_self(), dcb);
|
||||||
dcb->persistentstart = 0;
|
dcb->persistentstart = 0;
|
||||||
|
dcb->was_persistent = true;
|
||||||
return dcb;
|
return dcb;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -867,6 +868,8 @@ dcb_connect(SERVER *server, SESSION *session, const char *protocol)
|
|||||||
dcb->dcb_server_status = server->status;
|
dcb->dcb_server_status = server->status;
|
||||||
dcb->dcb_port = server->port;
|
dcb->dcb_port = server->port;
|
||||||
|
|
||||||
|
dcb->was_persistent = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* backend_dcb is connected to backend server, and once backend_dcb
|
* backend_dcb is connected to backend server, and once backend_dcb
|
||||||
* is added to poll set, authentication takes place as part of
|
* is added to poll set, authentication takes place as part of
|
||||||
|
|||||||
@ -759,6 +759,41 @@ gw_read_and_write(DCB *dcb)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MySQLProtocol *proto = (MySQLProtocol *)dcb->protocol;
|
||||||
|
|
||||||
|
spinlock_acquire(&dcb->authlock);
|
||||||
|
|
||||||
|
if (proto->ignore_reply)
|
||||||
|
{
|
||||||
|
|
||||||
|
/** The reply to a COM_CHANGE_USER is in packet */
|
||||||
|
GWBUF *query = proto->stored_query;
|
||||||
|
uint8_t result = *((uint8_t*)GWBUF_DATA(read_buffer) + 4);
|
||||||
|
proto->stored_query = NULL;
|
||||||
|
proto->ignore_reply = false;
|
||||||
|
gwbuf_free(read_buffer);
|
||||||
|
|
||||||
|
spinlock_release(&dcb->authlock);
|
||||||
|
|
||||||
|
int rval = 0;
|
||||||
|
|
||||||
|
if (result == MYSQL_REPLY_OK)
|
||||||
|
{
|
||||||
|
rval = query ? dcb->func.write(dcb, query) : 1;
|
||||||
|
}
|
||||||
|
else if (query)
|
||||||
|
{
|
||||||
|
/** The COM_CHANGE USER failed, generate a fake hangup event to
|
||||||
|
* close the DCB and send an error to the client. */
|
||||||
|
gwbuf_free(query);
|
||||||
|
poll_fake_hangup_event(dcb);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
spinlock_release(&dcb->authlock);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If protocol has session command set, concatenate whole
|
* If protocol has session command set, concatenate whole
|
||||||
* response into one buffer.
|
* response into one buffer.
|
||||||
@ -911,6 +946,48 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
|
|||||||
|
|
||||||
CHK_DCB(dcb);
|
CHK_DCB(dcb);
|
||||||
spinlock_acquire(&dcb->authlock);
|
spinlock_acquire(&dcb->authlock);
|
||||||
|
|
||||||
|
if (dcb->was_persistent)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* This is a DCB that was just taken out of the persistent connection pool.
|
||||||
|
* We need to sent a COM_CHANGE_USER query to the backend to reset the
|
||||||
|
* session state.
|
||||||
|
*/
|
||||||
|
if (backend_protocol->stored_query)
|
||||||
|
{
|
||||||
|
/** It is possible that the client DCB is closed before the COM_CHANGE_USER
|
||||||
|
* response is received. */
|
||||||
|
gwbuf_free(backend_protocol->stored_query);
|
||||||
|
}
|
||||||
|
dcb->was_persistent = false;
|
||||||
|
backend_protocol->ignore_reply = true;
|
||||||
|
backend_protocol->stored_query = queue;
|
||||||
|
|
||||||
|
spinlock_release(&dcb->authlock);
|
||||||
|
|
||||||
|
GWBUF *buf = gw_create_change_user_packet(dcb->session->client_dcb->data, dcb->protocol);
|
||||||
|
return dcb_write(dcb, buf) ? 1 : 0;
|
||||||
|
}
|
||||||
|
else if (backend_protocol->ignore_reply)
|
||||||
|
{
|
||||||
|
if (MYSQL_IS_COM_QUIT((uint8_t*)GWBUF_DATA(queue)))
|
||||||
|
{
|
||||||
|
gwbuf_free(queue);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* We're still waiting on the reply to the COM_CHANGE_USER, append the
|
||||||
|
* buffer to the stored query. This is possible if the client sends
|
||||||
|
* BLOB data on the first command.
|
||||||
|
*/
|
||||||
|
backend_protocol->stored_query = gwbuf_append(backend_protocol->stored_query, queue);
|
||||||
|
}
|
||||||
|
spinlock_release(&dcb->authlock);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pick action according to state of protocol.
|
* Pick action according to state of protocol.
|
||||||
* If auth failed, return value is 0, write and buffered write
|
* If auth failed, return value is 0, write and buffered write
|
||||||
|
|||||||
@ -102,6 +102,7 @@ MySQLProtocol* mysql_protocol_init(DCB* dcb, int fd)
|
|||||||
p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED;
|
p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED;
|
||||||
p->protocol_command.scom_nresponse_packets = 0;
|
p->protocol_command.scom_nresponse_packets = 0;
|
||||||
p->protocol_command.scom_nbytes_to_read = 0;
|
p->protocol_command.scom_nbytes_to_read = 0;
|
||||||
|
p->stored_query = NULL;
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
p->protocol_chk_top = CHK_NUM_PROTOCOL;
|
p->protocol_chk_top = CHK_NUM_PROTOCOL;
|
||||||
p->protocol_chk_tail = CHK_NUM_PROTOCOL;
|
p->protocol_chk_tail = CHK_NUM_PROTOCOL;
|
||||||
@ -145,6 +146,9 @@ void mysql_protocol_done(DCB* dcb)
|
|||||||
MXS_FREE(scmd);
|
MXS_FREE(scmd);
|
||||||
scmd = scmd2;
|
scmd = scmd2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
gwbuf_free(p->stored_query);
|
||||||
|
|
||||||
p->protocol_state = MYSQL_PROTOCOL_DONE;
|
p->protocol_state = MYSQL_PROTOCOL_DONE;
|
||||||
|
|
||||||
retblock:
|
retblock:
|
||||||
|
|||||||
Reference in New Issue
Block a user