diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 5b17dbc7d..482ecea27 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -224,6 +224,7 @@ typedef struct backend_ref_st { bref_state_t bref_state; int bref_num_result_wait; sescmd_cursor_t bref_sescmd_cur; + GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */ #if defined(SS_DEBUG) skygw_chk_t bref_chk_tail; #endif diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 2f845f268..b1bd5481d 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -1642,6 +1642,8 @@ void protocol_archive_srv_command( server_command_t* h1; int len = 0; + CHK_PROTOCOL(p); + spinlock_acquire(&p->protocol_lock); if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE) @@ -1692,6 +1694,7 @@ void protocol_archive_srv_command( retblock: spinlock_release(&p->protocol_lock); + CHK_PROTOCOL(p); } diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 451b29742..c602d8913 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -1957,6 +1957,26 @@ static int routeQuery( if (succp) /*< Have DCB of the target backend */ { + backend_ref_t* bref; + sescmd_cursor_t* scur; + + bref = get_bref_from_dcb(router_cli_ses, target_dcb); + scur = &bref->bref_sescmd_cur; + /** + * Store current stmt if execution of previous session command + * haven't completed yet. Note that according to MySQL protocol + * there can only be one such non-sescmd stmt at the time. + */ + if (sescmd_cursor_is_active(scur)) + { + ss_dassert(bref->bref_pending_cmd == NULL); + bref->bref_pending_cmd = gwbuf_clone(querybuf); + + rses_end_locked_router_action(router_cli_ses); + ret = 1; + goto retblock; + } + if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1) { backend_ref_t* bref; @@ -2295,7 +2315,34 @@ static void clientReply ( ss_dassert(succp); } - /** Unlock router session */ + else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */ + { + int ret; + + CHK_GWBUF(bref->bref_pending_cmd); + + if ((ret = bref->bref_dcb->func.write(bref->bref_dcb, + gwbuf_clone(bref->bref_pending_cmd))) == 1) + { + ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; + atomic_add(&inst->stats.n_queries, 1); + /** + * Add one query response waiter to backend reference + */ + bref_set_state(bref, BREF_QUERY_ACTIVE); + bref_set_state(bref, BREF_WAITING_RESULT); + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Routing query \"%s\" failed.", + bref->bref_pending_cmd))); + } + gwbuf_free(bref->bref_pending_cmd); + bref->bref_pending_cmd = NULL; + } + /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); lock_failed: @@ -3656,6 +3703,14 @@ static bool route_session_write( { succp = false; } + else if (LOG_IS_ENABLED(LOGFILE_TRACE)) + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Wrote to %s:%d", + backend_ref[i].bref_backend->backend_server->name, + backend_ref[i].bref_backend->backend_server->port))); + } } } rses_end_locked_router_action(router_cli_ses);