Take SessionCommand into use in readwritesplit
Readwritesplit now uses the SessionCommand class as a "master list" of executed session commands. This allows the session commands to be easily copied over to slaves that are taken into use after session commands have already been executed. Currently, the code doesn't execute the session command history when a mid-session reconnection occurs. A method to cleanly copy the session commands needs to be exposed by the Backend class.
This commit is contained in:
@ -118,11 +118,11 @@ is_packet_a_query(int packet_type)
|
||||
* @param packet_type Type of packet (integer)
|
||||
* @return bool indicating whether packet contains a one way message
|
||||
*/
|
||||
bool
|
||||
is_packet_a_one_way_message(int packet_type)
|
||||
bool command_will_respond(uint8_t packet_type)
|
||||
{
|
||||
return (packet_type == MYSQL_COM_STMT_SEND_LONG_DATA ||
|
||||
packet_type == MYSQL_COM_QUIT || packet_type == MYSQL_COM_STMT_CLOSE);
|
||||
return packet_type != MYSQL_COM_STMT_SEND_LONG_DATA &&
|
||||
packet_type != MYSQL_COM_QUIT &&
|
||||
packet_type != MYSQL_COM_STMT_CLOSE;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -234,7 +234,7 @@ bool handle_target_is_all(route_target_t route_target, ROUTER_INSTANCE *inst,
|
||||
MXS_FREE(query_str);
|
||||
MXS_FREE(qtype_str);
|
||||
}
|
||||
else if (route_session_write(rses, gwbuf_clone(querybuf), inst, packet_type, qtype))
|
||||
else if (route_session_write(rses, gwbuf_clone(querybuf), packet_type))
|
||||
{
|
||||
|
||||
result = true;
|
||||
@ -281,194 +281,24 @@ void closed_session_reply(GWBUF *querybuf)
|
||||
* @param scur Session cursor
|
||||
* @param bref Router session data for a backend server
|
||||
*/
|
||||
void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend_ref_t *bref)
|
||||
void check_session_command_reply(GWBUF *writebuf, SRWBackend bref)
|
||||
{
|
||||
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_ERR) &&
|
||||
MYSQL_IS_ERROR_PACKET(((uint8_t *)GWBUF_DATA(writebuf))))
|
||||
if (MYSQL_IS_ERROR_PACKET(((uint8_t *)GWBUF_DATA(writebuf))))
|
||||
{
|
||||
uint8_t *buf = (uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf));
|
||||
uint8_t *replybuf = (uint8_t *)GWBUF_DATA(writebuf);
|
||||
size_t len = MYSQL_GET_PAYLOAD_LEN(buf);
|
||||
size_t replylen = MYSQL_GET_PAYLOAD_LEN(replybuf);
|
||||
char *err = strndup(&((char *)replybuf)[8], 5);
|
||||
char *replystr = strndup(&((char *)replybuf)[13], replylen - 4 - 5);
|
||||
|
||||
ss_dassert(len + 4 == GWBUF_LENGTH(scur->scmd_cur_cmd->my_sescmd_buf));
|
||||
size_t replylen = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(writebuf));
|
||||
char replybuf[replylen];
|
||||
gwbuf_copy_data(writebuf, 0, gwbuf_length(writebuf), (uint8_t*)replybuf);
|
||||
std::string err;
|
||||
std::string msg;
|
||||
err.append(replybuf + 8, 5);
|
||||
msg.append(replybuf + 13, replylen - 4 - 5);
|
||||
|
||||
MXS_ERROR("Failed to execute session command in [%s]:%d. Error was: %s %s",
|
||||
bref->ref->server->name,
|
||||
bref->ref->server->port, err, replystr);
|
||||
MXS_FREE(err);
|
||||
MXS_FREE(replystr);
|
||||
bref->server()->name, bref->server()->port,
|
||||
err.c_str(), msg.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief If session command cursor is passive, sends the command to backend for
|
||||
* execution.
|
||||
*
|
||||
* Returns true if command was sent or added successfully to the queue.
|
||||
* Returns false if command sending failed or if there are no pending session
|
||||
* commands.
|
||||
*
|
||||
* Router session must be locked.
|
||||
*
|
||||
* @param backend_ref Router session backend database data
|
||||
* @return bool - true for success, false for failure
|
||||
*/
|
||||
/*
|
||||
* Uses MySQL specific values in the large switch statement, although it
|
||||
* may be possible to generalize them.
|
||||
*/
|
||||
bool execute_sescmd_in_backend(backend_ref_t *backend_ref)
|
||||
{
|
||||
ss_dassert(backend_ref);
|
||||
CHK_BACKEND_REF(backend_ref);
|
||||
bool succp = false;
|
||||
|
||||
if (BREF_IS_CLOSED(backend_ref))
|
||||
{
|
||||
return succp;
|
||||
}
|
||||
|
||||
DCB *dcb = backend_ref->bref_dcb;
|
||||
CHK_DCB(dcb);
|
||||
|
||||
/**
|
||||
* Get cursor pointer and copy of command buffer to cursor.
|
||||
*/
|
||||
sescmd_cursor_t *scur = &backend_ref->bref_sescmd_cur;
|
||||
|
||||
/** Return if there are no pending ses commands */
|
||||
if (sescmd_cursor_get_command(scur) == NULL)
|
||||
{
|
||||
succp = true;
|
||||
MXS_INFO("Cursor had no pending session commands.");
|
||||
return succp;
|
||||
}
|
||||
|
||||
if (!sescmd_cursor_is_active(scur))
|
||||
{
|
||||
/** Cursor is left active when function returns. */
|
||||
sescmd_cursor_set_active(scur, true);
|
||||
}
|
||||
|
||||
int rc = 0;
|
||||
GWBUF *buf;
|
||||
|
||||
switch (scur->scmd_cur_cmd->my_sescmd_packet_type)
|
||||
{
|
||||
case MYSQL_COM_CHANGE_USER:
|
||||
/** This makes it possible to handle replies correctly */
|
||||
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
|
||||
buf = sescmd_cursor_clone_querybuf(scur);
|
||||
rc = dcb->func.auth(dcb, NULL, dcb->session, buf);
|
||||
break;
|
||||
|
||||
case MYSQL_COM_INIT_DB:
|
||||
{
|
||||
/**
|
||||
* Record database name and store to session.
|
||||
*
|
||||
* TODO: Do this in the client protocol module
|
||||
*/
|
||||
GWBUF *tmpbuf;
|
||||
MYSQL_session* data;
|
||||
unsigned int qlen;
|
||||
|
||||
mxs_mysql_set_current_db(dcb->session, "");
|
||||
tmpbuf = scur->scmd_cur_cmd->my_sescmd_buf;
|
||||
qlen = MYSQL_GET_PAYLOAD_LEN((unsigned char *) GWBUF_DATA(tmpbuf));
|
||||
if (qlen)
|
||||
{
|
||||
--qlen; // The COM_INIT_DB byte
|
||||
if (qlen > MYSQL_DATABASE_MAXLEN)
|
||||
{
|
||||
MXS_ERROR("Too long a database name received in COM_INIT_DB, "
|
||||
"trailing data will be cut.");
|
||||
qlen = MYSQL_DATABASE_MAXLEN;
|
||||
}
|
||||
|
||||
char db[qlen + 1];
|
||||
memcpy(db, (char*)GWBUF_DATA(tmpbuf) + 5, qlen);
|
||||
db[qlen] = 0;
|
||||
mxs_mysql_set_current_db(dcb->session, db);
|
||||
}
|
||||
}
|
||||
/** Fallthrough */
|
||||
case MYSQL_COM_QUERY:
|
||||
default:
|
||||
/**
|
||||
* Mark session command buffer, it triggers writing
|
||||
* MySQL command to protocol
|
||||
*/
|
||||
|
||||
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
|
||||
buf = sescmd_cursor_clone_querybuf(scur);
|
||||
rc = dcb->func.write(dcb, buf);
|
||||
break;
|
||||
}
|
||||
|
||||
if (rc == 1)
|
||||
{
|
||||
succp = true;
|
||||
ss_dassert(backend_ref->reply_state == REPLY_STATE_DONE);
|
||||
LOG_RS(backend_ref, REPLY_STATE_START);
|
||||
backend_ref->reply_state = REPLY_STATE_START;
|
||||
}
|
||||
|
||||
return succp;
|
||||
}
|
||||
|
||||
/*
|
||||
* End of functions called from other router modules; start of functions that
|
||||
* are internal to this module
|
||||
*/
|
||||
|
||||
/**
|
||||
* Get client DCB pointer of the router client session.
|
||||
* This routine must be protected by Router client session lock.
|
||||
*
|
||||
* APPEARS TO NEVER BE USED!!
|
||||
*
|
||||
* @param rses Router client session pointer
|
||||
*
|
||||
* @return Pointer to client DCB
|
||||
*/
|
||||
static DCB *rses_get_client_dcb(ROUTER_CLIENT_SES *rses)
|
||||
{
|
||||
DCB *dcb = NULL;
|
||||
int i;
|
||||
|
||||
for (i = 0; i < rses->rses_nbackends; i++)
|
||||
{
|
||||
if ((dcb = rses->rses_backend_ref[i].bref_dcb) != NULL &&
|
||||
BREF_IS_IN_USE(&rses->rses_backend_ref[i]) && dcb->session != NULL &&
|
||||
dcb->session->client_dcb != NULL)
|
||||
{
|
||||
return dcb->session->client_dcb;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* The following are internal (directly or indirectly) to routing a statement
|
||||
* and should be moved to rwsplit_route_cmd.c if the MySQL specific code can
|
||||
* be removed.
|
||||
*/
|
||||
|
||||
sescmd_cursor_t *backend_ref_get_sescmd_cursor(backend_ref_t *bref)
|
||||
{
|
||||
sescmd_cursor_t *scur;
|
||||
CHK_BACKEND_REF(bref);
|
||||
|
||||
scur = &bref->bref_sescmd_cur;
|
||||
CHK_SESCMD_CUR(scur);
|
||||
|
||||
return scur;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an error message to the client telling that the server is in read only mode
|
||||
* @param dcb Client DCB
|
||||
|
Reference in New Issue
Block a user