Execution of session commands failed because session commands and normal sql statements were executed in a wrong order if backend was executing previous session command while new sql stmt was routed to that backend. There was a window where ordering went wrong. It is possible that one normal sql stmt arrives while previous sescmds are still being executed. Introduced a new member in backend_ref_t structure, bref_pending_cmd where new sql stmt pointer is stored in that case. When sescmds are executed completely, that command is automatically executed next.
This commit is contained in:
@ -485,7 +485,6 @@ bool succp = false;
|
|||||||
/*<
|
/*<
|
||||||
* Close file descriptor and move to clean-up phase.
|
* Close file descriptor and move to clean-up phase.
|
||||||
*/
|
*/
|
||||||
ss_dassert(excluded != dcb);
|
|
||||||
rc = close(dcb->fd);
|
rc = close(dcb->fd);
|
||||||
|
|
||||||
if (rc < 0) {
|
if (rc < 0) {
|
||||||
|
@ -224,6 +224,7 @@ typedef struct backend_ref_st {
|
|||||||
bref_state_t bref_state;
|
bref_state_t bref_state;
|
||||||
int bref_num_result_wait;
|
int bref_num_result_wait;
|
||||||
sescmd_cursor_t bref_sescmd_cur;
|
sescmd_cursor_t bref_sescmd_cur;
|
||||||
|
GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */
|
||||||
#if defined(SS_DEBUG)
|
#if defined(SS_DEBUG)
|
||||||
skygw_chk_t bref_chk_tail;
|
skygw_chk_t bref_chk_tail;
|
||||||
#endif
|
#endif
|
||||||
|
@ -1642,6 +1642,8 @@ void protocol_archive_srv_command(
|
|||||||
server_command_t* h1;
|
server_command_t* h1;
|
||||||
int len = 0;
|
int len = 0;
|
||||||
|
|
||||||
|
CHK_PROTOCOL(p);
|
||||||
|
|
||||||
spinlock_acquire(&p->protocol_lock);
|
spinlock_acquire(&p->protocol_lock);
|
||||||
|
|
||||||
if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE)
|
if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE)
|
||||||
@ -1692,6 +1694,7 @@ void protocol_archive_srv_command(
|
|||||||
|
|
||||||
retblock:
|
retblock:
|
||||||
spinlock_release(&p->protocol_lock);
|
spinlock_release(&p->protocol_lock);
|
||||||
|
CHK_PROTOCOL(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1957,6 +1957,26 @@ static int routeQuery(
|
|||||||
|
|
||||||
if (succp) /*< Have DCB of the target backend */
|
if (succp) /*< Have DCB of the target backend */
|
||||||
{
|
{
|
||||||
|
backend_ref_t* bref;
|
||||||
|
sescmd_cursor_t* scur;
|
||||||
|
|
||||||
|
bref = get_bref_from_dcb(router_cli_ses, target_dcb);
|
||||||
|
scur = &bref->bref_sescmd_cur;
|
||||||
|
/**
|
||||||
|
* Store current stmt if execution of previous session command
|
||||||
|
* haven't completed yet. Note that according to MySQL protocol
|
||||||
|
* there can only be one such non-sescmd stmt at the time.
|
||||||
|
*/
|
||||||
|
if (sescmd_cursor_is_active(scur))
|
||||||
|
{
|
||||||
|
ss_dassert(bref->bref_pending_cmd == NULL);
|
||||||
|
bref->bref_pending_cmd = gwbuf_clone(querybuf);
|
||||||
|
|
||||||
|
rses_end_locked_router_action(router_cli_ses);
|
||||||
|
ret = 1;
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
|
||||||
if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1)
|
if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1)
|
||||||
{
|
{
|
||||||
backend_ref_t* bref;
|
backend_ref_t* bref;
|
||||||
@ -2295,7 +2315,34 @@ static void clientReply (
|
|||||||
|
|
||||||
ss_dassert(succp);
|
ss_dassert(succp);
|
||||||
}
|
}
|
||||||
/** Unlock router session */
|
else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
CHK_GWBUF(bref->bref_pending_cmd);
|
||||||
|
|
||||||
|
if ((ret = bref->bref_dcb->func.write(bref->bref_dcb,
|
||||||
|
gwbuf_clone(bref->bref_pending_cmd))) == 1)
|
||||||
|
{
|
||||||
|
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
||||||
|
atomic_add(&inst->stats.n_queries, 1);
|
||||||
|
/**
|
||||||
|
* Add one query response waiter to backend reference
|
||||||
|
*/
|
||||||
|
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
||||||
|
bref_set_state(bref, BREF_WAITING_RESULT);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error : Routing query \"%s\" failed.",
|
||||||
|
bref->bref_pending_cmd)));
|
||||||
|
}
|
||||||
|
gwbuf_free(bref->bref_pending_cmd);
|
||||||
|
bref->bref_pending_cmd = NULL;
|
||||||
|
}
|
||||||
|
/** Unlock router session */
|
||||||
rses_end_locked_router_action(router_cli_ses);
|
rses_end_locked_router_action(router_cli_ses);
|
||||||
|
|
||||||
lock_failed:
|
lock_failed:
|
||||||
@ -3656,6 +3703,14 @@ static bool route_session_write(
|
|||||||
{
|
{
|
||||||
succp = false;
|
succp = false;
|
||||||
}
|
}
|
||||||
|
else if (LOG_IS_ENABLED(LOGFILE_TRACE))
|
||||||
|
{
|
||||||
|
LOGIF(LT, (skygw_log_write(
|
||||||
|
LOGFILE_TRACE,
|
||||||
|
"Wrote to %s:%d",
|
||||||
|
backend_ref[i].bref_backend->backend_server->name,
|
||||||
|
backend_ref[i].bref_backend->backend_server->port)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rses_end_locked_router_action(router_cli_ses);
|
rses_end_locked_router_action(router_cli_ses);
|
||||||
|
Reference in New Issue
Block a user