Complete fix candidate for #645, http://bugs.skysql.com/show_bug.cgi?id=645 and #648, http://bugs.skysql.com/show_bug.cgi?id=648
tee.c:closeSession removed unnecessary dcb_free, router/service closes all backend DCBs and the client DCB, and client DCB is the one that was tried to free in closeSession. readwritesplit.c:routeQuery now handles untyped and typed GWBUFs. Untyped means that read buffer may consist of incomplete and multiple MySQL packets. Typed buffer always consists of a single MySQL packet (which can be split to many buffers inside GWBUF). Fixed Coverity cases #84840 and #84841
This commit is contained in:
@ -190,7 +190,7 @@ DCB *rval;
|
|||||||
rval->readcheck = 0;
|
rval->readcheck = 0;
|
||||||
rval->polloutbusy = 0;
|
rval->polloutbusy = 0;
|
||||||
rval->writecheck = 0;
|
rval->writecheck = 0;
|
||||||
rval->fd = -1;
|
rval->fd = DCBFD_CLOSED;
|
||||||
|
|
||||||
rval->evq.next = NULL;
|
rval->evq.next = NULL;
|
||||||
rval->evq.prev = NULL;
|
rval->evq.prev = NULL;
|
||||||
@ -235,8 +235,10 @@ DCB *rval;
|
|||||||
void
|
void
|
||||||
dcb_free(DCB *dcb)
|
dcb_free(DCB *dcb)
|
||||||
{
|
{
|
||||||
if (dcb->fd == -1)
|
if (dcb->fd == DCBFD_CLOSED)
|
||||||
|
{
|
||||||
dcb_final_free(dcb);
|
dcb_final_free(dcb);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
@ -308,7 +310,7 @@ DCB *clone;
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
clone->fd = -1;
|
clone->fd = DCBFD_CLONED;;
|
||||||
clone->flags |= DCBF_CLONE;
|
clone->flags |= DCBF_CLONE;
|
||||||
clone->state = orig->state;
|
clone->state = orig->state;
|
||||||
clone->data = orig->data;
|
clone->data = orig->data;
|
||||||
@ -551,40 +553,42 @@ bool succp = false;
|
|||||||
DCB* dcb_next = NULL;
|
DCB* dcb_next = NULL;
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
|
|
||||||
/*<
|
if (dcb->fd > 0)
|
||||||
* Close file descriptor and move to clean-up phase.
|
{
|
||||||
*/
|
/*<
|
||||||
rc = close(dcb->fd);
|
* Close file descriptor and move to clean-up phase.
|
||||||
|
*/
|
||||||
|
rc = close(dcb->fd);
|
||||||
|
|
||||||
if (rc < 0) {
|
if (rc < 0)
|
||||||
int eno = errno;
|
{
|
||||||
errno = 0;
|
int eno = errno;
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
errno = 0;
|
||||||
LOGFILE_ERROR,
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
"Error : Failed to close "
|
LOGFILE_ERROR,
|
||||||
"socket %d on dcb %p due error %d, %s.",
|
"Error : Failed to close "
|
||||||
dcb->fd,
|
"socket %d on dcb %p due error %d, %s.",
|
||||||
dcb,
|
dcb->fd,
|
||||||
eno,
|
dcb,
|
||||||
strerror(eno))));
|
eno,
|
||||||
}
|
strerror(eno))));
|
||||||
#if defined(SS_DEBUG)
|
}
|
||||||
else {
|
else
|
||||||
LOGIF(LD, (skygw_log_write_flush(
|
{
|
||||||
LOGFILE_DEBUG,
|
dcb->fd = DCBFD_CLOSED;
|
||||||
"%lu [dcb_process_zombies] Closed socket "
|
|
||||||
"%d on dcb %p.",
|
LOGIF(LD, (skygw_log_write_flush(
|
||||||
pthread_self(),
|
LOGFILE_DEBUG,
|
||||||
dcb->fd,
|
"%lu [dcb_process_zombies] Closed socket "
|
||||||
dcb)));
|
"%d on dcb %p.",
|
||||||
#endif /* SS_DEBUG */
|
pthread_self(),
|
||||||
|
dcb->fd,
|
||||||
|
dcb)));
|
||||||
#if defined(FAKE_CODE)
|
#if defined(FAKE_CODE)
|
||||||
conn_open[dcb->fd] = false;
|
conn_open[dcb->fd] = false;
|
||||||
#endif /* FAKE_CODE */
|
#endif /* FAKE_CODE */
|
||||||
#if defined(SS_DEBUG)
|
}
|
||||||
ss_debug(dcb->fd = -1;)
|
}
|
||||||
}
|
|
||||||
#endif /* SS_DEBUG */
|
|
||||||
LOGIF_MAYBE(LT, (dcb_get_ses_log_info(
|
LOGIF_MAYBE(LT, (dcb_get_ses_log_info(
|
||||||
dcb,
|
dcb,
|
||||||
&tls_log_info.li_sesid,
|
&tls_log_info.li_sesid,
|
||||||
@ -657,7 +661,7 @@ int rc;
|
|||||||
}
|
}
|
||||||
fd = dcb->func.connect(dcb, server, session);
|
fd = dcb->func.connect(dcb, server, session);
|
||||||
|
|
||||||
if (fd == -1) {
|
if (fd == DCBFD_CLOSED) {
|
||||||
LOGIF(LD, (skygw_log_write(
|
LOGIF(LD, (skygw_log_write(
|
||||||
LOGFILE_DEBUG,
|
LOGFILE_DEBUG,
|
||||||
"%lu [dcb_connect] Failed to connect to server %s:%d, "
|
"%lu [dcb_connect] Failed to connect to server %s:%d, "
|
||||||
@ -683,7 +687,7 @@ int rc;
|
|||||||
session->client,
|
session->client,
|
||||||
session->client->fd)));
|
session->client->fd)));
|
||||||
}
|
}
|
||||||
ss_dassert(dcb->fd == -1); /*< must be uninitialized at this point */
|
ss_dassert(dcb->fd == DCBFD_CLOSED); /*< must be uninitialized at this point */
|
||||||
/*<
|
/*<
|
||||||
* Successfully connected to backend. Assign file descriptor to dcb
|
* Successfully connected to backend. Assign file descriptor to dcb
|
||||||
*/
|
*/
|
||||||
@ -704,7 +708,7 @@ int rc;
|
|||||||
*/
|
*/
|
||||||
rc = poll_add_dcb(dcb);
|
rc = poll_add_dcb(dcb);
|
||||||
|
|
||||||
if (rc == -1) {
|
if (rc == DCBFD_CLOSED) {
|
||||||
dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL);
|
dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL);
|
||||||
dcb_final_free(dcb);
|
dcb_final_free(dcb);
|
||||||
return NULL;
|
return NULL;
|
||||||
@ -736,11 +740,22 @@ int dcb_read(
|
|||||||
GWBUF *buffer = NULL;
|
GWBUF *buffer = NULL;
|
||||||
int b;
|
int b;
|
||||||
int rc;
|
int rc;
|
||||||
int n ;
|
int n;
|
||||||
int nread = 0;
|
int nread = 0;
|
||||||
|
|
||||||
CHK_DCB(dcb);
|
CHK_DCB(dcb);
|
||||||
while (true)
|
|
||||||
|
if (dcb->fd <= 0)
|
||||||
|
{
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error : Read failed, dcb is %s.",
|
||||||
|
dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not readable")));
|
||||||
|
n = 0;
|
||||||
|
goto return_n;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (true)
|
||||||
{
|
{
|
||||||
int bufsize;
|
int bufsize;
|
||||||
|
|
||||||
@ -864,6 +879,14 @@ int below_water;
|
|||||||
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0;
|
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0;
|
||||||
ss_dassert(queue != NULL);
|
ss_dassert(queue != NULL);
|
||||||
|
|
||||||
|
if (dcb->fd <= 0)
|
||||||
|
{
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error : Write failed, dcb is %s.",
|
||||||
|
dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not writable")));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* SESSION_STATE_STOPPING means that one of the backends is closing
|
* SESSION_STATE_STOPPING means that one of the backends is closing
|
||||||
* the router session. Some backends may have not completed
|
* the router session. Some backends may have not completed
|
||||||
@ -1209,46 +1232,42 @@ dcb_close(DCB *dcb)
|
|||||||
*/
|
*/
|
||||||
if (dcb->state == DCB_STATE_POLLING)
|
if (dcb->state == DCB_STATE_POLLING)
|
||||||
{
|
{
|
||||||
if (dcb->fd != -1)
|
rc = poll_remove_dcb(dcb);
|
||||||
{
|
|
||||||
rc = poll_remove_dcb(dcb);
|
|
||||||
|
|
||||||
if (rc == 0) {
|
if (rc == 0) {
|
||||||
LOGIF(LD, (skygw_log_write(
|
LOGIF(LD, (skygw_log_write(
|
||||||
LOGFILE_DEBUG,
|
LOGFILE_DEBUG,
|
||||||
"%lu [dcb_close] Removed dcb %p in state %s from "
|
"%lu [dcb_close] Removed dcb %p in state %s from "
|
||||||
"poll set.",
|
"poll set.",
|
||||||
pthread_self(),
|
pthread_self(),
|
||||||
dcb,
|
dcb,
|
||||||
STRDCBSTATE(dcb->state))));
|
STRDCBSTATE(dcb->state))));
|
||||||
} else {
|
} else {
|
||||||
LOGIF(LE, (skygw_log_write(
|
LOGIF(LE, (skygw_log_write(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"Error : Removing DCB fd == %d in state %s from "
|
"Error : Removing DCB fd == %d in state %s from "
|
||||||
"poll set failed.",
|
"poll set failed.",
|
||||||
dcb->fd,
|
dcb->fd,
|
||||||
STRDCBSTATE(dcb->state))));
|
STRDCBSTATE(dcb->state))));
|
||||||
}
|
|
||||||
|
|
||||||
if (rc == 0)
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* close protocol and router session
|
|
||||||
*/
|
|
||||||
if (dcb->func.close != NULL)
|
|
||||||
{
|
|
||||||
dcb->func.close(dcb);
|
|
||||||
}
|
|
||||||
dcb_call_callback(dcb, DCB_REASON_CLOSE);
|
|
||||||
|
|
||||||
|
|
||||||
if (dcb->state == DCB_STATE_NOPOLLING)
|
|
||||||
{
|
|
||||||
dcb_add_to_zombieslist(dcb);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (rc == 0)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* close protocol and router session
|
||||||
|
*/
|
||||||
|
if (dcb->func.close != NULL)
|
||||||
|
{
|
||||||
|
dcb->func.close(dcb);
|
||||||
|
}
|
||||||
|
dcb_call_callback(dcb, DCB_REASON_CLOSE);
|
||||||
|
|
||||||
|
|
||||||
|
if (dcb->state == DCB_STATE_NOPOLLING)
|
||||||
|
{
|
||||||
|
dcb_add_to_zombieslist(dcb);
|
||||||
|
}
|
||||||
|
}
|
||||||
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
|
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
|
||||||
dcb->state == DCB_STATE_ZOMBIE);
|
dcb->state == DCB_STATE_ZOMBIE);
|
||||||
}
|
}
|
||||||
@ -1764,7 +1783,8 @@ gw_write(DCB *dcb, const void *buf, size_t nbytes)
|
|||||||
int w;
|
int w;
|
||||||
int fd = dcb->fd;
|
int fd = dcb->fd;
|
||||||
#if defined(FAKE_CODE)
|
#if defined(FAKE_CODE)
|
||||||
if (dcb_fake_write_errno[fd] != 0) {
|
if (fd > 0 && dcb_fake_write_errno[fd] != 0)
|
||||||
|
{
|
||||||
ss_dassert(dcb_fake_write_ev[fd] != 0);
|
ss_dassert(dcb_fake_write_ev[fd] != 0);
|
||||||
w = write(fd, buf, nbytes/2); /*< leave peer to read missing bytes */
|
w = write(fd, buf, nbytes/2); /*< leave peer to read missing bytes */
|
||||||
|
|
||||||
@ -1772,11 +1792,15 @@ gw_write(DCB *dcb, const void *buf, size_t nbytes)
|
|||||||
w = -1;
|
w = -1;
|
||||||
errno = dcb_fake_write_errno[fd];
|
errno = dcb_fake_write_errno[fd];
|
||||||
}
|
}
|
||||||
} else {
|
} else if (fd > 0)
|
||||||
|
{
|
||||||
w = write(fd, buf, nbytes);
|
w = write(fd, buf, nbytes);
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
w = write(fd, buf, nbytes);
|
if (fd > 0)
|
||||||
|
{
|
||||||
|
w = write(fd, buf, nbytes);
|
||||||
|
}
|
||||||
#endif /* FAKE_CODE */
|
#endif /* FAKE_CODE */
|
||||||
|
|
||||||
#if defined(SS_DEBUG_MYSQL)
|
#if defined(SS_DEBUG_MYSQL)
|
||||||
|
|||||||
@ -341,19 +341,26 @@ poll_remove_dcb(DCB *dcb)
|
|||||||
/*<
|
/*<
|
||||||
* Set state to NOPOLLING and remove dcb from poll set.
|
* Set state to NOPOLLING and remove dcb from poll set.
|
||||||
*/
|
*/
|
||||||
if (dcb_set_state(dcb, new_state, &old_state)) {
|
if (dcb_set_state(dcb, new_state, &old_state))
|
||||||
rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev);
|
{
|
||||||
|
/**
|
||||||
|
* Only positive fds can be removed from epoll set.
|
||||||
|
*/
|
||||||
|
if (dcb->fd > 0)
|
||||||
|
{
|
||||||
|
rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev);
|
||||||
|
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
int eno = errno;
|
int eno = errno;
|
||||||
errno = 0;
|
errno = 0;
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"Error : epoll_ctl failed due %d, %s.",
|
"Error : epoll_ctl failed due %d, %s.",
|
||||||
eno,
|
eno,
|
||||||
strerror(eno))));
|
strerror(eno))));
|
||||||
}
|
}
|
||||||
ss_dassert(rc == 0); /*< trap in debug */
|
ss_dassert(rc == 0); /*< trap in debug */
|
||||||
|
}
|
||||||
}
|
}
|
||||||
/*<
|
/*<
|
||||||
* This call was redundant, but the end result is correct.
|
* This call was redundant, but the end result is correct.
|
||||||
|
|||||||
@ -129,6 +129,9 @@ typedef struct {
|
|||||||
*/
|
*/
|
||||||
#define GWPROTOCOL_VERSION {1, 0, 0}
|
#define GWPROTOCOL_VERSION {1, 0, 0}
|
||||||
|
|
||||||
|
#define DCBFD_CLOSED -1
|
||||||
|
#define DCBFD_CLONED -2
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The statitics gathered on a descriptor control block
|
* The statitics gathered on a descriptor control block
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -372,11 +372,11 @@ SESSION *bsession;
|
|||||||
/** Close router session and all its connections */
|
/** Close router session and all its connections */
|
||||||
router->closeSession(router_instance, rsession);
|
router->closeSession(router_instance, rsession);
|
||||||
}
|
}
|
||||||
dcb_free(my_session->branch_dcb);
|
|
||||||
/* No need to free the session, this is done as
|
/* No need to free the session, this is done as
|
||||||
* a side effect of closing the client DCB of the
|
* a side effect of closing the client DCB of the
|
||||||
* session.
|
* session.
|
||||||
*/
|
*/
|
||||||
|
my_session->active = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -323,4 +323,8 @@ typedef struct router_instance {
|
|||||||
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
|
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
|
||||||
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED));
|
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED));
|
||||||
|
|
||||||
|
#define RSES_SESSION(r) (r->rses_backend_ref->bref_dcb->session)
|
||||||
|
|
||||||
|
#define RSES_CLIENT_DCB(r) (RSES_SESSION(r)->client)
|
||||||
|
|
||||||
#endif /*< _RWSPLITROUTER_H */
|
#endif /*< _RWSPLITROUTER_H */
|
||||||
|
|||||||
@ -1807,502 +1807,108 @@ static int routeQuery(
|
|||||||
void* router_session,
|
void* router_session,
|
||||||
GWBUF* querybuf)
|
GWBUF* querybuf)
|
||||||
{
|
{
|
||||||
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
|
|
||||||
mysql_server_cmd_t packet_type;
|
|
||||||
uint8_t* packet;
|
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
DCB* master_dcb = NULL;
|
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
||||||
DCB* target_dcb = NULL;
|
|
||||||
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
|
||||||
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||||
bool rses_is_closed = false;
|
|
||||||
route_target_t route_target;
|
|
||||||
bool succp = false;
|
bool succp = false;
|
||||||
int rlag_max = MAX_RLAG_UNDEFINED;
|
|
||||||
backend_type_t btype; /*< target backend type */
|
|
||||||
|
|
||||||
CHK_CLIENT_RSES(router_cli_ses);
|
CHK_CLIENT_RSES(router_cli_ses);
|
||||||
|
|
||||||
/** Dirty read for quick check if router is closed. */
|
/**
|
||||||
if (router_cli_ses->rses_closed)
|
* Untyped GWBUF means that it can consist of incomplete and/or multiple
|
||||||
{
|
* MySQL packets.
|
||||||
rses_is_closed = true;
|
* Read and route found MySQL packets one by one and store potential
|
||||||
}
|
* incomplete packet to DCB's dcb_readqueue.
|
||||||
#if 1
|
*/
|
||||||
if (GWBUF_IS_TYPE_UNDEFINED(querybuf))
|
if (GWBUF_IS_TYPE_UNDEFINED(querybuf))
|
||||||
{
|
{
|
||||||
GWBUF* tmpbuf = querybuf;
|
GWBUF* tmpbuf = querybuf;
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* Try to read complete MySQL packet from tmpbuf.
|
||||||
|
* Append leftover to client's read queue.
|
||||||
|
*/
|
||||||
if ((querybuf = modutil_get_next_MySQL_packet(&tmpbuf)) == NULL)
|
if ((querybuf = modutil_get_next_MySQL_packet(&tmpbuf)) == NULL)
|
||||||
{
|
{
|
||||||
ret = 1;
|
if (GWBUF_LENGTH(tmpbuf) > 0)
|
||||||
|
{
|
||||||
|
DCB* dcb = RSES_CLIENT_DCB(router_cli_ses);
|
||||||
|
|
||||||
|
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, tmpbuf);
|
||||||
|
}
|
||||||
|
succp = true;
|
||||||
goto retblock;
|
goto retblock;
|
||||||
}
|
}
|
||||||
/** Mark buffer to as MySQL type */
|
/** Mark buffer to as MySQL type */
|
||||||
gwbuf_set_type(querybuf, GWBUF_TYPE_MYSQL);
|
gwbuf_set_type(querybuf, GWBUF_TYPE_MYSQL);
|
||||||
gwbuf_set_type(querybuf, GWBUF_TYPE_SINGLE_STMT);
|
gwbuf_set_type(querybuf, GWBUF_TYPE_SINGLE_STMT);
|
||||||
succp = route_single_stmt(inst, router_cli_ses, querybuf);
|
|
||||||
|
/**
|
||||||
|
* If router is closed, discard the packet
|
||||||
|
*/
|
||||||
|
if (router_cli_ses->rses_closed)
|
||||||
|
{
|
||||||
|
uint8_t* packet;
|
||||||
|
mysql_server_cmd_t packet_type;
|
||||||
|
|
||||||
|
packet = GWBUF_DATA(querybuf);
|
||||||
|
packet_type = packet[4];
|
||||||
|
|
||||||
|
if (packet_type != MYSQL_COM_QUIT)
|
||||||
|
{
|
||||||
|
char* query_str = modutil_get_query(querybuf);
|
||||||
|
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error: Can't route %s:\"%s\" to "
|
||||||
|
"backend server. Router is closed.",
|
||||||
|
STRPACKETTYPE(packet_type),
|
||||||
|
(query_str == NULL ? "(empty)" : query_str))));
|
||||||
|
free(query_str);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
succp = route_single_stmt(inst, router_cli_ses, querybuf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while (tmpbuf != NULL);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* If router is closed, discard the packet
|
||||||
|
*/
|
||||||
|
else if (router_cli_ses->rses_closed)
|
||||||
|
{
|
||||||
|
uint8_t* packet;
|
||||||
|
mysql_server_cmd_t packet_type;
|
||||||
|
|
||||||
|
packet = GWBUF_DATA(querybuf);
|
||||||
|
packet_type = packet[4];
|
||||||
|
|
||||||
|
if (packet_type != MYSQL_COM_QUIT)
|
||||||
|
{
|
||||||
|
char* query_str = modutil_get_query(querybuf);
|
||||||
|
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error: Can't route %s:\"%s\" to "
|
||||||
|
"backend server. Router is closed.",
|
||||||
|
STRPACKETTYPE(packet_type),
|
||||||
|
(query_str == NULL ? "(empty)" : query_str))));
|
||||||
|
free(query_str);
|
||||||
}
|
}
|
||||||
while (tmpbuf != NULL && succp);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
succp = route_single_stmt(inst, router_cli_ses, querybuf);
|
succp = route_single_stmt(inst, router_cli_ses, querybuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (succp)
|
|
||||||
{
|
|
||||||
ret = 1;
|
|
||||||
}
|
|
||||||
goto retblock;
|
|
||||||
#else
|
|
||||||
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
|
||||||
|
|
||||||
packet = GWBUF_DATA(querybuf);
|
|
||||||
packet_type = packet[4];
|
|
||||||
|
|
||||||
if (rses_is_closed)
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* MYSQL_COM_QUIT may have sent by client and as a part of backend
|
|
||||||
* closing procedure.
|
|
||||||
*/
|
|
||||||
if (packet_type != MYSQL_COM_QUIT)
|
|
||||||
{
|
|
||||||
char* query_str = modutil_get_query(querybuf);
|
|
||||||
|
|
||||||
LOGIF(LE,
|
|
||||||
(skygw_log_write_flush(
|
|
||||||
LOGFILE_ERROR,
|
|
||||||
"Error: Can't route %s:%s:\"%s\" to "
|
|
||||||
"backend server. Router is closed.",
|
|
||||||
STRPACKETTYPE(packet_type),
|
|
||||||
STRQTYPE(qtype),
|
|
||||||
(query_str == NULL ? "(empty)" : query_str))));
|
|
||||||
free(query_str);
|
|
||||||
}
|
|
||||||
ret = 0;
|
|
||||||
goto retblock;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Read stored master DCB pointer. If master is not set, routing must
|
|
||||||
* be aborted
|
|
||||||
*/
|
|
||||||
if ((master_dcb = router_cli_ses->rses_master_ref->bref_dcb) == NULL)
|
|
||||||
{
|
|
||||||
char* query_str = modutil_get_query(querybuf);
|
|
||||||
CHK_DCB(master_dcb);
|
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
|
||||||
LOGFILE_ERROR,
|
|
||||||
"Error: Can't route %s:%s:\"%s\" to "
|
|
||||||
"backend server. Session doesn't have a Master "
|
|
||||||
"node",
|
|
||||||
STRPACKETTYPE(packet_type),
|
|
||||||
STRQTYPE(qtype),
|
|
||||||
(query_str == NULL ? "(empty)" : query_str))));
|
|
||||||
free(query_str);
|
|
||||||
ret = 0;
|
|
||||||
goto retblock;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** If buffer is not contiguous, make it such */
|
|
||||||
if (querybuf->next != NULL)
|
|
||||||
{
|
|
||||||
querybuf = gwbuf_make_contiguous(querybuf);
|
|
||||||
}
|
|
||||||
|
|
||||||
switch(packet_type) {
|
|
||||||
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
|
|
||||||
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
|
|
||||||
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
|
|
||||||
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
|
|
||||||
case MYSQL_COM_PING: /*< 0e all servers are pinged */
|
|
||||||
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
|
|
||||||
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
|
|
||||||
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
|
|
||||||
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
|
|
||||||
qtype = QUERY_TYPE_SESSION_WRITE;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
|
|
||||||
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
|
|
||||||
qtype = QUERY_TYPE_WRITE;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MYSQL_COM_QUERY:
|
|
||||||
qtype = query_classifier_get_type(querybuf);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MYSQL_COM_STMT_PREPARE:
|
|
||||||
qtype = query_classifier_get_type(querybuf);
|
|
||||||
qtype |= QUERY_TYPE_PREPARE_STMT;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MYSQL_COM_STMT_EXECUTE:
|
|
||||||
/** Parsing is not needed for this type of packet */
|
|
||||||
qtype = QUERY_TYPE_EXEC_STMT;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
|
|
||||||
case MYSQL_COM_STATISTICS: /**< 9 ? */
|
|
||||||
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
|
|
||||||
case MYSQL_COM_CONNECT: /**< 0b ? */
|
|
||||||
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
|
|
||||||
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
|
|
||||||
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
|
|
||||||
case MYSQL_COM_DAEMON: /**< 1d ? */
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
} /**< switch by packet type */
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if the query has anything to do with temporary tables.
|
|
||||||
*/
|
|
||||||
qtype = is_read_tmp_table(inst,router_cli_ses,querybuf,qtype);
|
|
||||||
check_create_tmp_table(inst,router_cli_ses,querybuf,qtype);
|
|
||||||
check_drop_tmp_table(inst,router_cli_ses,querybuf,qtype);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If autocommit is disabled or transaction is explicitly started
|
|
||||||
* transaction becomes active and master gets all statements until
|
|
||||||
* transaction is committed and autocommit is enabled again.
|
|
||||||
*/
|
|
||||||
if (router_cli_ses->rses_autocommit_enabled &&
|
|
||||||
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT))
|
|
||||||
{
|
|
||||||
router_cli_ses->rses_autocommit_enabled = false;
|
|
||||||
|
|
||||||
if (!router_cli_ses->rses_transaction_active)
|
|
||||||
{
|
|
||||||
router_cli_ses->rses_transaction_active = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (!router_cli_ses->rses_transaction_active &&
|
|
||||||
QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX))
|
|
||||||
{
|
|
||||||
router_cli_ses->rses_transaction_active = true;
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Explicit COMMIT and ROLLBACK, implicit COMMIT.
|
|
||||||
*/
|
|
||||||
if (router_cli_ses->rses_autocommit_enabled &&
|
|
||||||
router_cli_ses->rses_transaction_active &&
|
|
||||||
(QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) ||
|
|
||||||
QUERY_IS_TYPE(qtype,QUERY_TYPE_ROLLBACK)))
|
|
||||||
{
|
|
||||||
router_cli_ses->rses_transaction_active = false;
|
|
||||||
}
|
|
||||||
else if (!router_cli_ses->rses_autocommit_enabled &&
|
|
||||||
QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT))
|
|
||||||
{
|
|
||||||
router_cli_ses->rses_autocommit_enabled = true;
|
|
||||||
router_cli_ses->rses_transaction_active = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (LOG_IS_ENABLED(LOGFILE_TRACE))
|
|
||||||
{
|
|
||||||
uint8_t* packet = GWBUF_DATA(querybuf);
|
|
||||||
unsigned char ptype = packet[4];
|
|
||||||
size_t len = MIN(GWBUF_LENGTH(querybuf),
|
|
||||||
MYSQL_GET_PACKET_LEN((unsigned char *)querybuf->start)-1);
|
|
||||||
char* data = (char*)&packet[5];
|
|
||||||
char* contentstr = strndup(data, len);
|
|
||||||
char* qtypestr = skygw_get_qtype_str(qtype);
|
|
||||||
|
|
||||||
skygw_log_write(
|
|
||||||
LOGFILE_TRACE,
|
|
||||||
"> Autocommit: %s, trx is %s, cmd: %s, type: %s, "
|
|
||||||
"stmt: %s%s %s",
|
|
||||||
(router_cli_ses->rses_autocommit_enabled ? "[enabled]" : "[disabled]"),
|
|
||||||
(router_cli_ses->rses_transaction_active ? "[open]" : "[not open]"),
|
|
||||||
STRPACKETTYPE(ptype),
|
|
||||||
(qtypestr==NULL ? "N/A" : qtypestr),
|
|
||||||
contentstr,
|
|
||||||
(querybuf->hint == NULL ? "" : ", Hint:"),
|
|
||||||
(querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type)));
|
|
||||||
|
|
||||||
free(contentstr);
|
|
||||||
free(qtypestr);
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Find out where to route the query. Result may not be clear; it is
|
|
||||||
* possible to have a hint for routing to a named server which can
|
|
||||||
* be either slave or master.
|
|
||||||
* If query would otherwise be routed to slave then the hint determines
|
|
||||||
* actual target server if it exists.
|
|
||||||
*
|
|
||||||
* route_target is a bitfield and may include :
|
|
||||||
* TARGET_ALL
|
|
||||||
* - route to all connected backend servers
|
|
||||||
* TARGET_SLAVE[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX]
|
|
||||||
* - route primarily according to hints, then to slave and if those
|
|
||||||
* failed, eventually to master
|
|
||||||
* TARGET_MASTER[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX]
|
|
||||||
* - route primarily according to the hints and if they failed,
|
|
||||||
* eventually to master
|
|
||||||
*/
|
|
||||||
route_target = get_route_target(qtype,
|
|
||||||
router_cli_ses->rses_transaction_active,
|
|
||||||
router_cli_ses->rses_config.rw_use_sql_variables_in,
|
|
||||||
querybuf->hint);
|
|
||||||
|
|
||||||
if (TARGET_IS_ALL(route_target))
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* It is not sure if the session command in question requires
|
|
||||||
* response. Statement is examined in route_session_write.
|
|
||||||
* Router locking is done inside the function.
|
|
||||||
*/
|
|
||||||
succp = route_session_write(router_cli_ses,
|
|
||||||
gwbuf_clone(querybuf),
|
|
||||||
inst,
|
|
||||||
packet_type,
|
|
||||||
qtype);
|
|
||||||
|
|
||||||
if (succp)
|
|
||||||
{
|
|
||||||
atomic_add(&inst->stats.n_all, 1);
|
|
||||||
ret = 1;
|
|
||||||
}
|
|
||||||
goto retblock;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Lock router session */
|
|
||||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
|
||||||
{
|
|
||||||
LOGIF(LT, (skygw_log_write(
|
|
||||||
LOGFILE_TRACE,
|
|
||||||
"Route query aborted! Routing session is closed <")));
|
|
||||||
ret = 0;
|
|
||||||
goto retblock;
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* There is a hint which either names the target backend or
|
|
||||||
* hint which sets maximum allowed replication lag for the
|
|
||||||
* backend.
|
|
||||||
*/
|
|
||||||
if (TARGET_IS_NAMED_SERVER(route_target) ||
|
|
||||||
TARGET_IS_RLAG_MAX(route_target))
|
|
||||||
{
|
|
||||||
HINT* hint;
|
|
||||||
char* named_server = NULL;
|
|
||||||
|
|
||||||
hint = querybuf->hint;
|
|
||||||
|
|
||||||
while (hint != NULL)
|
|
||||||
{
|
|
||||||
if (hint->type == HINT_ROUTE_TO_NAMED_SERVER)
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* Set the name of searched
|
|
||||||
* backend server.
|
|
||||||
*/
|
|
||||||
named_server = hint->data;
|
|
||||||
LOGIF(LT, (skygw_log_write(
|
|
||||||
LOGFILE_TRACE,
|
|
||||||
"Hint: route to server "
|
|
||||||
"'%s'",
|
|
||||||
named_server)));
|
|
||||||
}
|
|
||||||
else if (hint->type == HINT_PARAMETER &&
|
|
||||||
(strncasecmp((char *)hint->data,
|
|
||||||
"max_slave_replication_lag",
|
|
||||||
strlen("max_slave_replication_lag")) == 0))
|
|
||||||
{
|
|
||||||
int val = (int) strtol((char *)hint->value,
|
|
||||||
(char **)NULL, 10);
|
|
||||||
|
|
||||||
if (val != 0 || errno == 0)
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* Set max. acceptable
|
|
||||||
* replication lag
|
|
||||||
* value for backend srv
|
|
||||||
*/
|
|
||||||
rlag_max = val;
|
|
||||||
LOGIF(LT, (skygw_log_write(
|
|
||||||
LOGFILE_TRACE,
|
|
||||||
"Hint: "
|
|
||||||
"max_slave_replication_lag=%d",
|
|
||||||
rlag_max)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
hint = hint->next;
|
|
||||||
} /*< while */
|
|
||||||
|
|
||||||
if (rlag_max == MAX_RLAG_UNDEFINED) /*< no rlag max hint, use config */
|
|
||||||
{
|
|
||||||
rlag_max = rses_get_max_replication_lag(router_cli_ses);
|
|
||||||
}
|
|
||||||
btype = BE_UNDEFINED; /*< target may be master or slave */
|
|
||||||
/**
|
|
||||||
* Search backend server by name or replication lag.
|
|
||||||
* If it fails, then try to find valid slave or master.
|
|
||||||
*/
|
|
||||||
succp = get_dcb(&target_dcb,
|
|
||||||
router_cli_ses,
|
|
||||||
btype,
|
|
||||||
named_server,
|
|
||||||
rlag_max);
|
|
||||||
if (!succp)
|
|
||||||
{
|
|
||||||
if (TARGET_IS_NAMED_SERVER(route_target))
|
|
||||||
{
|
|
||||||
LOGIF(LT, (skygw_log_write(
|
|
||||||
LOGFILE_TRACE,
|
|
||||||
"Was supposed to route to named server "
|
|
||||||
"%s but couldn't find the server in a "
|
|
||||||
"suitable state.",
|
|
||||||
named_server)));
|
|
||||||
}
|
|
||||||
else if (TARGET_IS_RLAG_MAX(route_target))
|
|
||||||
{
|
|
||||||
LOGIF(LT, (skygw_log_write(
|
|
||||||
LOGFILE_TRACE,
|
|
||||||
"Was supposed to route to server with "
|
|
||||||
"replication lag at most %d but couldn't "
|
|
||||||
"find such a slave.",
|
|
||||||
rlag_max)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (TARGET_IS_SLAVE(route_target))
|
|
||||||
{
|
|
||||||
btype = BE_SLAVE;
|
|
||||||
|
|
||||||
if (rlag_max == MAX_RLAG_UNDEFINED) /*< no rlag max hint, use config */
|
|
||||||
{
|
|
||||||
rlag_max = rses_get_max_replication_lag(router_cli_ses);
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Search suitable backend server, get DCB in target_dcb
|
|
||||||
*/
|
|
||||||
succp = get_dcb(&target_dcb,
|
|
||||||
router_cli_ses,
|
|
||||||
BE_SLAVE,
|
|
||||||
NULL,
|
|
||||||
rlag_max);
|
|
||||||
if (succp)
|
|
||||||
{
|
|
||||||
#if defined(SS_EXTRA_DEBUG)
|
|
||||||
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
|
|
||||||
"Found DCB for slave.")));
|
|
||||||
#endif
|
|
||||||
ss_dassert(get_bref_from_dcb(router_cli_ses, target_dcb) !=
|
|
||||||
router_cli_ses->rses_master_ref);
|
|
||||||
ss_dassert(get_root_master_bref(router_cli_ses) ==
|
|
||||||
router_cli_ses->rses_master_ref);
|
|
||||||
atomic_add(&inst->stats.n_slave, 1);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
|
|
||||||
"Was supposed to route to slave"
|
|
||||||
"but finding suitable one "
|
|
||||||
"failed.")));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (TARGET_IS_MASTER(route_target))
|
|
||||||
{
|
|
||||||
DCB* curr_master_dcb = NULL;
|
|
||||||
|
|
||||||
succp = get_dcb(&curr_master_dcb,
|
|
||||||
router_cli_ses,
|
|
||||||
BE_MASTER,
|
|
||||||
NULL,
|
|
||||||
MAX_RLAG_UNDEFINED);
|
|
||||||
|
|
||||||
if (succp && master_dcb == curr_master_dcb)
|
|
||||||
{
|
|
||||||
atomic_add(&inst->stats.n_master, 1);
|
|
||||||
target_dcb = master_dcb;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (succp && master_dcb != curr_master_dcb)
|
|
||||||
{
|
|
||||||
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
|
|
||||||
"Was supposed to "
|
|
||||||
"route to master "
|
|
||||||
"but master has "
|
|
||||||
"changed.")));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
|
|
||||||
"Was supposed to "
|
|
||||||
"route to master "
|
|
||||||
"but couldn't find "
|
|
||||||
"master in a "
|
|
||||||
"suitable state.")));
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Master has changed. Return with error indicator.
|
|
||||||
*/
|
|
||||||
rses_end_locked_router_action(router_cli_ses);
|
|
||||||
succp = false;
|
|
||||||
ret = 0;
|
|
||||||
goto retblock;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (succp) /*< Have DCB of the target backend */
|
|
||||||
{
|
|
||||||
backend_ref_t* bref;
|
|
||||||
sescmd_cursor_t* scur;
|
|
||||||
|
|
||||||
bref = get_bref_from_dcb(router_cli_ses, target_dcb);
|
|
||||||
scur = &bref->bref_sescmd_cur;
|
|
||||||
|
|
||||||
LOGIF(LT, (skygw_log_write(
|
|
||||||
LOGFILE_TRACE,
|
|
||||||
"Route query to %s \t%s:%d <",
|
|
||||||
(SERVER_IS_MASTER(bref->bref_backend->backend_server) ?
|
|
||||||
"master" : "slave"),
|
|
||||||
bref->bref_backend->backend_server->name,
|
|
||||||
bref->bref_backend->backend_server->port)));
|
|
||||||
/**
|
|
||||||
* Store current stmt if execution of previous session command
|
|
||||||
* haven't completed yet. Note that according to MySQL protocol
|
|
||||||
* there can only be one such non-sescmd stmt at the time.
|
|
||||||
*/
|
|
||||||
if (sescmd_cursor_is_active(scur))
|
|
||||||
{
|
|
||||||
ss_dassert(bref->bref_pending_cmd == NULL);
|
|
||||||
bref->bref_pending_cmd = gwbuf_clone(querybuf);
|
|
||||||
|
|
||||||
rses_end_locked_router_action(router_cli_ses);
|
|
||||||
ret = 1;
|
|
||||||
goto retblock;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1)
|
|
||||||
{
|
|
||||||
backend_ref_t* bref;
|
|
||||||
|
|
||||||
atomic_add(&inst->stats.n_queries, 1);
|
|
||||||
/**
|
|
||||||
* Add one query response waiter to backend reference
|
|
||||||
*/
|
|
||||||
bref = get_bref_from_dcb(router_cli_ses, target_dcb);
|
|
||||||
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
|
||||||
bref_set_state(bref, BREF_WAITING_RESULT);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
|
||||||
LOGFILE_ERROR,
|
|
||||||
"Error : Routing query failed.")));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
rses_end_locked_router_action(router_cli_ses);
|
|
||||||
#endif
|
|
||||||
retblock:
|
retblock:
|
||||||
#if defined(SS_DEBUG2)
|
#if defined(SS_DEBUG2)
|
||||||
{
|
if (querybuf != NULL)
|
||||||
char* canonical_query_str;
|
{
|
||||||
|
char* canonical_query_str;
|
||||||
|
|
||||||
canonical_query_str = skygw_get_canonical(querybuf);
|
canonical_query_str = skygw_get_canonical(querybuf);
|
||||||
|
|
||||||
@ -2316,10 +1922,14 @@ retblock:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
gwbuf_free(querybuf);
|
if (querybuf != NULL) gwbuf_free(querybuf);
|
||||||
|
if (succp) ret = 1;
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static bool route_single_stmt(
|
static bool route_single_stmt(
|
||||||
ROUTER_INSTANCE* inst,
|
ROUTER_INSTANCE* inst,
|
||||||
ROUTER_CLIENT_SES* rses,
|
ROUTER_CLIENT_SES* rses,
|
||||||
@ -2331,9 +1941,8 @@ static bool route_single_stmt(
|
|||||||
int ret = 0;
|
int ret = 0;
|
||||||
DCB* master_dcb = NULL;
|
DCB* master_dcb = NULL;
|
||||||
DCB* target_dcb = NULL;
|
DCB* target_dcb = NULL;
|
||||||
bool rses_is_closed = false;
|
|
||||||
route_target_t route_target;
|
route_target_t route_target;
|
||||||
bool succp;
|
bool succp = false;
|
||||||
int rlag_max = MAX_RLAG_UNDEFINED;
|
int rlag_max = MAX_RLAG_UNDEFINED;
|
||||||
backend_type_t btype; /*< target backend type */
|
backend_type_t btype; /*< target backend type */
|
||||||
|
|
||||||
@ -2341,31 +1950,7 @@ static bool route_single_stmt(
|
|||||||
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
||||||
packet = GWBUF_DATA(querybuf);
|
packet = GWBUF_DATA(querybuf);
|
||||||
packet_type = packet[4];
|
packet_type = packet[4];
|
||||||
|
|
||||||
if (rses_is_closed)
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* MYSQL_COM_QUIT may have sent by client and as a part of backend
|
|
||||||
* closing procedure.
|
|
||||||
*/
|
|
||||||
if (packet_type != MYSQL_COM_QUIT)
|
|
||||||
{
|
|
||||||
char* query_str = modutil_get_query(querybuf);
|
|
||||||
|
|
||||||
LOGIF(LE,
|
|
||||||
(skygw_log_write_flush(
|
|
||||||
LOGFILE_ERROR,
|
|
||||||
"Error: Can't route %s:%s:\"%s\" to "
|
|
||||||
"backend server. Router is closed.",
|
|
||||||
STRPACKETTYPE(packet_type),
|
|
||||||
STRQTYPE(qtype),
|
|
||||||
(query_str == NULL ? "(empty)" : query_str))));
|
|
||||||
free(query_str);
|
|
||||||
}
|
|
||||||
succp = false;
|
|
||||||
goto retblock;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read stored master DCB pointer. If master is not set, routing must
|
* Read stored master DCB pointer. If master is not set, routing must
|
||||||
* be aborted
|
* be aborted
|
||||||
@ -2552,9 +2137,19 @@ static bool route_single_stmt(
|
|||||||
/** Lock router session */
|
/** Lock router session */
|
||||||
if (!rses_begin_locked_router_action(rses))
|
if (!rses_begin_locked_router_action(rses))
|
||||||
{
|
{
|
||||||
LOGIF(LT, (skygw_log_write(
|
if (packet_type != MYSQL_COM_QUIT)
|
||||||
LOGFILE_TRACE,
|
{
|
||||||
"Route query aborted! Routing session is closed <")));
|
char* query_str = modutil_get_query(querybuf);
|
||||||
|
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error: Can't route %s:%s:\"%s\" to "
|
||||||
|
"backend server. Router is closed.",
|
||||||
|
STRPACKETTYPE(packet_type),
|
||||||
|
STRQTYPE(qtype),
|
||||||
|
(query_str == NULL ? "(empty)" : query_str))));
|
||||||
|
free(query_str);
|
||||||
|
}
|
||||||
succp = false;
|
succp = false;
|
||||||
goto retblock;
|
goto retblock;
|
||||||
}
|
}
|
||||||
@ -2729,6 +2324,8 @@ static bool route_single_stmt(
|
|||||||
bref = get_bref_from_dcb(rses, target_dcb);
|
bref = get_bref_from_dcb(rses, target_dcb);
|
||||||
scur = &bref->bref_sescmd_cur;
|
scur = &bref->bref_sescmd_cur;
|
||||||
|
|
||||||
|
ss_dassert(target_dcb != NULL);
|
||||||
|
|
||||||
LOGIF(LT, (skygw_log_write(
|
LOGIF(LT, (skygw_log_write(
|
||||||
LOGFILE_TRACE,
|
LOGFILE_TRACE,
|
||||||
"Route query to %s \t%s:%d <",
|
"Route query to %s \t%s:%d <",
|
||||||
|
|||||||
Reference in New Issue
Block a user