Set current command when executing queued queries
When readwritesplit is routing any queued queries, the currently executed command of the protocol modules needs to be adjusted by readwritesplit. This is not a true fix but more of a workaround to fix the problems of queued query execution. The correct solution would be to move the queued query handling into the client protocol module so that all components see the same state.
This commit is contained in:
@ -402,39 +402,59 @@ static inline bool MYSQL_IS_CHANGE_USER(const uint8_t* header)
|
||||
/* The following can be compared using memcmp to detect a null password */
|
||||
extern uint8_t null_client_sha1[MYSQL_SCRAMBLE_LEN];
|
||||
|
||||
/**
|
||||
* Allocate a new MySQL_session
|
||||
*
|
||||
* @return New MySQL_session or NULL if memory allocation failed
|
||||
*/
|
||||
MYSQL_session* mysql_session_alloc();
|
||||
|
||||
/**
|
||||
* Create MySQL protocol structure
|
||||
*
|
||||
* @param dcb Owning DCB
|
||||
* @param fd File descriptor of the DCB
|
||||
*
|
||||
* @return New protocol or NULL on error
|
||||
*/
|
||||
MySQLProtocol* mysql_protocol_init(DCB* dcb, int fd);
|
||||
bool mysql_protocol_done (DCB* dcb);
|
||||
|
||||
/**
|
||||
* Free protocol object
|
||||
*
|
||||
* @param dcb Owner DCB
|
||||
*
|
||||
* @return True if protocol was closed
|
||||
*/
|
||||
bool mysql_protocol_done(DCB* dcb);
|
||||
|
||||
/**
|
||||
* Return a string representation of a MySQL protocol state.
|
||||
*
|
||||
* @param state The protocol state
|
||||
*
|
||||
* @return String representation of the state
|
||||
*/
|
||||
const char *gw_mysql_protocol_state2string(int state);
|
||||
int mysql_send_com_quit(DCB* dcb, int packet_number, GWBUF* buf);
|
||||
GWBUF* mysql_create_com_quit(GWBUF* bufparam, int packet_number);
|
||||
|
||||
int mysql_send_custom_error (
|
||||
DCB *dcb,
|
||||
int packet_number,
|
||||
int in_affected_rows,
|
||||
const char* mysql_message);
|
||||
/**
|
||||
* Set current command being executed
|
||||
*
|
||||
* @param dcb The DCB whose protocol is modified
|
||||
* @param cmd The command being executed
|
||||
*
|
||||
* @note This function should not be used in normal operation
|
||||
*/
|
||||
void mysql_protocol_set_current_command(DCB* dcb, mxs_mysql_cmd_t cmd);
|
||||
|
||||
GWBUF* mysql_create_custom_error(
|
||||
int packet_number,
|
||||
int affected_rows,
|
||||
const char* msg);
|
||||
GWBUF* mysql_create_com_quit(GWBUF* bufparam, int sequence);
|
||||
GWBUF* mysql_create_custom_error(int sequence, int affected_rows, const char* msg);
|
||||
GWBUF *mysql_create_standard_error(int sequence, int error_number, const char *msg);
|
||||
|
||||
GWBUF *mysql_create_standard_error(int packet_number,
|
||||
int error_number,
|
||||
const char *error_message);
|
||||
|
||||
int mysql_send_standard_error(DCB *dcb,
|
||||
int packet_number,
|
||||
int error_number,
|
||||
const char *error_message);
|
||||
|
||||
int mysql_send_auth_error (
|
||||
DCB *dcb,
|
||||
int packet_number,
|
||||
int in_affected_rows,
|
||||
const char* mysql_message);
|
||||
int mysql_send_com_quit(DCB* dcb, int sequence, GWBUF* buf);
|
||||
int mysql_send_custom_error(DCB *dcb, int sequence, int affected_rows, const char* msg);
|
||||
int mysql_send_standard_error(DCB *dcb, int sequence, int errnum, const char *msg);
|
||||
int mysql_send_auth_error(DCB *dcb, int sequence, int affected_rows, const char* msg);
|
||||
|
||||
GWBUF* gw_MySQL_get_next_packet(GWBUF** p_readbuf);
|
||||
GWBUF* gw_MySQL_get_packets(GWBUF** p_readbuf, int* npackets);
|
||||
@ -443,8 +463,8 @@ void protocol_remove_srv_command(MySQLProtocol* p);
|
||||
bool protocol_waits_response(MySQLProtocol* p);
|
||||
mxs_mysql_cmd_t protocol_get_srv_command(MySQLProtocol* p, bool removep);
|
||||
int get_stmt_nresponse_packets(GWBUF* buf, mxs_mysql_cmd_t cmd);
|
||||
bool protocol_get_response_status (MySQLProtocol* p, int* npackets, size_t* nbytes);
|
||||
void protocol_set_response_status (MySQLProtocol* p, int npackets, size_t nbytes);
|
||||
bool protocol_get_response_status(MySQLProtocol* p, int* npackets, size_t* nbytes);
|
||||
void protocol_set_response_status(MySQLProtocol* p, int npackets, size_t nbytes);
|
||||
void protocol_archive_srv_command(MySQLProtocol* p);
|
||||
|
||||
char* create_auth_fail_str(char *username, char *hostaddr, bool password, char *db, int);
|
||||
|
@ -29,10 +29,6 @@ uint8_t null_client_sha1[MYSQL_SCRAMBLE_LEN] = "";
|
||||
|
||||
static server_command_t* server_command_init(server_command_t* srvcmd, mxs_mysql_cmd_t cmd);
|
||||
|
||||
/**
|
||||
* @brief Allocate a new MySQL_session
|
||||
* @return New MySQL_session or NULL if memory allocation failed
|
||||
*/
|
||||
MYSQL_session* mysql_session_alloc()
|
||||
{
|
||||
MYSQL_session *ses = MXS_CALLOC(1, sizeof(MYSQL_session));
|
||||
@ -48,19 +44,6 @@ MYSQL_session* mysql_session_alloc()
|
||||
return ses;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates MySQL protocol structure
|
||||
*
|
||||
* @param dcb * Must be non-NULL.
|
||||
* @param fd
|
||||
*
|
||||
* @return
|
||||
*
|
||||
*
|
||||
* @details Protocol structure does not have fd because dcb is not
|
||||
* connected yet.
|
||||
*
|
||||
*/
|
||||
MySQLProtocol* mysql_protocol_init(DCB* dcb, int fd)
|
||||
{
|
||||
MySQLProtocol* p;
|
||||
@ -94,13 +77,6 @@ return_p:
|
||||
return p;
|
||||
}
|
||||
|
||||
/**
|
||||
* Free protocol object
|
||||
*
|
||||
* @param dcb Owner DCB
|
||||
*
|
||||
* @return True if protocol was closed
|
||||
*/
|
||||
bool mysql_protocol_done(DCB* dcb)
|
||||
{
|
||||
bool rval = false;
|
||||
@ -126,13 +102,6 @@ bool mysql_protocol_done(DCB* dcb)
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a string representation of a MySQL protocol state.
|
||||
*
|
||||
* @param state The protocol state
|
||||
* @return String representation of the state
|
||||
*
|
||||
*/
|
||||
const char* gw_mysql_protocol_state2string (int state)
|
||||
{
|
||||
switch (state)
|
||||
@ -156,6 +125,12 @@ const char* gw_mysql_protocol_state2string (int state)
|
||||
}
|
||||
}
|
||||
|
||||
void mysql_protocol_set_current_command(DCB* dcb, mxs_mysql_cmd_t cmd)
|
||||
{
|
||||
MySQLProtocol* proto = (MySQLProtocol*)dcb->protocol;
|
||||
proto->current_command = cmd;
|
||||
}
|
||||
|
||||
GWBUF* mysql_create_com_quit(GWBUF* bufparam,
|
||||
int packet_number)
|
||||
{
|
||||
|
@ -466,6 +466,11 @@ static bool route_stored_query(RWSplitSession *rses)
|
||||
GWBUF *temp_storage = rses->query_queue;
|
||||
rses->query_queue = NULL;
|
||||
|
||||
// TODO: Move the handling of queued queries to the client protocol
|
||||
// TODO: module where the command tracking is done automatically.
|
||||
uint8_t cmd = mxs_mysql_get_command(query_queue);
|
||||
mysql_protocol_set_current_command(rses->client_dcb, (mxs_mysql_cmd_t)cmd);
|
||||
|
||||
if (!routeQuery((MXS_ROUTER*)rses->router, (MXS_ROUTER_SESSION*)rses, query_queue))
|
||||
{
|
||||
rval = false;
|
||||
@ -917,6 +922,7 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
|
||||
rses->query_queue = gwbuf_append(rses->query_queue, querybuf);
|
||||
querybuf = NULL;
|
||||
rval = 1;
|
||||
ss_dassert(rses->expected_responses > 0);
|
||||
|
||||
if (rses->expected_responses == 0 && !route_stored_query(rses))
|
||||
{
|
||||
|
Reference in New Issue
Block a user