Fixed bug in how response messages from backends were handled. Now messages are examined if session command cursor is active

. That means that (at least) the first packet  in the message is response to session command. Then if the response is alread
y forwarded to client it is discarded. Otherwise it is routed to client and the command is marked as responded so that the o
ther backend knows to discard its duplicate response.
This commit is contained in:
VilhoRaatikka
2014-03-17 22:26:32 +02:00
parent 90f701be8e
commit 67d9b3afb9

View File

@ -141,10 +141,10 @@ static mysql_sescmd_t* sescmd_cursor_get_command(
static bool sescmd_cursor_next( static bool sescmd_cursor_next(
sescmd_cursor_t* scur); sescmd_cursor_t* scur);
static bool sescmd_reply_to_client( static GWBUF* sescmd_cursor_process_replies(
DCB* client_dcb, DCB* client_dcb,
mysql_sescmd_t* scmd, GWBUF* replybuf,
GWBUF* writebuf); sescmd_cursor_t* scur);
static bool cont_exec_sescmd_in_backend( static bool cont_exec_sescmd_in_backend(
ROUTER_CLIENT_SES* rses, ROUTER_CLIENT_SES* rses,
@ -1020,37 +1020,34 @@ static void clientReply(
"reply_by_statement", "reply_by_statement",
backend_dcb, backend_dcb,
gwbuf_clone(writebuf))); gwbuf_clone(writebuf)));
/** Lock router session */ /** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses)) if (!rses_begin_locked_router_action(router_cli_ses))
{ {
/** Log to debug that router was closed */ /** Log to debug that router was closed */
goto lock_failed; goto lock_failed;
} }
scur = rses_get_sescmd_cursor(router_cli_ses, be_type); scur = rses_get_sescmd_cursor(router_cli_ses, be_type);
/** /**
* Active cursor means that reply is from session command * Active cursor means that reply is from session command
* execution. * execution. Majority of the time there are no session commands
* being executed.
*/ */
if (sescmd_cursor_is_active(scur)) if (sescmd_cursor_is_active(scur))
{ {
mysql_sescmd_t* scmd = sescmd_cursor_get_command(scur); writebuf = sescmd_cursor_process_replies(client_dcb,
sescmd_reply_to_client(client_dcb, scmd, writebuf); writebuf,
/** When sescmd list is empty set cursor passive. */ scur);
if (!sescmd_cursor_next(scur))
{
sescmd_cursor_set_active(scur, false);
} }
/** Unlock router session */ /** Unlock router session */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
}
else if (client_dcb != NULL) if (writebuf != NULL && client_dcb != NULL)
{ {
/** Write reply to client DCB */ /** Write reply to client DCB */
client_dcb->func.write(client_dcb, writebuf); client_dcb->func.write(client_dcb, writebuf);
ss_dassert(!sescmd_cursor_is_active(scur));
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
LOGIF(LT, (skygw_log_write_flush( LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [clientReply:rwsplit] client dcb %p, " "%lu [clientReply:rwsplit] client dcb %p, "
@ -1059,6 +1056,7 @@ static void clientReply(
client_dcb, client_dcb,
backend_dcb))); backend_dcb)));
} }
lock_failed: lock_failed:
return; return;
} }
@ -1307,7 +1305,10 @@ static void rses_property_add(
} }
} }
/** Router session must be locked */ /**
* Router session must be locked.
* Return session command pointer if succeed, NULL if failed.
*/
static mysql_sescmd_t* rses_property_get_sescmd( static mysql_sescmd_t* rses_property_get_sescmd(
rses_property_t* prop) rses_property_t* prop)
{ {
@ -1377,66 +1378,89 @@ static void mysql_sescmd_done(
memset(sescmd, 0, sizeof(mysql_sescmd_t)); memset(sescmd, 0, sizeof(mysql_sescmd_t));
} }
/** /**
* Write session command reply from backend to client if command haven't yet * All cases where backend message starts at least with one response to session
* been replied. * command are handled here.
* Return true if succeed, false if command was already replied. * Read session commands from property list. If command is already replied,
* * discard packet. Else send reply to client. In both cases move cursor forward
* Router session must be locked */ * until all session command replies are handled.
static bool sescmd_reply_to_client( */
static GWBUF* sescmd_cursor_process_replies(
DCB* client_dcb, DCB* client_dcb,
mysql_sescmd_t* scmd, GWBUF* replybuf,
GWBUF* writebuf) sescmd_cursor_t* scur)
{ {
bool succp = false; const size_t headerlen = 4; /*< mysql packet header */
uint8_t* packet;
size_t packetlen;
mysql_sescmd_t* scmd;
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
scmd = sescmd_cursor_get_command(scur);
CHK_DCB(client_dcb); CHK_DCB(client_dcb);
CHK_MYSQL_SESCMD(scmd); CHK_GWBUF(replybuf);
CHK_GWBUF(writebuf);
ss_dassert(SPINLOCK_IS_LOCKED(
&scmd->my_sescmd_prop->rses_prop_rsession->rses_lock));
/** /**
* This depends on MySQL protoocl and doesn't work with others. * Walk through packets in the message and the list of session
* TODO: write a function to MySQL protocol module and add *commands.
* a new protocol function 'discard n first messages'.
*/ */
while (scmd != NULL && replybuf != NULL)
{
if (scmd->my_sescmd_is_replied) if (scmd->my_sescmd_is_replied)
{ {
size_t len;
uint8_t* packet;
/** /**
* Skip reply message because it is duplicate of alredy * Discard heading packets if their related command is
* replied message. * already replied.
*/ */
packet = (uint8_t*)writebuf->start; CHK_GWBUF(replybuf);
len = packet[0]; packet = (uint8_t *)GWBUF_DATA(replybuf);
len += packet[1]*256; packetlen = packet[0]+packet[1]*256+packet[2]*256*256;
len += packet[2]*256*256; replybuf = gwbuf_consume(replybuf, packetlen+headerlen);
writebuf = gwbuf_consume(writebuf, len+4);
if (writebuf == NULL || GWBUF_EMPTY(writebuf))
{
succp = true;
goto return_succp;
}
}
client_dcb->func.write(client_dcb, writebuf);
scmd->my_sescmd_is_replied = true;
succp = true;
LOGIF(LT, (skygw_log_write_flush( LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [sescmd_reply_to_client] Replied cmd %p to client dcb %p.", "%lu [sescmd_cursor_process_replies] cmd %p "
"is already replied. Discarded %d bytes from "
"the %s replybuffer.",
pthread_self(), pthread_self(),
scmd, scmd,
client_dcb))); packetlen+headerlen,
STRBETYPE(scur->scmd_cur_be_type))));
}
else
{
/** Mark the rest session commands as replied */
scmd->my_sescmd_is_replied = true;
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [sescmd_cursor_process_replies] Marked "
"cmd %p to as replied. Left message to %s's "
"buffer for reply.",
pthread_self(),
scmd,
STRBETYPE(scur->scmd_cur_be_type))));
}
return_succp: if (sescmd_cursor_next(scur))
return succp; {
scmd = sescmd_cursor_get_command(scur);
}
else
{
scmd = NULL;
/** All session commands are replied */
scur->scmd_cur_active = false;
}
}
ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL);
return replybuf;
} }
/** /**
* Get the address of current session command. * Get the address of current session command.
* *
@ -1446,8 +1470,7 @@ static mysql_sescmd_t* sescmd_cursor_get_command(
{ {
mysql_sescmd_t* scmd; mysql_sescmd_t* scmd;
ss_dassert(SPINLOCK_IS_LOCKED(&scur->scmd_cur_rses->rses_lock)); ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property); scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property);
CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); CHK_MYSQL_SESCMD(scur->scmd_cur_cmd);
@ -1542,6 +1565,7 @@ static bool execute_sescmd_in_backend(
succp = false; succp = false;
goto return_succp; goto return_succp;
} }
if (!sescmd_cursor_is_active(scur)) if (!sescmd_cursor_is_active(scur))
{ {
/** Cursor is left active when function returns. */ /** Cursor is left active when function returns. */
@ -1551,6 +1575,7 @@ static bool execute_sescmd_in_backend(
"execute_sescmd_in_backend", "execute_sescmd_in_backend",
dcb, dcb,
sescmd_cursor_clone_querybuf(scur))); sescmd_cursor_clone_querybuf(scur)));
switch (scur->scmd_cur_cmd->my_sescmd_packet_type) { switch (scur->scmd_cur_cmd->my_sescmd_packet_type) {
case COM_CHANGE_USER: case COM_CHANGE_USER:
rc = dcb->func.auth( rc = dcb->func.auth(
@ -1584,57 +1609,10 @@ return_succp:
return succp; return succp;
} }
/**
* Execute session commands when cursor is already active.
*
* Router session must be locked
*
* Return true if there was pending sescmd and sending command to
* backend server succeed. Otherwise false.
*/
static bool cont_exec_sescmd_in_backend(
ROUTER_CLIENT_SES* rses,
backend_type_t be_type)
{
DCB* dcb;
bool succp = true;
int rc = 0;
sescmd_cursor_t* scur;
dcb = rses->rses_dcb[be_type];
CHK_DCB(dcb);
CHK_CLIENT_RSES(rses);
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
scur = rses_get_sescmd_cursor(rses, be_type);
ss_dassert(sescmd_cursor_is_active(scur));
/** Return if there are no pending ses commands */
if (scur->scmd_cur_cmd == NULL)
{
succp = false;
goto return_succp;
}
LOGIF(LT, tracelog_routed_query(rses,
"cont_exec_sescmd_in_backend",
dcb,
sescmd_cursor_clone_querybuf(scur)));
rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur));
if (rc != 1)
{
succp = false;
}
return_succp:
return succp;
}
/** /**
* Moves cursor to next property and copied address of its sescmd to cursor. * Moves cursor to next property and copied address of its sescmd to cursor.
* Current propery must be non-null. * Current propery must be non-null.
* If current property is the last on the list, *scur->scmd_ptr_property == NULL
* *
* Router session must be locked * Router session must be locked
*/ */
@ -1695,7 +1673,7 @@ static bool sescmd_cursor_next(
} }
else else
{ {
/** Log error, sescmd shouldn't be NULL */ ss_dassert(false); /*< Log error, sescmd shouldn't be NULL */
} }
return_succp: return_succp:
return succp; return succp;