This commit is contained in:
Mark Riddoch 2014-09-30 13:27:03 +01:00
commit bce0716861
3 changed files with 60 additions and 1 deletions

View File

@ -224,6 +224,7 @@ typedef struct backend_ref_st {
bref_state_t bref_state;
int bref_num_result_wait;
sescmd_cursor_t bref_sescmd_cur;
GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */
#if defined(SS_DEBUG)
skygw_chk_t bref_chk_tail;
#endif

View File

@ -1642,6 +1642,8 @@ void protocol_archive_srv_command(
server_command_t* h1;
int len = 0;
CHK_PROTOCOL(p);
spinlock_acquire(&p->protocol_lock);
if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE)
@ -1692,6 +1694,7 @@ void protocol_archive_srv_command(
retblock:
spinlock_release(&p->protocol_lock);
CHK_PROTOCOL(p);
}

View File

@ -1957,6 +1957,26 @@ static int routeQuery(
if (succp) /*< Have DCB of the target backend */
{
backend_ref_t* bref;
sescmd_cursor_t* scur;
bref = get_bref_from_dcb(router_cli_ses, target_dcb);
scur = &bref->bref_sescmd_cur;
/**
* Store current stmt if execution of previous session command
* haven't completed yet. Note that according to MySQL protocol
* there can only be one such non-sescmd stmt at the time.
*/
if (sescmd_cursor_is_active(scur))
{
ss_dassert(bref->bref_pending_cmd == NULL);
bref->bref_pending_cmd = gwbuf_clone(querybuf);
rses_end_locked_router_action(router_cli_ses);
ret = 1;
goto retblock;
}
if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1)
{
backend_ref_t* bref;
@ -2295,7 +2315,34 @@ static void clientReply (
ss_dassert(succp);
}
/** Unlock router session */
else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */
{
int ret;
CHK_GWBUF(bref->bref_pending_cmd);
if ((ret = bref->bref_dcb->func.write(bref->bref_dcb,
gwbuf_clone(bref->bref_pending_cmd))) == 1)
{
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
atomic_add(&inst->stats.n_queries, 1);
/**
* Add one query response waiter to backend reference
*/
bref_set_state(bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT);
}
else
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing query \"%s\" failed.",
bref->bref_pending_cmd)));
}
gwbuf_free(bref->bref_pending_cmd);
bref->bref_pending_cmd = NULL;
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
lock_failed:
@ -3656,6 +3703,14 @@ static bool route_session_write(
{
succp = false;
}
else if (LOG_IS_ENABLED(LOGFILE_TRACE))
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Wrote to %s:%d",
backend_ref[i].bref_backend->backend_server->name,
backend_ref[i].bref_backend->backend_server->port)));
}
}
}
rses_end_locked_router_action(router_cli_ses);