Support for prepared statement, namely support for following comands : COM_STMT_PREPARE, COM_STMT_EXECUTE, COM_STMT_SEND_LONG_DATA, COM_STMT_RESET, COM_STMT_CLOSE, SQLCOM_PREPARE, SQLCOM_EXECUTE, SQLCOM_DEALLOCATE_PREPARE (DEALLOCATE/DROP PREPARE stmt).

All prepare commands are executed in every backend server currently connected.

All executes are routed to master. If stmt type was recorded in prepare phase in rwsplit router, read-only stmts could be routed to slaves.

COM_STMT_PREPARE gets arbitrary number of response packets from backend database. Since statements are prepared in every backend server and only one multi-packet response can be replied to client, redundant multi-packet responses are discarded. This is done in router. Mechanisms from session command handling are utilized with little changes: router must identify when response consists of multiple packets so that it knows to calculate the number of packets in response and that it is able to discard correct number of packets.

Information to the reply-handling router is provided by backend protocol, which includes a ordered list of commands of commands sent to protocol-owning backend server. A command is stored to protocol struct in mysql_backend.c:gw_MySQLWrite_backend if the statement buffer's type has GWBUF_TYPE_SINGLE_STMT set in mysql_client.c:route_by_statement. GWBUF_TYPE_SINGLE_STATEMENT indicates that there is single statement in the buffer, as opposite to Read Connection router, which accepts streaming input from client.
This commit is contained in:
VilhoRaatikka
2014-06-25 17:15:46 +03:00
parent 2d128de85f
commit 7ff14e23a5
6 changed files with 791 additions and 326 deletions

View File

@ -511,6 +511,7 @@ static skygw_query_type_t resolve_query_type(
}
/**<! fall through */
case SQLCOM_CHANGE_DB:
case SQLCOM_DEALLOCATE_PREPARE:
type |= QUERY_TYPE_SESSION_WRITE;
break;

View File

@ -30,15 +30,33 @@
*/
#include <dcb.h>
#include <hashtable.h>
#undef PREP_STMT_CACHING
#if defined(PREP_STMT_CACHING)
typedef enum prep_stmt_type {
PREP_STMT_NAME,
PREP_STMT_ID
} prep_stmt_type_t;
typedef enum prep_stmt_state {
PREP_STMT_ALLOC,
PREP_STMT_SENT,
PREP_STMT_RECV,
PREP_STMT_DROPPED
} prep_stmt_state_t;
#endif /*< PREP_STMT_CACHING */
typedef enum bref_state {
BREF_NOT_USED = 0x00,
BREF_IN_USE = 0x01,
BREF_WAITING_RESULT = 0x02, /*< for anything that responds */
BREF_CLOSED = 0x04
} bref_state_t;
#define BREF_IS_NOT_USED(s) (s->bref_state & BREF_NOT_USED)
#define BREF_IS_NOT_USED(s) (s->bref_state & ~BREF_IN_USE)
#define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) (s->bref_state & BREF_WAITING_RESULT)
#define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED)
@ -186,6 +204,25 @@ typedef struct rwsplit_config_st {
} rwsplit_config_t;
#if defined(PREP_STMT_CACHING)
typedef struct prep_stmt_st {
#if defined(SS_DEBUG)
skygw_chk_t pstmt_chk_top;
#endif
union id {
int seq;
char* name;
} pstmt_id;
prep_stmt_state_t pstmt_state;
prep_stmt_type_t pstmt_type;
#if defined(SS_DEBUG)
skygw_chk_t pstmt_chk_tail;
#endif
} prep_stmt_t;
#endif /*< PREP_STMT_CACHING */
/**
* The client session structure used within this router.
*/
@ -205,7 +242,9 @@ struct router_client_session {
int rses_capabilities; /*< input type, for example */
bool rses_autocommit_enabled;
bool rses_transaction_active;
uint64_t rses_id; /*< ID for router client session */
#if defined(PREP_STMT_CACHING)
HASHTABLE* rses_prep_stmt[2];
#endif
struct router_client_session* next;
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;

View File

@ -163,7 +163,7 @@ static int gw_read_backend_event(DCB *dcb) {
backend_protocol = (MySQLProtocol *) dcb->protocol;
CHK_PROTOCOL(backend_protocol);
#if 1
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] Read dcb %p fd %d protocol "
@ -171,8 +171,9 @@ static int gw_read_backend_event(DCB *dcb) {
pthread_self(),
dcb,
dcb->fd,
backend_protocol->state,
STRPROTOCOLSTATE(backend_protocol->state))));
backend_protocol->protocol_auth_state,
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
#endif
/* backend is connected:
@ -186,17 +187,18 @@ static int gw_read_backend_event(DCB *dcb) {
* If starting to auhenticate with backend server, lock dcb
* to prevent overlapping processing of auth messages.
*/
if (backend_protocol->state == MYSQL_CONNECTED) {
if (backend_protocol->protocol_auth_state == MYSQL_CONNECTED)
{
spinlock_acquire(&dcb->authlock);
backend_protocol = (MySQLProtocol *) dcb->protocol;
CHK_PROTOCOL(backend_protocol);
if (backend_protocol->state == MYSQL_CONNECTED) {
if (gw_read_backend_handshake(backend_protocol) != 0) {
backend_protocol->state = MYSQL_AUTH_FAILED;
if (backend_protocol->protocol_auth_state == MYSQL_CONNECTED)
{
if (gw_read_backend_handshake(backend_protocol) != 0)
{
backend_protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] after "
@ -205,7 +207,9 @@ static int gw_read_backend_event(DCB *dcb) {
pthread_self(),
backend_protocol->owner_dcb->fd)));
} else {
}
else
{
/* handshake decoded, send the auth credentials */
if (gw_send_authentication_to_backend(
current_session->db,
@ -213,7 +217,7 @@ static int gw_read_backend_event(DCB *dcb) {
current_session->client_sha1,
backend_protocol) != 0)
{
backend_protocol->state = MYSQL_AUTH_FAILED;
backend_protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] after "
@ -221,31 +225,31 @@ static int gw_read_backend_event(DCB *dcb) {
"fd %d, state = MYSQL_AUTH_FAILED.",
pthread_self(),
backend_protocol->owner_dcb->fd)));
} else {
backend_protocol->state = MYSQL_AUTH_RECV;
}
else
{
backend_protocol->protocol_auth_state = MYSQL_AUTH_RECV;
}
}
}
spinlock_release(&dcb->authlock);
}
/*
* Now:
* -- check the authentication reply from backend
* OR
* -- handle a previous handshake error
*/
if (backend_protocol->state == MYSQL_AUTH_RECV ||
backend_protocol->state == MYSQL_AUTH_FAILED)
if (backend_protocol->protocol_auth_state == MYSQL_AUTH_RECV ||
backend_protocol->protocol_auth_state == MYSQL_AUTH_FAILED)
{
spinlock_acquire(&dcb->authlock);
backend_protocol = (MySQLProtocol *) dcb->protocol;
CHK_PROTOCOL(backend_protocol);
if (backend_protocol->state == MYSQL_AUTH_RECV ||
backend_protocol->state == MYSQL_AUTH_FAILED)
if (backend_protocol->protocol_auth_state == MYSQL_AUTH_RECV ||
backend_protocol->protocol_auth_state == MYSQL_AUTH_FAILED)
{
ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL;
@ -259,7 +263,7 @@ static int gw_read_backend_event(DCB *dcb) {
router_instance = session->service->router_instance;
rsession = session->router_session;
if (backend_protocol->state == MYSQL_AUTH_RECV) {
if (backend_protocol->protocol_auth_state == MYSQL_AUTH_RECV) {
/*<
* Read backed auth reply
*/
@ -268,14 +272,14 @@ static int gw_read_backend_event(DCB *dcb) {
switch (receive_rc) {
case -1:
backend_protocol->state = MYSQL_AUTH_FAILED;
backend_protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] after "
"gw_receive_backend_authentication "
"fd %d, state = MYSQL_AUTH_FAILED.",
backend_protocol->owner_dcb->fd,
pthread_self())));
pthread_self(),
backend_protocol->owner_dcb->fd)));
LOGIF(LE, (skygw_log_write_flush(
@ -286,7 +290,7 @@ static int gw_read_backend_event(DCB *dcb) {
current_session->user)));
break;
case 1:
backend_protocol->state = MYSQL_IDLE;
backend_protocol->protocol_auth_state = MYSQL_IDLE;
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
@ -316,7 +320,7 @@ static int gw_read_backend_event(DCB *dcb) {
} /* switch */
}
if (backend_protocol->state == MYSQL_AUTH_FAILED)
if (backend_protocol->protocol_auth_state == MYSQL_AUTH_FAILED)
{
/**
* protocol state won't change anymore,
@ -338,9 +342,14 @@ static int gw_read_backend_event(DCB *dcb) {
bool succp;
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend read error handling.")));
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] "
"calling handleError. Backend "
"DCB %p, session %p",
pthread_self(),
dcb,
dcb->session)));
#endif
errbuf = mysql_create_custom_error(
@ -358,6 +367,15 @@ static int gw_read_backend_event(DCB *dcb) {
ss_dassert(!succp);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] "
"after calling handleError. Backend "
"DCB %p, session %p",
pthread_self(),
dcb,
dcb->session)));
if (session != NULL)
{
spinlock_acquire(&session->ses_lock);
@ -371,7 +389,7 @@ static int gw_read_backend_event(DCB *dcb) {
}
else
{
ss_dassert(backend_protocol->state == MYSQL_IDLE);
ss_dassert(backend_protocol->protocol_auth_state == MYSQL_IDLE);
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] "
@ -396,11 +414,13 @@ static int gw_read_backend_event(DCB *dcb) {
/* reading MySQL command output from backend and writing to the client */
{
GWBUF *writebuf = NULL;
GWBUF *readbuf = NULL;
ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL;
void *rsession = NULL;
SESSION *session = dcb->session;
int nbytes_read = 0;
mysql_server_cmd_t srvcmd = MYSQL_COM_UNDEFINED;
CHK_SESSION(session);
router = session->service->router;
@ -408,32 +428,17 @@ static int gw_read_backend_event(DCB *dcb) {
rsession = session->router_session;
/* read available backend data */
rc = dcb_read(dcb, &writebuf);
rc = dcb_read(dcb, &readbuf);
if (rc < 0)
{
/*< vraa : errorHandle */
/*<
* Backend generated EPOLLIN event and if backend has
* failed, connection must be closed to avoid backend
* dcb from getting hanged.
*/
GWBUF* errbuf;
bool succp;
/**
* - send error for client
* - mark failed backend BREF_NOT_USED
* - go through all servers and select one according to
* the criteria that user specified in the beginning.
*/
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend read error handling #2.")));
#endif
errbuf = mysql_create_custom_error(
1,
0,
@ -456,33 +461,111 @@ static int gw_read_backend_event(DCB *dcb) {
rc = 0;
goto return_rc;
}
nbytes_read = gwbuf_length(readbuf);
if (writebuf == NULL) {
rc = 0;
if (nbytes_read == 0)
{
goto return_rc;
}
else
{
ss_dassert(readbuf != NULL);
}
/**
* ask for next response (1 or more packets) like in
* gw_MySQL_get_next_packet but gw_MySQL_get_next_response
*/
srvcmd = protocol_get_srv_command((MySQLProtocol *)dcb->protocol,
false);
/**
* If backend DCB is waiting for response to COM_STMT_PREPARE,
* it, then only that must be passed to clientReply.
*
* If response consists of ses cmd response and response to
* COM_STMT_PREPARE, there can't be anything after
* COM_STMT_PREPARE response because whole buffer may be
* discarded since router doesn't know the borderlines of MySQL
* packets.
*/
/**
* Read all packets from <readbuf> which belong to STMT PREPARE
* response.
* Move packets not belonging to STMT PREPARE response to
* dcb_readqueue.
* When whole response is read, pass <readbuf> forward to
* clientReply.
*/
if (srvcmd == MYSQL_COM_STMT_PREPARE)
{
MySQLProtocol* p;
int nresponse_packets;
GWBUF* tmpbuf;
p = (MySQLProtocol *)dcb->protocol;
nresponse_packets = protocol_get_nresponse_packets(p);
/** count only once per response */
if (nresponse_packets == 0)
{
nresponse_packets = get_stmt_nresponse_packets(
readbuf,
srvcmd);
}
tmpbuf = gw_MySQL_get_packets(&readbuf, &nresponse_packets);
gwbuf_append(dcb->dcb_readqueue, readbuf);
readbuf = tmpbuf;
/** <readbuf> contains incomplete response to STMT PREPARE */
if (nresponse_packets != 0)
{
rc = 0;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Backend fd %d read incomplete response packet. "
"Waiting %d more, cmd %s.",
dcb->fd,
nresponse_packets,
STRPACKETTYPE(srvcmd))));
/**
* store the number of how many packets the
* reponse consists of to backend's protocol.
*/
protocol_set_nresponse_packets(p, nresponse_packets);
goto return_rc;
}
protocol_remove_srv_command((MySQLProtocol *)dcb->protocol);
}
/*<
* If dcb->session->client is freed already it may be NULL.
*/
if (dcb->session->client != NULL) {
if (dcb->session->client != NULL)
{
client_protocol = SESSION_PROTOCOL(dcb->session,
MySQLProtocol);
if (client_protocol != NULL) {
if (client_protocol != NULL)
{
CHK_PROTOCOL(client_protocol);
if (client_protocol->state == MYSQL_IDLE)
if (client_protocol->protocol_auth_state ==
MYSQL_IDLE)
{
gwbuf_set_type(writebuf, GWBUF_TYPE_MYSQL);
gwbuf_set_type(readbuf, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance,
rsession,
writebuf,
readbuf,
dcb);
rc = 1;
}
goto return_rc;
} else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) {
gwbuf_set_type(writebuf, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance, rsession, writebuf, dcb);
}
else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL)
{
gwbuf_set_type(readbuf, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance, rsession, readbuf, dcb);
rc = 1;
}
}
@ -548,8 +631,8 @@ static int gw_write_backend_event(DCB *dcb) {
goto return_rc;
}
if (backend_protocol->state == MYSQL_PENDING_CONNECT) {
backend_protocol->state = MYSQL_CONNECTED;
if (backend_protocol->protocol_auth_state == MYSQL_PENDING_CONNECT) {
backend_protocol->protocol_auth_state = MYSQL_CONNECTED;
rc = 1;
goto return_rc;
}
@ -569,7 +652,7 @@ return_rc:
}
/*
* Write function for backend DCB
* Write function for backend DCB. Store command to protocol.
*
* @param dcb The DCB of the backend
* @param queue Queue of buffers to write
@ -587,7 +670,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
* If auth failed, return value is 0, write and buffered write
* return 1.
*/
switch(backend_protocol->state) {
switch(backend_protocol->protocol_auth_state) {
case MYSQL_AUTH_FAILED:
{
size_t len;
@ -615,6 +698,10 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
}
case MYSQL_IDLE:
{
uint8_t* ptr = GWBUF_DATA(queue);
int cmd = MYSQL_GET_COMMAND(ptr);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_MySQLWrite_backend] write to dcb %p "
@ -622,18 +709,37 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
pthread_self(),
dcb,
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->state))));
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
spinlock_release(&dcb->authlock);
/**
* Server commands are stored to MySQLProtocol structure
* if buffer always includes a single statement. That
* information is stored in GWBUF type field
* (GWBUF_TYPE_SINGLE_STMT bit).
*/
if (GWBUF_IS_TYPE_SINGLE_STMT(queue))
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Write to backend's DCB fd %d "
"cmd %s protocol state %s.",
dcb->fd,
STRPACKETTYPE(cmd),
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
/** Record the command to backend's protocol */
protocol_add_srv_command(backend_protocol, cmd);
}
/** Write to backend */
rc = dcb_write(dcb, queue);
goto return_rc;
break;
}
default:
/*<
* Now put the incoming data to the delay queue unless backend is
* connected with auth ok
*/
{
uint8_t* ptr = GWBUF_DATA(queue);
int cmd = MYSQL_GET_COMMAND(ptr);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_MySQLWrite_backend] delayed write to "
@ -641,13 +747,37 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
pthread_self(),
dcb,
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->state))));
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
/**
* Since it is known that buffer contains one complete
* command, store the command to backend's protocol. When
* backend server responses the command determines how
* response needs to be processed. This is mainly due to
* MYSQL_COM_STMT_PREPARE whose response consists of
* arbitrary number of packets.
*/
if (GWBUF_IS_TYPE_SINGLE_STMT(queue))
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Write to backend's delayqueue fd %d "
"protocol state %s.",
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
/** Record the command to backend's protocol */
protocol_add_srv_command(backend_protocol, cmd);
}
/*<
* Now put the incoming data to the delay queue unless backend is
* connected with auth ok
*/
backend_set_delayqueue(dcb, queue);
spinlock_release(&dcb->authlock);
rc = 1;
goto return_rc;
break;
}
}
return_rc:
return rc;
}
@ -755,7 +885,7 @@ static int gw_create_backend_connection(
case 0:
ss_dassert(fd > 0);
protocol->fd = fd;
protocol->state = MYSQL_CONNECTED;
protocol->protocol_auth_state = MYSQL_CONNECTED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_create_backend_connection] Established "
@ -770,7 +900,7 @@ static int gw_create_backend_connection(
case 1:
ss_dassert(fd > 0);
protocol->state = MYSQL_PENDING_CONNECT;
protocol->protocol_auth_state = MYSQL_PENDING_CONNECT;
protocol->fd = fd;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -785,7 +915,7 @@ static int gw_create_backend_connection(
default:
ss_dassert(fd == -1);
ss_dassert(protocol->state == MYSQL_ALLOC);
ss_dassert(protocol->protocol_auth_state == MYSQL_ALLOC);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_create_backend_connection] Connection "

View File

@ -582,7 +582,7 @@ int gw_read_client_event(
/**
* Now there should be at least one complete mysql packet in read_buffer.
*/
switch (protocol->state) {
switch (protocol->protocol_auth_state) {
case MYSQL_AUTH_SENT:
{
@ -595,7 +595,7 @@ int gw_read_client_event(
if (auth_val == 0)
{
SESSION *session = NULL;
protocol->state = MYSQL_AUTH_RECV;
protocol->protocol_auth_state = MYSQL_AUTH_RECV;
/**
* Create session, and a router session for it.
* If successful, there will be backend connection(s)
@ -607,7 +607,8 @@ int gw_read_client_event(
{
CHK_SESSION(session);
ss_dassert(session->state != SESSION_STATE_ALLOC);
protocol->state = MYSQL_IDLE;
protocol->protocol_auth_state = MYSQL_IDLE;
/**
* Send an AUTH_OK packet to the client,
* packet sequence is # 2
@ -616,7 +617,7 @@ int gw_read_client_event(
}
else
{
protocol->state = MYSQL_AUTH_FAILED;
protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_client_event] session "
@ -637,7 +638,7 @@ int gw_read_client_event(
}
else
{
protocol->state = MYSQL_AUTH_FAILED;
protocol->protocol_auth_state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_client_event] after "
@ -889,7 +890,7 @@ int gw_write_client_event(DCB *dcb)
protocol = (MySQLProtocol *)dcb->protocol;
CHK_PROTOCOL(protocol);
if (protocol->state == MYSQL_IDLE)
if (protocol->protocol_auth_state == MYSQL_IDLE)
{
dcb_drain_writeq(dcb);
goto return_1;
@ -1231,7 +1232,7 @@ int gw_MySQLAccept(DCB *listener)
MySQLSendHandshake(client_dcb);
// client protocol state change
protocol->state = MYSQL_AUTH_SENT;
protocol->protocol_auth_state = MYSQL_AUTH_SENT;
/**
* Set new descriptor to event set. At the same time,
@ -1289,8 +1290,15 @@ static int gw_error_client_event(
SESSION* session;
CHK_DCB(dcb);
session = dcb->session;
CHK_SESSION(session);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_error_client_event] Error event handling for DCB %p "
"in state %s, session %p.",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state),
(session != NULL ? session : NULL))));
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
@ -1368,7 +1376,7 @@ gw_client_hangup_event(DCB *dcb)
/**
* Detect if buffer includes partial mysql packet or multiple packets.
* Store partial packet to pendingqueue. Send complete packets one by one
* Store partial packet to dcb_readqueue. Send complete packets one by one
* to router.
*
* It is assumed readbuf includes at least one complete packet.
@ -1387,6 +1395,20 @@ static int route_by_statement(SESSION *session, GWBUF *readbuf)
if (packetbuf != NULL)
{
CHK_GWBUF(packetbuf);
/**
* This means that buffer includes exactly one MySQL
* statement.
* backend func.write uses the information. MySQL backend
* protocol, for example, stores the command identifier
* to protocol structure. When some other thread reads
* the corresponding response the command tells how to
* handle response.
*
* Set it here instead of gw_read_client_event to make
* sure it is set to each (MySQL) packet.
*/
gwbuf_set_type(packetbuf, GWBUF_TYPE_SINGLE_STMT);
/** Route query */
rc = SESSION_ROUTE_QUERY(session, packetbuf);
}
else

View File

@ -113,7 +113,7 @@ static void clientReply(
static void handleError(
ROUTER *instance,
void *router_session,
char *message,
GWBUF *errbuf,
DCB *backend_dcb,
int action,
bool *succp);
@ -708,7 +708,7 @@ static void
handleError(
ROUTER *instance,
void *router_session,
char *message,
GWBUF *errbuf,
DCB *backend_dcb,
int action,
bool *succp)

View File

@ -31,6 +31,7 @@
#include <dcb.h>
#include <spinlock.h>
#include <modinfo.h>
#include <mysql_client_server_protocol.h>
MODULE_INFO info = {
MODULE_API_ROUTER,
@ -97,6 +98,11 @@ static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
#if defined(PREP_STMT_CACHING)
static prep_stmt_t* prep_stmt_init(prep_stmt_type_t type, void* id);
static void prep_stmt_done(prep_stmt_t* pstmt);
#endif /*< PREP_STMT_CACHING */
int bref_cmp_global_conn(
const void* bref1,
const void* bref2);
@ -207,8 +213,7 @@ static bool sescmd_cursor_next(
static GWBUF* sescmd_cursor_process_replies(
DCB* client_dcb,
GWBUF* replybuf,
sescmd_cursor_t* scur,
bool* has_query);
backend_ref_t* bref);
static void tracelog_routed_query(
ROUTER_CLIENT_SES* rses,
@ -229,8 +234,15 @@ static void refreshInstance(
static void bref_clear_state(backend_ref_t* bref, bref_state_t state);
static void bref_set_state(backend_ref_t* bref, bref_state_t state);
static sescmd_cursor_t* backend_ref_get_sescmd_cursor (backend_ref_t* bref);
static int router_handle_state_switch(DCB* dcb, DCB_REASON reason, void* data);
static bool handle_error_new_connection(
ROUTER_INSTANCE* inst,
ROUTER_CLIENT_SES* rses,
DCB* backend_dcb,
GWBUF* errmsg);
static bool handle_error_reply_client(SESSION* ses, GWBUF* errmsg);
static SPINLOCK instlock;
static ROUTER_INSTANCE* instances;
@ -482,8 +494,6 @@ static void* newSession(
}
/** Copy config struct from router instance */
client_rses->rses_config = router->rwsplit_config;
/** Create ID for the new client (router_client_ses) session */
client_rses->rses_id = router_client_ses_seq += 1;
spinlock_release(&router->lock);
/**
@ -571,7 +581,6 @@ static void* newSession(
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR;
#endif
backend_ref[i].bref_state = 0;
bref_set_state(&backend_ref[i], BREF_NOT_USED);
backend_ref[i].bref_backend = router->servers[i];
/** store pointers to sescmd list to both cursors */
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses;
@ -814,6 +823,7 @@ static bool get_dcb(
*p_dcb = backend_ref[i].bref_dcb;
smallest_nconn = b->backend_conn_count;
succp = true;
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
}
}
@ -825,6 +835,7 @@ static bool get_dcb(
{
*p_dcb = backend_ref->bref_dcb;
succp = true;
ss_dassert(backend_ref->bref_dcb->state != DCB_STATE_ZOMBIE);
ss_dassert(
SERVER_IS_MASTER(backend_ref->bref_backend->backend_server) &&
@ -892,7 +903,7 @@ static int routeQuery(
GWBUF* plainsqlbuf = NULL;
char* querystr = NULL;
char* startpos;
unsigned char packet_type;
mysql_server_cmd_t packet_type;
uint8_t* packet;
int ret = 0;
DCB* master_dcb = NULL;
@ -901,6 +912,7 @@ static int routeQuery(
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
bool rses_is_closed = false;
size_t len;
MYSQL* mysql = NULL;
CHK_CLIENT_RSES(router_cli_ses);
@ -915,10 +927,10 @@ static int routeQuery(
if (rses_is_closed)
{
/**
* COM_QUIT may have sent by client and as a part of backend
* MYSQL_COM_QUIT may have sent by client and as a part of backend
* closing procedure.
*/
if (packet_type != COM_QUIT)
if (packet_type != MYSQL_COM_QUIT)
{
LOGIF(LE,
(skygw_log_write_flush(
@ -941,21 +953,24 @@ static int routeQuery(
CHK_DCB(master_dcb);
switch(packet_type) {
case COM_QUIT: /**< 1 QUIT will close all sessions */
case COM_INIT_DB: /**< 2 DDL must go to the master */
case COM_REFRESH: /**< 7 - I guess this is session but not sure */
case COM_DEBUG: /**< 0d all servers dump debug info to stdout */
case COM_PING: /**< 0e all servers are pinged */
case COM_CHANGE_USER: /**< 11 all servers change it accordingly */
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
case MYSQL_COM_PING: /*< 0e all servers are pinged */
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
qtype = QUERY_TYPE_SESSION_WRITE;
break;
case COM_CREATE_DB: /**< 5 DDL must go to the master */
case COM_DROP_DB: /**< 6 DDL must go to the master */
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
qtype = QUERY_TYPE_WRITE;
break;
case COM_QUERY:
case MYSQL_COM_QUERY:
plainsqlbuf = gwbuf_clone_transform(querybuf,
GWBUF_TYPE_PLAINSQL);
len = GWBUF_LENGTH(plainsqlbuf);
@ -963,24 +978,45 @@ static int routeQuery(
querystr = (char *)malloc(len+1);
memcpy(querystr, startpos, len);
memset(&querystr[len], 0, 1);
// querystr = (char *)GWBUF_DATA(plainsqlbuf);
/*
* querystr = master_dcb->func.getquerystr(
* (void *) gwbuf_clone(querybuf),
* &querystr_is_copy);
/**
* Use mysql handle to query information from parse tree.
* call skygw_query_classifier_free before exit!
*/
qtype = skygw_query_classifier_get_type(querystr, 0);
qtype = skygw_query_classifier_get_type(querystr, 0, &mysql);
break;
case COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case COM_STATISTICS: /**< 9 ? */
case COM_PROCESS_INFO: /**< 0a ? */
case COM_CONNECT: /**< 0b ? */
case COM_PROCESS_KILL: /**< 0c ? */
case COM_TIME: /**< 0f should this be run in gateway ? */
case COM_DELAYED_INSERT: /**< 10 ? */
case COM_DAEMON: /**< 1d ? */
case MYSQL_COM_STMT_PREPARE:
plainsqlbuf = gwbuf_clone_transform(querybuf,
GWBUF_TYPE_PLAINSQL);
len = GWBUF_LENGTH(plainsqlbuf);
/** unnecessary if buffer includes additional terminating null */
querystr = (char *)malloc(len+1);
memcpy(querystr, startpos, len);
memset(&querystr[len], 0, 1);
qtype = skygw_query_classifier_get_type(querystr, 0, &mysql);
qtype |= QUERY_TYPE_PREPARE_STMT;
break;
case MYSQL_COM_STMT_EXECUTE:
plainsqlbuf = gwbuf_clone_transform(querybuf,
GWBUF_TYPE_PLAINSQL);
len = GWBUF_LENGTH(plainsqlbuf);
/** unnecessary if buffer includes additional terminating null */
querystr = (char *)malloc(len+1);
memcpy(querystr, startpos, len);
memset(&querystr[len], 0, 1);
qtype = skygw_query_classifier_get_type(querystr, 0, &mysql);
qtype |= QUERY_TYPE_EXEC_STMT;
break;
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case MYSQL_COM_STATISTICS: /**< 9 ? */
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
case MYSQL_COM_CONNECT: /**< 0b ? */
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
case MYSQL_COM_DAEMON: /**< 1d ? */
default:
break;
} /**< switch by packet type */
@ -1024,11 +1060,13 @@ static int routeQuery(
/**
* Session update is always routed in the same way.
*/
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE))
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_STMT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
{
/**
* It is not sure if the session command in question requires
* response. Statement must be examined in route_session_write.
* response. Statement is examined in route_session_write.
*/
bool succp = route_session_write(
router_cli_ses,
@ -1050,9 +1088,8 @@ static int routeQuery(
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"[%s.%d]\tRead-only query, routing to Slave.",
inst->service->name,
router_cli_ses->rses_id)));
"[%s]\tRead-only query, routing to Slave.",
inst->service->name)));
ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ));
/** Lock router session */
@ -1060,7 +1097,6 @@ static int routeQuery(
{
goto return_ret;
}
succp = get_dcb(&slave_dcb, router_cli_ses, BE_SLAVE);
if (succp)
@ -1152,6 +1188,10 @@ return_ret:
{
free(querystr);
}
if (mysql != NULL)
{
skygw_query_classifier_free(mysql);
}
return ret;
}
@ -1270,7 +1310,7 @@ diagnostic(ROUTER *instance, DCB *dcb)
* @param backend_dcb The backend DCB
* @param queue The GWBUF with reply data
*/
static void clientReply(
static void clientReply (
ROUTER* instance,
void* router_session,
GWBUF* writebuf,
@ -1279,8 +1319,7 @@ static void clientReply(
DCB* client_dcb;
ROUTER_CLIENT_SES* router_cli_ses;
sescmd_cursor_t* scur = NULL;
backend_ref_t* backend_ref;
int i;
backend_ref_t* bref;
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(router_cli_ses);
@ -1323,23 +1362,10 @@ static void clientReply(
/** Log to debug that router was closed */
goto lock_failed;
}
backend_ref = router_cli_ses->rses_backend_ref;
bref = get_bref_from_dcb(router_cli_ses, backend_dcb);
/** find backend_dcb's corresponding BACKEND */
i = 0;
while (i<router_cli_ses->rses_nbackends &&
backend_ref[i].bref_dcb != backend_dcb)
{
i++;
}
ss_dassert(backend_ref[i].bref_dcb == backend_dcb);
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"reply_by_statement",
&backend_ref[i],
gwbuf_clone(writebuf)));
scur = &backend_ref[i].bref_sescmd_cur;
CHK_BACKEND_REF(bref);
scur = &bref->bref_sescmd_cur;
/**
* Active cursor means that reply is from session command
* execution. Majority of the time there are no session commands
@ -1347,26 +1373,76 @@ static void clientReply(
*/
if (sescmd_cursor_is_active(scur))
{
bool has_query;
writebuf = sescmd_cursor_process_replies(client_dcb,
writebuf,
scur,
&has_query);
if (has_query)
if (MYSQL_IS_ERROR_PACKET(((uint8_t *)GWBUF_DATA(writebuf))))
{
bref_clear_state(backend_ref, BREF_WAITING_RESULT);
}
}
SESSION* ses = backend_dcb->session;
uint8_t* buf =
(uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf));
size_t len = MYSQL_GET_PACKET_LEN(buf);
char* cmdstr = (char *)malloc(len+1);
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
snprintf(cmdstr, len+1, "%s", &buf[5]);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to execute %s in %s:%d.",
cmdstr,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
free(cmdstr);
/** Inform the client */
handle_error_reply_client(ses,writebuf);
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
goto lock_failed;
}
else
{
/**
* Discard all those responses that have already been sent to
* the client. Return with buffer including response that
* needs to be sent to client or NULL.
*/
writebuf = sescmd_cursor_process_replies(client_dcb,
writebuf,
bref);
}
}
if (writebuf != NULL && client_dcb != NULL)
{
/** Write reply to client DCB */
client_dcb->func.write(client_dcb, writebuf);
bref_clear_state(backend_ref, BREF_WAITING_RESULT);
bref_clear_state(bref, BREF_WAITING_RESULT);
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
/** Log to debug that router was closed */
goto lock_failed;
}
/** There is one pending session command to be xexecuted. */
if (sescmd_cursor_is_active(scur))
{
bool succp;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Backend %s:%d processed reply and starts to execute "
"active cursor.",
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
succp = execute_sescmd_in_backend(bref);
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
lock_failed:
return;
@ -1415,6 +1491,13 @@ static void bref_set_state(
bref_state_t state)
{
bref->bref_state |= state;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Set state %d for %s:%d fd %d",
bref->bref_state,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port,
bref->bref_dcb->fd)));
}
/**
@ -1595,7 +1678,7 @@ static bool select_connect_backend_servers(
{
BACKEND* b = backend_ref[i].bref_backend;
LOGIF(LT, (skygw_log_write(
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Examine server "
"%s:%d %s with %d connections. "
@ -1637,18 +1720,15 @@ static bool select_connect_backend_servers(
*/
execute_sescmd_history(&backend_ref[i]);
/**
* Callback which is called when
* node fails.
* When server fails, this callback
* is called.
*/
dcb_add_callback(
backend_ref[i].bref_dcb,
DCB_REASON_NOT_RESPONDING,
&router_handle_state_switch,
(void *)&backend_ref[i]);
bref_clear_state(&backend_ref[i],
BREF_CLOSED);
bref_clear_state(&backend_ref[i],
BREF_NOT_USED);
backend_ref[i].bref_state = 0;
bref_set_state(&backend_ref[i],
BREF_IN_USE);
/**
@ -1690,20 +1770,20 @@ static bool select_connect_backend_servers(
if (backend_ref[i].bref_dcb != NULL)
{
master_connected = true;
/**
* When server fails, this callback
* is called.
*/
dcb_add_callback(
backend_ref[i].bref_dcb,
DCB_REASON_NOT_RESPONDING,
&router_handle_state_switch,
(void *)&backend_ref[i]);
bref_clear_state(&backend_ref[i],
BREF_NOT_USED);
backend_ref[i].bref_state = 0;
bref_set_state(&backend_ref[i],
BREF_IN_USE);
/** Increase backend connection counter */
/** Increase backend connection counter */
/** Increase backend connection counters */
atomic_add(&b->backend_server->stats.n_current, 1);
atomic_add(&b->backend_server->stats.n_connections, 1);
atomic_add(&b->backend_conn_count, 1);
@ -1717,7 +1797,7 @@ static bool select_connect_backend_servers(
"connection with master %s:%d",
b->backend_server->name,
b->backend_server->port)));
/* handle connect error */
/** handle connect error */
}
}
}
@ -1897,8 +1977,7 @@ static bool select_connect_backend_servers(
/** disconnect opened connections */
dcb_close(backend_ref[i].bref_dcb);
bref_clear_state(&backend_ref[i], BREF_IN_USE);
bref_set_state(&backend_ref[i], BREF_NOT_USED);
/** Decrease backend's connection counter. */
atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1);
}
}
@ -1948,7 +2027,7 @@ static void rses_property_done(
mysql_sescmd_done(&prop->rses_prop_data.sescmd);
break;
default:
LOGIF(LD, (skygw_log_write_flush(
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [rses_property_done] Unknown property type %d "
"in property %p",
@ -2092,16 +2171,16 @@ static void mysql_sescmd_done(
static GWBUF* sescmd_cursor_process_replies(
DCB* client_dcb,
GWBUF* replybuf,
sescmd_cursor_t* scur,
bool* has_query)
backend_ref_t* bref)
{
const size_t headerlen = 4; /*< mysql packet header */
uint8_t* packet;
size_t packetlen;
uint8_t* packet;
mysql_sescmd_t* scmd;
sescmd_cursor_t* scur;
scur = &bref->bref_sescmd_cur;
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
scmd = sescmd_cursor_get_command(scur);
CHK_DCB(client_dcb);
@ -2113,18 +2192,41 @@ static GWBUF* sescmd_cursor_process_replies(
*/
while (scmd != NULL && replybuf != NULL)
{
/** Faster backend has already responded to client : discard */
if (scmd->my_sescmd_is_replied)
{
/**
* Discard heading packets if their related command is
* already replied.
*/
CHK_GWBUF(replybuf);
packet = (uint8_t *)GWBUF_DATA(replybuf);
/**
* If it is response to MYSQL_COM_STMT_PREPARE, then buffer
* only includes the response.
*/
if (scmd->my_sescmd_prop->rses_prop_data.sescmd.my_sescmd_packet_type ==
MYSQL_COM_STMT_PREPARE)
{
while (replybuf != NULL)
{
#if defined(SS_DEBUG)
int buflen;
buflen = GWBUF_LENGTH(replybuf);
replybuf = gwbuf_consume(replybuf, buflen);
#else
replybuf = gwbuf_consume(
replybuf,
GWBUF_LENGTH(replybuf));
#endif
}
}
/** Only consume the leading packet */
else
{
packetlen = packet[0]+packet[1]*256+packet[2]*256*256;
replybuf = gwbuf_consume(replybuf, packetlen+headerlen);
}
else
}
/** Response is in the buffer and it will be sent to client. */
else if (replybuf != NULL)
{
/** Mark the rest session commands as replied */
scmd->my_sescmd_is_replied = true;
@ -2141,8 +2243,6 @@ static GWBUF* sescmd_cursor_process_replies(
scur->scmd_cur_active = false;
}
}
/** vraa:this is set but only because there's not yet way to find out */
*has_query = false;
ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL);
return replybuf;
@ -2279,12 +2379,13 @@ static bool execute_sescmd_in_backend(
backend_ref_t* backend_ref)
{
DCB* dcb;
bool succp = true;
bool succp;
int rc = 0;
sescmd_cursor_t* scur;
if (BREF_IS_CLOSED(backend_ref))
{
succp = false;
goto return_succp;
}
dcb = backend_ref->bref_dcb;
@ -2301,6 +2402,10 @@ static bool execute_sescmd_in_backend(
if (sescmd_cursor_get_command(scur) == NULL)
{
succp = false;
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Cursor had no pending session commands.")));
goto return_succp;
}
@ -2309,14 +2414,26 @@ static bool execute_sescmd_in_backend(
/** Cursor is left active when function returns. */
sescmd_cursor_set_active(scur, true);
}
#if defined(SS_DEBUG)
LOGIF(LT, tracelog_routed_query(scur->scmd_cur_rses,
"execute_sescmd_in_backend",
backend_ref,
sescmd_cursor_clone_querybuf(scur)));
{
GWBUF* tmpbuf = sescmd_cursor_clone_querybuf(scur);
uint8_t* ptr = GWBUF_DATA(tmpbuf);
unsigned char cmd = MYSQL_GET_COMMAND(ptr);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Just before write, fd %d : cmd %s.",
dcb->fd,
STRPACKETTYPE(cmd))));
}
#endif
switch (scur->scmd_cur_cmd->my_sescmd_packet_type) {
case COM_CHANGE_USER:
case MYSQL_COM_CHANGE_USER:
rc = dcb->func.auth(
dcb,
NULL,
@ -2324,8 +2441,8 @@ static bool execute_sescmd_in_backend(
sescmd_cursor_clone_querybuf(scur));
break;
case COM_QUERY:
case COM_INIT_DB:
case MYSQL_COM_QUERY:
case MYSQL_COM_INIT_DB:
default:
rc = dcb->func.write(
dcb,
@ -2341,10 +2458,7 @@ static bool execute_sescmd_in_backend(
if (rc == 1)
{
/**
* All but COM_QUIT cause backend to send reply. flag backend_ref.
*/
bref_set_state(backend_ref, BREF_WAITING_RESULT);
succp = true;
}
else
{
@ -2456,7 +2570,7 @@ static void tracelog_routed_query(
be_type = BACKEND_TYPE(b);
if (GWBUF_TYPE(buf) == GWBUF_TYPE_MYSQL)
if (GWBUF_IS_TYPE_MYSQL(buf))
{
len = packet[0];
len += 256*packet[1];
@ -2480,6 +2594,28 @@ static void tracelog_routed_query(
dcb)));
free(querystr);
}
else if (packet_type == '\x22' ||
packet_type == 0x22 ||
packet_type == '\x26' ||
packet_type == 0x26 ||
true)
{
querystr = (char *)malloc(len);
memcpy(querystr, startpos, len-1);
querystr[len-1] = '\0';
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [%s] %d bytes long buf, \"%s\" -> %s:%d %s dcb %p",
pthread_self(),
funcname,
buflen,
querystr,
b->backend_server->name,
b->backend_server->port,
STRBETYPE(be_type),
dcb)));
free(querystr);
}
}
gwbuf_free(buf);
}
@ -2543,12 +2679,14 @@ static bool route_session_write(
backend_ref = router_cli_ses->rses_backend_ref;
/**
* COM_QUIT is one-way message. Server doesn't respond to that.
* These are one-way messages and server doesn't respond to them.
* Therefore reply processing is unnecessary and session
* command property is not needed. It is just routed to both
* command property is not needed. It is just routed to all available
* backends.
*/
if (packet_type == COM_QUIT)
if (packet_type == MYSQL_COM_STMT_SEND_LONG_DATA ||
packet_type == MYSQL_COM_QUIT ||
packet_type == MYSQL_COM_STMT_CLOSE)
{
int rc;
@ -2579,6 +2717,14 @@ static bool route_session_write(
gwbuf_free(querybuf);
goto return_succp;
}
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
rses_property_done(prop);
succp = false;
goto return_succp;
}
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
/**
* Additional reference is created to querybuf to
@ -2587,19 +2733,40 @@ static bool route_session_write(
*/
mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
rses_property_done(prop);
succp = false;
goto return_succp;
}
/** Add sescmd property to router client session */
rses_property_add(router_cli_ses, prop);
for (i=0; i<router_cli_ses->rses_nbackends; i++)
{
if (BREF_IS_IN_USE((&backend_ref[i])))
{
sescmd_cursor_t* scur;
scur = backend_ref_get_sescmd_cursor(&backend_ref[i]);
/**
* This backend_ref waits reply, flag it.
*/
bref_set_state(get_bref_from_dcb(router_cli_ses,
backend_ref[i].bref_dcb),
BREF_WAITING_RESULT);
/**
* Start execution if cursor is not already executing.
* Otherwise, cursor will execute pending commands
* when it completes with previous commands.
*/
if (sescmd_cursor_is_active(scur))
{
succp = true;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Backend %s:%d already executing sescmd.",
backend_ref[i].bref_backend->backend_server->name,
backend_ref[i].bref_backend->backend_server->port)));
}
else
{
succp = execute_sescmd_in_backend(&backend_ref[i]);
@ -2614,6 +2781,7 @@ static bool route_session_write(
}
}
}
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
@ -2688,6 +2856,7 @@ static void rwsplit_process_options(
* Even if succp == true connecting to new slave may have failed. succp is to
* tell whether router has enough master/slave connections to continue work.
*/
static void handleError (
ROUTER* instance,
void* router_session,
@ -2711,10 +2880,6 @@ static void handleError (
switch (action) {
case ERRACT_NEW_CONNECTION:
{
int router_nservers;
int max_nslaves;
backend_ref_t* bref;
CHK_CLIENT_RSES(rses);
if (!rses_begin_locked_router_action(rses))
@ -2723,14 +2888,78 @@ static void handleError (
return;
}
*succp = handle_error_new_connection(inst,
rses,
backend_dcb,
errmsgbuf);
rses_end_locked_router_action(rses);
break;
}
case ERRACT_REPLY_CLIENT:
{
*succp = handle_error_reply_client(session, errmsgbuf);
break;
}
default:
*succp = false;
break;
}
}
static bool handle_error_reply_client(
SESSION* ses,
GWBUF* errmsg)
{
session_state_t sesstate;
DCB* client_dcb;
bool succp;
spinlock_acquire(&ses->ses_lock);
sesstate = ses->state;
client_dcb = ses->client;
spinlock_release(&ses->ses_lock);
if (sesstate == SESSION_STATE_ROUTER_READY)
{
CHK_DCB(client_dcb);
client_dcb->func.write(client_dcb, errmsg);
}
succp = false; /** false because new servers aren's selected. */
return succp;
}
/**
* This must be called with router lock
*/
static bool handle_error_new_connection(
ROUTER_INSTANCE* inst,
ROUTER_CLIENT_SES* rses,
DCB* backend_dcb,
GWBUF* errmsg)
{
SESSION* ses;
int router_nservers;
int max_nslaves;
backend_ref_t* bref;
bool succp;
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
ses = backend_dcb->session;
CHK_SESSION(ses);
bref = get_bref_from_dcb(rses, backend_dcb);
/** failed DCB has already been replaced */
if (bref == NULL)
{
rses_end_locked_router_action(rses);
*succp = true;
return;
succp = true;
goto return_succp;
}
/**
* Error handler is already called for this DCB because
@ -2740,8 +2969,8 @@ static void handleError (
if (backend_dcb->state != DCB_STATE_POLLING)
{
rses_end_locked_router_action(rses);
*succp = true;
return;
succp = true;
goto return_succp;
}
CHK_BACKEND_REF(bref);
@ -2749,12 +2978,11 @@ static void handleError (
if (BREF_IS_WAITING_RESULT(bref))
{
DCB* client_dcb;
client_dcb = session->client;
client_dcb->func.write(client_dcb, errmsgbuf);
client_dcb = ses->client;
client_dcb->func.write(client_dcb, errmsg);
bref_clear_state(bref, BREF_WAITING_RESULT);
}
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_NOT_USED);
bref_set_state(bref, BREF_CLOSED);
/**
* Remove callback because this DCB won't be used
@ -2772,50 +3000,27 @@ static void handleError (
* Try to get replacement slave or at least the minimum
* number of slave connections for router session.
*/
*succp = select_connect_backend_servers(
succp = select_connect_backend_servers(
&rses->rses_master_ref,
rses->rses_backend_ref,
router_nservers,
max_nslaves,
rses->rses_config.rw_slave_select_criteria,
session,
ses,
inst);
rses_end_locked_router_action(rses);
break;
}
case ERRACT_REPLY_CLIENT:
{
session_state_t sesstate;
spinlock_acquire(&session->ses_lock);
sesstate = session->state;
client_dcb = session->client;
spinlock_release(&session->ses_lock);
if (sesstate == SESSION_STATE_ROUTER_READY)
{
CHK_DCB(client_dcb);
client_dcb->func.write(client_dcb, errmsgbuf);
}
succp = false; /** false because new servers aren's selected. */
break;
}
default:
*succp = false;
break;
}
return_succp:
return succp;
}
static void print_error_packet(
ROUTER_CLIENT_SES* rses,
GWBUF* buf,
DCB* dcb)
{
#if defined(SS_DEBUG)
if (buf->gwbuf_type == GWBUF_TYPE_MYSQL)
if (GWBUF_IS_TYPE_MYSQL(buf))
{
while (gwbuf_length(buf) > 0)
{
@ -2969,3 +3174,71 @@ static int router_handle_state_switch(
return_rc:
return rc;
}
static sescmd_cursor_t* backend_ref_get_sescmd_cursor (
backend_ref_t* bref)
{
sescmd_cursor_t* scur;
CHK_BACKEND_REF(bref);
scur = &bref->bref_sescmd_cur;
CHK_SESCMD_CUR(scur);
return scur;
}
#if defined(PREP_STMT_CACHING)
#define MAX_STMT_LEN 1024
static prep_stmt_t* prep_stmt_init(
prep_stmt_type_t type,
void* id)
{
prep_stmt_t* pstmt;
pstmt = (prep_stmt_t *)calloc(1, sizeof(prep_stmt_t));
if (pstmt != NULL)
{
#if defined(SS_DEBUG)
pstmt->pstmt_chk_top = CHK_NUM_PREP_STMT;
pstmt->pstmt_chk_tail = CHK_NUM_PREP_STMT;
#endif
pstmt->pstmt_state = PREP_STMT_ALLOC;
pstmt->pstmt_type = type;
if (type == PREP_STMT_NAME)
{
pstmt->pstmt_id.name = strndup((char *)id, MAX_STMT_LEN);
}
else
{
pstmt->pstmt_id.seq = 0;
}
}
CHK_PREP_STMT(pstmt);
return pstmt;
}
static void prep_stmt_done(
prep_stmt_t* pstmt)
{
CHK_PREP_STMT(pstmt);
if (pstmt->pstmt_type == PREP_STMT_NAME)
{
free(pstmt->pstmt_id.name);
}
free(pstmt);
}
static bool prep_stmt_drop(
prep_stmt_t* pstmt)
{
CHK_PREP_STMT(pstmt);
pstmt->pstmt_state = PREP_STMT_DROPPED;
return true;
}
#endif /*< PREP_STMT_CACHING */