readwritesplit.c : router is changed so that it guarantees to keep the execution order of session commands and queries when they are routed to backend servers. In the same way it maintains the order of response packets and discards duplicate responses.

For each session command, a sescmd property is created and added to the end of the list of session commands. The list is owned by the router client session and it includes all session commands from the beginning of the router session.
The router maintains an individual session command cursor for each backend. A cursor refers to the first session command to which the corresponding backend server has not yet responded.
When a response message arrives at any time from a backend, it is first checked whether the backend's cursor is active. The cursor is active if a session command has been routed to the backend and the backend hasn't responded to it yet. If the cursor is active, it is next checked whether the current session command property has already been responded to by another backend. If both are true, then the response message is sent to the client as is. If the session command response has already been routed to the client, the arrived response is discarded.
This commit is contained in:
VilhoRaatikka 2014-03-16 19:43:49 +02:00
parent a3f7eebdc9
commit 90f701be8e
6 changed files with 182 additions and 211 deletions

View File

@ -768,6 +768,8 @@ dcb_write(DCB *dcb, GWBUF *queue)
saved_errno = errno;
errno = 0;
if (LOG_IS_ENABLED(LOGFILE_DEBUG))
{
if (saved_errno == EPIPE) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -780,7 +782,12 @@ dcb_write(DCB *dcb, GWBUF *queue)
dcb->fd,
saved_errno,
strerror(saved_errno))));
} else if (saved_errno != EAGAIN &&
}
}
if (LOG_IS_ENABLED(LOGFILE_ERROR))
{
if (saved_errno != EPIPE &&
saved_errno != EAGAIN &&
saved_errno != EWOULDBLOCK)
{
LOGIF(LE, (skygw_log_write_flush(
@ -794,6 +801,7 @@ dcb_write(DCB *dcb, GWBUF *queue)
saved_errno,
strerror(saved_errno))));
}
}
break;
}
/*

View File

@ -296,3 +296,4 @@ char *gw_strend(register const char *s);
int setnonblocking(int fd);
void setipaddress(struct in_addr *a, char *p);
int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b);
GWBUF* gw_MySQL_get_next_stmt(GWBUF** p_readbuf);

View File

@ -567,9 +567,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
/*<
* Now we set the last command received, from the current queue
*/
// memcpy(&dcb->command, &queue->command, sizeof(dcb->command));
spinlock_release(&dcb->authlock);
// LOGIF(LD, debuglog_statements(dcb, gwbuf_clone(queue)));
rc = dcb_write(dcb, queue);
return rc;
}
@ -847,7 +845,6 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
unsigned int auth_token_len = 0;
uint8_t *auth_token = NULL;
int rv = -1;
int len = 0;
int auth_ret = 1;
current_session = (MYSQL_session *)in_session->client->data;
@ -919,7 +916,6 @@ static int gw_session(DCB *backend_dcb, void *data) {
GWBUF *queue = NULL;
queue = (GWBUF *) data;
// queue->command = ROUTER_CHANGE_SESSION;
backend_dcb->func.write(backend_dcb, queue);
return 1;

View File

@ -57,7 +57,6 @@ static int route_by_statement(
ROUTER_OBJECT* router,
void* rsession,
GWBUF* read_buf);
static GWBUF* gw_MySQL_get_next_stmt(GWBUF** buffer);
/*
* The "module object" for the mysqld client protocol module.
@ -1248,51 +1247,6 @@ return_rc:
return rc;
}
/**
 * Remove the first MySQL packet (statement) from the read buffer.
 *
 * The packet header is 4 bytes and the payload length is taken from it with
 * MYSQL_GET_PACKET_LEN, so a whole packet occupies payload length + 4 bytes.
 * When the buffer holds at least one whole packet, that portion is cloned
 * with gwbuf_clone_portion and the same number of bytes is consumed from
 * *p_readbuf, which is updated to whatever gwbuf_consume returns.
 *
 * Multi-packet statements are not supported (vraa, 7.3.14): if the buffer
 * does not contain the whole packet, nothing is removed.
 *
 * @param p_readbuf     Address of the read buffer pointer; must not be NULL.
 *                      *p_readbuf may be NULL or empty.
 *
 * @return Buffer holding the first packet, or NULL if *p_readbuf is NULL,
 *         empty, shorter than a packet header, or holds only a partial
 *         packet. In those cases *p_readbuf is left untouched.
 */
static GWBUF* gw_MySQL_get_next_stmt(
        GWBUF** p_readbuf)
{
        const size_t header_len = 4;
        size_t       buflen;
        size_t       packet_len;
        uint8_t*     packet;
        GWBUF*       stmtbuf;

        if (*p_readbuf == NULL)
        {
                return NULL;
        }
        CHK_GWBUF(*p_readbuf);

        if (GWBUF_EMPTY(*p_readbuf))
        {
                return NULL;
        }
        buflen = GWBUF_LENGTH((*p_readbuf));

        /**
         * The header itself must be complete before the payload length
         * can be read from it.
         */
        if (buflen < header_len)
        {
                return NULL;
        }
        packet = GWBUF_DATA((*p_readbuf));
        packet_len = (size_t)MYSQL_GET_PACKET_LEN(packet) + header_len;

        /**
         * Partial packet. Compared as a sum instead of the former
         * 'len-1 > buflen-5' because that unsigned subtraction wrapped
         * around for zero-length payloads and for buffers shorter than
         * five bytes.
         */
        if (packet_len > buflen)
        {
                return NULL;
        }
        stmtbuf = gwbuf_clone_portion(*p_readbuf, 0, packet_len);
        *p_readbuf = gwbuf_consume(*p_readbuf, packet_len);

        return stmtbuf;
}
/**
* Detect if buffer includes partial mysql packet or multiple packets.
@ -1318,13 +1272,11 @@ static int route_by_statement(
CHK_GWBUF(stmtbuf);
payload = (uint8_t *)GWBUF_DATA(stmtbuf);
len += MYSQL_GET_PACKET_LEN(payload);
/**
* If message is longer than read data, suspend routing and
* add statement buffer to wait queue.
*/
rc = router->routeQuery(router_instance, rsession, stmtbuf);
len = 0; /*< if routed, reset the length indicator */
}
while (readbuf != NULL);

View File

@ -1209,3 +1209,57 @@ mysql_send_auth_error (DCB *dcb, int packet_number, int in_affected_rows, const
return sizeof(mysql_packet_header) + mysql_payload_size;
}
/**
 * Remove the first MySQL packet (statement) from the read buffer.
 *
 * The packet header is 4 bytes and the payload length is taken from it with
 * MYSQL_GET_PACKET_LEN, so a whole packet occupies payload length + 4 bytes.
 *
 * - If the buffer holds exactly one packet, the buffer itself is handed to
 *   the caller and *p_readbuf is set to NULL (no cloning).
 * - If the buffer holds more than one packet, the first one is cloned with
 *   gwbuf_clone_portion and the same number of bytes is consumed from
 *   *p_readbuf, which is updated to whatever gwbuf_consume returns.
 * - Multi-packet statements are not supported (vraa, 7.3.14): if the buffer
 *   does not contain the whole packet, nothing is removed.
 *
 * @param p_readbuf     Address of the read buffer pointer; must not be NULL.
 *                      *p_readbuf may be NULL or empty.
 *
 * @return Buffer holding the first packet, or NULL if *p_readbuf is NULL,
 *         empty, shorter than a packet header, or holds only a partial
 *         packet. In those cases *p_readbuf is left untouched.
 */
GWBUF* gw_MySQL_get_next_stmt(
        GWBUF** p_readbuf)
{
        const size_t header_len = 4;
        size_t       buflen;
        size_t       packet_len;
        uint8_t*     packet;
        GWBUF*       stmtbuf;

        if (*p_readbuf == NULL)
        {
                return NULL;
        }
        CHK_GWBUF(*p_readbuf);

        if (GWBUF_EMPTY(*p_readbuf))
        {
                return NULL;
        }
        buflen = GWBUF_LENGTH((*p_readbuf));

        /**
         * The header itself must be complete before the payload length
         * can be read from it.
         */
        if (buflen < header_len)
        {
                return NULL;
        }
        packet = GWBUF_DATA((*p_readbuf));
        packet_len = (size_t)MYSQL_GET_PACKET_LEN(packet) + header_len;

        /** Exactly one packet: pass the buffer on without cloning. */
        if (packet_len == buflen)
        {
                stmtbuf = *p_readbuf;
                *p_readbuf = NULL;
                return stmtbuf;
        }
        /**
         * Partial packet. Compared as a sum instead of the former
         * 'len-1 > buflen-5' because that unsigned subtraction wrapped
         * around for zero-length payloads and for buffers shorter than
         * five bytes.
         */
        if (packet_len > buflen)
        {
                return NULL;
        }
        stmtbuf = gwbuf_clone_portion(*p_readbuf, 0, packet_len);
        *p_readbuf = gwbuf_consume(*p_readbuf, packet_len);

        return stmtbuf;
}

View File

@ -677,43 +677,6 @@ static int routeQuery(
switch (qtype) {
case QUERY_TYPE_WRITE:
#if 0
/**
* Running this block cause deadlock because read mutex is
* on hold. This doesn't serialize subsequent session commands
* and queries if there are multiple session commands and other
* backend starts to lag behind. vraa : 14.3.13
*/
/**
* Wait until master has executed all its session commands.
* TODO: if master fails it needs to be detected in the loop.
*/
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
while (sescmd_cursor_is_active(rses_get_sescmd_cursor(
router_cli_ses,
BE_MASTER)))
{
rses_end_locked_router_action(router_cli_ses);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [routeQuery:rwsplit] Session command is "
"active in MASTER. Waiting in loop.",
pthread_self())));
usleep(10);
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
}
rses_end_locked_router_action(router_cli_ses);
#endif
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Query type\t%s, "
@ -733,53 +696,24 @@ static int routeQuery(
break;
case QUERY_TYPE_READ:
LOGIF(LT, (skygw_log_write(
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Query type\t%s, "
"routing to Slave.",
pthread_self(),
STRQTYPE(qtype))));
while (true)
{
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
/**
* If session command is being executed in slave
* route to master.
*/
if (!sescmd_cursor_is_active(rses_get_sescmd_cursor(
router_cli_ses,
BE_SLAVE)))
{
rses_end_locked_router_action(router_cli_ses);
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
slave_dcb,
gwbuf_clone(querybuf)));
ret = slave_dcb->func.write(slave_dcb, querybuf);
atomic_add(&inst->stats.n_slave, 1);
break;
}
else if (!sescmd_cursor_is_active(rses_get_sescmd_cursor(
router_cli_ses,
BE_MASTER)))
{
rses_end_locked_router_action(router_cli_ses);
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(querybuf)));
ret = slave_dcb->func.write(master_dcb, querybuf);
atomic_add(&inst->stats.n_master, 1);
break;
}
rses_end_locked_router_action(router_cli_ses);
} /*< while (true) */
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
slave_dcb,
gwbuf_clone(querybuf)));
ret = slave_dcb->func.write(slave_dcb, querybuf);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Routed.",
pthread_self())));
atomic_add(&inst->stats.n_slave, 1);
goto return_ret;
break;
@ -807,7 +741,26 @@ static int routeQuery(
slave_dcb,
STRQTYPE(qtype),
STRPACKETTYPE(packet_type))));
/**
* COM_QUIT is one-way message. Server doesn't respond to that.
* Therefore reply processing is unnecessary and session
* command property is not needed. It is just routed to both
* backends.
*/
if (packet_type == COM_QUIT)
{
int rc;
int rc2;
rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf));
rc2 = slave_dcb->func.write(slave_dcb, gwbuf_clone(querybuf));
if (rc == 1 && rc == rc2)
{
ret = 1;
}
goto return_ret;
}
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
/**
* Additional reference is created to querybuf to
@ -1063,6 +1016,11 @@ static void clientReply(
{
be_type = BE_SLAVE;
}
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"reply_by_statement",
backend_dcb,
gwbuf_clone(writebuf)));
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
@ -1076,27 +1034,15 @@ static void clientReply(
*/
if (sescmd_cursor_is_active(scur))
{
mysql_sescmd_t* scmd = sescmd_cursor_get_command(scur);
sescmd_reply_to_client(client_dcb, scmd, writebuf);
/** Read next sescmd property */
while (sescmd_cursor_next(scur))
{
if (!cont_exec_sescmd_in_backend(router_cli_ses, be_type))
{
/** Log error */
}
else
{
/** Log execution of pending sescmd */
}
}
/** Set cursor passive. */
sescmd_cursor_set_active(scur, false);
ss_dassert(!sescmd_cursor_is_active(scur));
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
mysql_sescmd_t* scmd = sescmd_cursor_get_command(scur);
sescmd_reply_to_client(client_dcb, scmd, writebuf);
/** When sescmd list is empty set cursor passive. */
if (!sescmd_cursor_next(scur))
{
sescmd_cursor_set_active(scur, false);
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
}
else if (client_dcb != NULL)
{
@ -1411,7 +1357,6 @@ static mysql_sescmd_t* mysql_sescmd_init (
/** Can't call rses_property_get_sescmd with uninitialized sescmd */
sescmd = &rses_prop->rses_prop_data.sescmd;
sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */
// sescmd->my_sescmd_rsession = rses;
#if defined(SS_DEBUG)
sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD;
sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD;
@ -1452,21 +1397,43 @@ static bool sescmd_reply_to_client(
CHK_GWBUF(writebuf);
ss_dassert(SPINLOCK_IS_LOCKED(
&scmd->my_sescmd_prop->rses_prop_rsession->rses_lock));
if (!scmd->my_sescmd_is_replied)
{
client_dcb->func.write(client_dcb, writebuf);
scmd->my_sescmd_is_replied = true;
succp = true;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"%lu [sescmd_reply_to_client] Replied to client dcb %p.",
pthread_self(),
client_dcb)));
}
else
/**
* This depends on the MySQL protocol and doesn't work with others.
* TODO: write a function to MySQL protocol module and add
* a new protocol function 'discard n first messages'.
*/
if (scmd->my_sescmd_is_replied)
{
size_t len;
uint8_t* packet;
/**
* Skip the reply message because it is a duplicate of an already
* replied message.
*/
packet = (uint8_t*)writebuf->start;
len = packet[0];
len += packet[1]*256;
len += packet[2]*256*256;
writebuf = gwbuf_consume(writebuf, len+4);
if (writebuf == NULL || GWBUF_EMPTY(writebuf))
{
succp = true;
goto return_succp;
}
}
client_dcb->func.write(client_dcb, writebuf);
scmd->my_sescmd_is_replied = true;
succp = true;
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [sescmd_reply_to_client] Replied cmd %p to client dcb %p.",
pthread_self(),
scmd,
client_dcb)));
return_succp:
return succp;
}
@ -1575,52 +1542,44 @@ static bool execute_sescmd_in_backend(
succp = false;
goto return_succp;
}
if (!sescmd_cursor_is_active(scur))
{
/** Cursor is left active when function returns. */
sescmd_cursor_set_active(scur, true);
LOGIF(LT, tracelog_routed_query(rses,
"execute_sescmd_in_backend",
dcb,
sescmd_cursor_clone_querybuf(scur)));
switch (scur->scmd_cur_cmd->my_sescmd_packet_type) {
case COM_CHANGE_USER:
rc = dcb->func.auth(
dcb,
NULL,
dcb->session,
sescmd_cursor_clone_querybuf(scur));
break;
case COM_QUIT:
case COM_QUERY:
case COM_INIT_DB:
default:
rc = dcb->func.write(
dcb,
sescmd_cursor_clone_querybuf(scur));
break;
}
if (rc != 1)
{
succp = false;
}
}
else
if (!sescmd_cursor_is_active(scur))
{
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [routeQuery] Couldn't directly send SESSION "
"WRITER command to dcb %p because session command "
"cursor was executing previous command. Added "
"command to the queue.",
pthread_self(),
dcb)));
/** Cursor is left active when function returns. */
sescmd_cursor_set_active(scur, true);
}
LOGIF(LT, tracelog_routed_query(rses,
"execute_sescmd_in_backend",
dcb,
sescmd_cursor_clone_querybuf(scur)));
switch (scur->scmd_cur_cmd->my_sescmd_packet_type) {
case COM_CHANGE_USER:
rc = dcb->func.auth(
dcb,
NULL,
dcb->session,
sescmd_cursor_clone_querybuf(scur));
break;
case COM_QUIT:
case COM_QUERY:
case COM_INIT_DB:
default:
rc = dcb->func.write(
dcb,
sescmd_cursor_clone_querybuf(scur));
break;
}
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [execute_sescmd_in_backend] Routed %s cmd %p.",
pthread_self(),
STRPACKETTYPE(scur->scmd_cur_cmd->my_sescmd_packet_type),
scur->scmd_cur_cmd)));
if (rc != 1)
{
succp = false;
}
return_succp:
return succp;
}
@ -1664,7 +1623,8 @@ static bool cont_exec_sescmd_in_backend(
sescmd_cursor_clone_querybuf(scur)));
rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur));
if (rc != 1)
if (rc != 1)
{
succp = false;
}