Merge pull request #7 from skysql/MAX-167

Max 167
This commit is contained in:
Mark Riddoch 2014-06-30 16:18:27 +01:00
commit 6bc20f765a
13 changed files with 634 additions and 320 deletions

View File

@ -35,6 +35,13 @@ passwd=maxpwd
# Valid options are:
#
# router=<name of router module>
# max_slave_connections=<number or percentage>
# router_options=<option[=value]>,<option[=value]>,...
# valid options include:master,slave,synced,
# slave_selection_criteria=
# LEAST_CURRENT_OPERATIONS,
# LEAST_ROUTER_CONNECTIONS,
# LEAST_GLOBAL_CONNECTIONS
# servers=<server name>,<server name>,...
# user=<User to fetch password inforamtion with>
# passwd=<Password of the user, plain text currently>

View File

@ -152,6 +152,7 @@ GWBUF *gwbuf_clone_portion(
}
atomic_add(&buf->sbuf->refcount, 1);
clonebuf->sbuf = buf->sbuf;
clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone info bits too */
clonebuf->start = (void *)((char*)buf->start)+start_offset;
clonebuf->end = (void *)((char *)clonebuf->start)+length;
clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone the type for now */
@ -232,12 +233,12 @@ GWBUF *ptr = head;
if (!head)
return tail;
CHK_GWBUF(head);
CHK_GWBUF(tail);
while (ptr->next)
{
ptr = ptr->next;
}
ptr->next = tail;
return head;
}
@ -316,27 +317,12 @@ gwbuf_trim(GWBUF *buf, unsigned int n_bytes)
return buf;
}
bool gwbuf_set_type(
void gwbuf_set_type(
GWBUF* buf,
gwbuf_type_t type)
{
bool succp;
CHK_GWBUF(buf);
switch (type) {
case GWBUF_TYPE_MYSQL:
case GWBUF_TYPE_PLAINSQL:
case GWBUF_TYPE_UNDEFINED:
case GWBUF_TYPE_SINGLE_STMT: /*< buffer contains one stmt */
buf->gwbuf_type |= type;
succp = true;
break;
default:
succp = false;
break;
}
ss_dassert(succp);
return succp;
CHK_GWBUF(buf);
buf->gwbuf_type |= type;
}

View File

@ -280,7 +280,8 @@ char *stat;
}
dcb_printf(dcb, "\tNumber of connections: %d\n", ptr->stats.n_connections);
dcb_printf(dcb, "\tCurrent no. of conns: %d\n", ptr->stats.n_current);
ptr = ptr->next;
dcb_printf(dcb, "\tCurrent no. of operations: %d\n", ptr->stats.n_current_ops);
ptr = ptr->next;
}
spinlock_release(&server_spin);
}
@ -342,6 +343,7 @@ SERVER_PARAM *param;
}
dcb_printf(dcb, "\tNumber of connections: %d\n", server->stats.n_connections);
dcb_printf(dcb, "\tCurrent no. of conns: %d\n", server->stats.n_current);
dcb_printf(dcb, "\tCurrent no. of operations: %d\n", server->stats.n_current_ops);
}
/**

View File

@ -46,16 +46,22 @@
typedef enum
{
GWBUF_TYPE_UNDEFINED = 0x00,
GWBUF_TYPE_PLAINSQL = 0x01,
GWBUF_TYPE_MYSQL = 0x02,
GWBUF_TYPE_SINGLE_STMT = 0x04
GWBUF_TYPE_UNDEFINED = 0x00,
GWBUF_TYPE_PLAINSQL = 0x01,
GWBUF_TYPE_MYSQL = 0x02,
GWBUF_TYPE_SINGLE_STMT = 0x04,
GWBUF_TYPE_SESCMD_RESPONSE = 0x08,
GWBUF_TYPE_RESPONSE_END = 0x10,
GWBUF_TYPE_SESCMD = 0x20
} gwbuf_type_t;
#define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0)
#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL)
#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL)
#define GWBUF_IS_TYPE_SINGLE_STMT(b) (b->gwbuf_type & GWBUF_TYPE_SINGLE_STMT)
#define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0)
#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL)
#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL)
#define GWBUF_IS_TYPE_SINGLE_STMT(b) (b->gwbuf_type & GWBUF_TYPE_SINGLE_STMT)
#define GWBUF_IS_TYPE_SESCMD_RESPONSE(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD_RESPONSE)
#define GWBUF_IS_TYPE_RESPONSE_END(b) (b->gwbuf_type & GWBUF_TYPE_RESPONSE_END)
#define GWBUF_IS_TYPE_SESCMD(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD)
/**
* A structure to encapsulate the data in a form that the data itself can be
@ -114,5 +120,5 @@ extern GWBUF *gwbuf_trim(GWBUF *head, unsigned int length);
extern unsigned int gwbuf_length(GWBUF *head);
extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len);
extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type);
extern bool gwbuf_set_type(GWBUF *head, gwbuf_type_t type);
extern void gwbuf_set_type(GWBUF *head, gwbuf_type_t type);
#endif

View File

@ -297,6 +297,8 @@ bool dcb_set_state(
DCB* dcb,
dcb_state_t new_state,
dcb_state_t* old_state);
void dcb_call_foreach (DCB_REASON reason);
void dcb_call_foreach (
DCB_REASON reason);

View File

@ -60,6 +60,7 @@ typedef struct server_params {
typedef struct {
int n_connections; /**< Number of connections */
int n_current; /**< Current connections */
int n_current_ops; /**< Current active operations */
} SERVER_STATS;
/**

View File

@ -66,6 +66,7 @@
#define GW_MYSQL_LOOP_TIMEOUT 300000000
#define GW_MYSQL_READ 0
#define GW_MYSQL_WRITE 1
#define MYSQL_HEADER_LEN 4L
#define GW_MYSQL_PROTOCOL_VERSION 10 // version is 10
#define GW_MYSQL_HANDSHAKE_FILLER 0x00
@ -235,7 +236,8 @@ typedef enum mysql_server_cmd {
MYSQL_COM_STMT_RESET,
MYSQL_COM_SET_OPTION,
MYSQL_COM_STMT_FETCH,
MYSQL_COM_DAEMON
MYSQL_COM_DAEMON,
MYSQL_COM_END /*< Must be the last */
} mysql_server_cmd_t;
@ -245,9 +247,10 @@ typedef enum mysql_server_cmd {
* one MySQLProtocol and one server command list.
*/
typedef struct server_command_st {
mysql_server_cmd_t cmd;
int nresponse_packets; /** filled when reply arrives */
struct server_command_st* next;
mysql_server_cmd_t scom_cmd;
int scom_nresponse_packets; /*< packets in response */
size_t scom_nbytes_to_read; /*< bytes left to read in current packet */
struct server_command_st* scom_next;
} server_command_t;
/*
@ -262,6 +265,7 @@ typedef struct {
* we are running on */
SPINLOCK protocol_lock;
server_command_t protocol_command; /*< list of active commands */
server_command_t* protocol_cmd_history; /*< command history list */
mysql_auth_state_t protocol_auth_state; /*< Authentication status */
uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /*< server scramble,
* created or received */
@ -285,6 +289,7 @@ typedef struct {
#define MYSQL_GET_STMTOK_NPARAM(payload) (gw_mysql_get_byte2(&payload[9]))
#define MYSQL_GET_STMTOK_NATTR(payload) (gw_mysql_get_byte2(&payload[11]))
#define MYSQL_IS_ERROR_PACKET(payload) (MYSQL_GET_COMMAND(payload)==0xff)
#define MYSQL_GET_NATTR(payload) ((int)payload[4])
#endif /** _MYSQL_PROTOCOL_H */
@ -365,8 +370,16 @@ void protocol_add_srv_command(MySQLProtocol* p, mysql_server_cmd_t cmd);
void protocol_remove_srv_command(MySQLProtocol* p);
bool protocol_waits_response(MySQLProtocol* p);
mysql_server_cmd_t protocol_get_srv_command(MySQLProtocol* p,bool removep);
int get_stmt_nresponse_packets(GWBUF* buf, mysql_server_cmd_t cmd);
int protocol_get_nresponse_packets (MySQLProtocol* p);
bool protocol_set_nresponse_packets (MySQLProtocol* p, int nresponse_packets);
int get_stmt_nresponse_packets(GWBUF* buf, mysql_server_cmd_t cmd);
bool protocol_get_response_status (MySQLProtocol* p, int* npackets, size_t* nbytes);
void protocol_set_response_status (MySQLProtocol* p, int npackets, size_t nbytes);
void protocol_archive_srv_command(MySQLProtocol* p);
void init_response_status (
GWBUF* buf,
mysql_server_cmd_t cmd,
int* npackets,
size_t* nbytes);

View File

@ -52,13 +52,15 @@ typedef enum prep_stmt_state {
typedef enum bref_state {
BREF_IN_USE = 0x01,
BREF_WAITING_RESULT = 0x02, /*< for anything that responds */
BREF_CLOSED = 0x04
BREF_WAITING_RESULT = 0x02, /*< for session commands only */
BREF_QUERY_ACTIVE = 0x04, /*< for other queries */
BREF_CLOSED = 0x08
} bref_state_t;
#define BREF_IS_NOT_USED(s) (s->bref_state & ~BREF_IN_USE)
#define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) (s->bref_state & BREF_WAITING_RESULT)
#define BREF_IS_WAITING_RESULT(s) (s->bref_num_result_wait > 0)
#define BREF_IS_QUERY_ACTIVE(s) (s->bref_state & BREF_QUERY_ACTIVE)
#define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED)
typedef enum backend_type_t {
@ -90,9 +92,10 @@ typedef enum rses_property_type_t {
typedef enum select_criteria {
UNDEFINED_CRITERIA=0,
LEAST_GLOBAL_CONNECTIONS, /*< all connections established by MaxScale */
DEFAULT_CRITERIA=LEAST_GLOBAL_CONNECTIONS,
LEAST_ROUTER_CONNECTIONS, /*< connections established by this router */
LEAST_BEHIND_MASTER,
LEAST_CURRENT_OPERATIONS,
DEFAULT_CRITERIA=LEAST_CURRENT_OPERATIONS,
LAST_CRITERIA /*< not used except for an index */
} select_criteria_t;
@ -106,7 +109,9 @@ typedef enum select_criteria {
strncmp(s,"LEAST_BEHIND_MASTER", strlen("LEAST_BEHIND_MASTER")) == 0 ? \
LEAST_BEHIND_MASTER : ( \
strncmp(s,"LEAST_ROUTER_CONNECTIONS", strlen("LEAST_ROUTER_CONNECTIONS")) == 0 ? \
LEAST_ROUTER_CONNECTIONS : UNDEFINED_CRITERIA)))
LEAST_ROUTER_CONNECTIONS : ( \
strncmp(s,"LEAST_CURRENT_OPERATIONS", strlen("LEAST_CURRENT_OPERATIONS")) == 0 ? \
LEAST_CURRENT_OPERATIONS : UNDEFINED_CRITERIA))))
/**
* Session variable command
@ -190,6 +195,7 @@ typedef struct backend_ref_st {
BACKEND* bref_backend;
DCB* bref_dcb;
bref_state_t bref_state;
int bref_num_result_wait;
sescmd_cursor_t bref_sescmd_cur;
#if defined(SS_DEBUG)
skygw_chk_t bref_chk_tail;

View File

@ -340,6 +340,8 @@ char *uname = handle->defaultUser;
char *passwd = handle->defaultPasswd;
unsigned long int server_version = 0;
char *server_string;
unsigned long id = handle->id;
int replication_heartbeat = handle->replicationHeartbeat;
if (database->server->monuser != NULL)
{

View File

@ -65,6 +65,10 @@ static int gw_backend_hangup(DCB *dcb);
static int backend_write_delayqueue(DCB *dcb);
static void backend_set_delayqueue(DCB *dcb, GWBUF *queue);
static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue);
static GWBUF* process_response_data (DCB* dcb, GWBUF* readbuf, int nbytes_to_process);
#if defined(NOT_USED)
static int gw_session(DCB *backend_dcb, void *data);
#endif
@ -416,21 +420,20 @@ static int gw_read_backend_event(DCB *dcb) {
/* reading MySQL command output from backend and writing to the client */
{
GWBUF *readbuf = NULL;
GWBUF *read_buffer = NULL;
ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL;
void *rsession = NULL;
SESSION *session = dcb->session;
int nbytes_read = 0;
mysql_server_cmd_t srvcmd = MYSQL_COM_UNDEFINED;
CHK_SESSION(session);
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
/* read available backend data */
rc = dcb_read(dcb, &readbuf);
rc = dcb_read(dcb, &read_buffer);
if (rc < 0)
{
@ -463,7 +466,7 @@ static int gw_read_backend_event(DCB *dcb) {
rc = 0;
goto return_rc;
}
nbytes_read = gwbuf_length(readbuf);
nbytes_read = gwbuf_length(read_buffer);
if (nbytes_read == 0)
{
@ -471,74 +474,41 @@ static int gw_read_backend_event(DCB *dcb) {
}
else
{
ss_dassert(readbuf != NULL);
ss_dassert(read_buffer != NULL);
}
/**
* ask for next response (1 or more packets) like in
* gw_MySQL_get_next_packet but gw_MySQL_get_next_response
*/
srvcmd = protocol_get_srv_command((MySQLProtocol *)dcb->protocol,
false);
/**
* If backend DCB is waiting for response to COM_STMT_PREPARE,
* it, then only that must be passed to clientReply.
*
* If response consists of ses cmd response and response to
* COM_STMT_PREPARE, there can't be anything after
* COM_STMT_PREPARE response because whole buffer may be
* discarded since router doesn't know the borderlines of MySQL
* packets.
*/
/**
* Read all packets from <readbuf> which belong to STMT PREPARE
* response.
* Move packets not belonging to STMT PREPARE response to
* dcb_readqueue.
* When whole response is read, pass <readbuf> forward to
* clientReply.
*/
if (srvcmd == MYSQL_COM_STMT_PREPARE)
/** Packet prefix was read earlier */
if (dcb->dcb_readqueue)
{
MySQLProtocol* p;
int nresponse_packets;
GWBUF* tmpbuf;
read_buffer = gwbuf_append(dcb->dcb_readqueue, read_buffer);
nbytes_read = gwbuf_length(read_buffer);
p = (MySQLProtocol *)dcb->protocol;
nresponse_packets = protocol_get_nresponse_packets(p);
/** count only once per response */
if (nresponse_packets == 0)
{
nresponse_packets = get_stmt_nresponse_packets(
readbuf,
srvcmd);
}
tmpbuf = gw_MySQL_get_packets(&readbuf, &nresponse_packets);
gwbuf_append(dcb->dcb_readqueue, readbuf);
readbuf = tmpbuf;
/** <readbuf> contains incomplete response to STMT PREPARE */
if (nresponse_packets != 0)
if (nbytes_read < 5) /*< read at least command type */
{
rc = 0;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Backend fd %d read incomplete response packet. "
"Waiting %d more, cmd %s.",
dcb->fd,
nresponse_packets,
STRPACKETTYPE(srvcmd))));
/**
* store the number of how many packets the
* reponse consists of to backend's protocol.
*/
protocol_set_nresponse_packets(p, nresponse_packets);
goto return_rc;
}
protocol_remove_srv_command((MySQLProtocol *)dcb->protocol);
/** There is at least length and command type. */
else
{
read_buffer = dcb->dcb_readqueue;
dcb->dcb_readqueue = NULL;
}
}
else
{
if (nbytes_read < 5)
{
gwbuf_append(dcb->dcb_readqueue, read_buffer);
rc = 0;
goto return_rc;
}
}
/** If protocol has command set it is session command */
if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) !=
MYSQL_COM_UNDEFINED)
{
read_buffer = process_response_data(dcb, read_buffer, nbytes_read);
}
/*<
* If dcb->session->client is freed already it may be NULL.
@ -554,20 +524,20 @@ static int gw_read_backend_event(DCB *dcb) {
if (client_protocol->protocol_auth_state ==
MYSQL_IDLE)
{
gwbuf_set_type(readbuf, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance,
rsession,
readbuf,
dcb);
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
router->clientReply(
router_instance,
rsession,
read_buffer,
dcb);
rc = 1;
}
goto return_rc;
}
else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL)
{
gwbuf_set_type(readbuf, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance, rsession, readbuf, dcb);
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance, rsession, read_buffer, dcb);
rc = 1;
}
}
@ -672,7 +642,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
* If auth failed, return value is 0, write and buffered write
* return 1.
*/
switch(backend_protocol->protocol_auth_state) {
switch (backend_protocol->protocol_auth_state) {
case MYSQL_AUTH_FAILED:
{
size_t len;
@ -712,22 +682,18 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
dcb,
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
spinlock_release(&dcb->authlock);
/**
* Statement type is used in readwrite split router.
* Command is *not* set for readconn router.
*
* Server commands are stored to MySQLProtocol structure
* if buffer always includes a single statement. That
* information is stored in GWBUF type field
* (GWBUF_TYPE_SINGLE_STMT bit).
* if buffer always includes a single statement.
*/
if (GWBUF_IS_TYPE_SINGLE_STMT(queue))
if (GWBUF_IS_TYPE_SINGLE_STMT(queue) &&
GWBUF_IS_TYPE_SESCMD(queue))
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Write to backend's DCB fd %d "
"cmd %s protocol state %s.",
dcb->fd,
STRPACKETTYPE(cmd),
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
/** Record the command to backend's protocol */
protocol_add_srv_command(backend_protocol, cmd);
}
@ -751,21 +717,12 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
/**
* Since it is known that buffer contains one complete
* command, store the command to backend's protocol. When
* backend server responses the command determines how
* response needs to be processed. This is mainly due to
* MYSQL_COM_STMT_PREPARE whose response consists of
* arbitrary number of packets.
* In case of session commands, store command to DCB's
* protocol struct.
*/
if (GWBUF_IS_TYPE_SINGLE_STMT(queue))
if (GWBUF_IS_TYPE_SINGLE_STMT(queue) &&
GWBUF_IS_TYPE_SESCMD(queue))
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Write to backend's delayqueue fd %d "
"protocol state %s.",
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
/** Record the command to backend's protocol */
protocol_add_srv_command(backend_protocol, cmd);
}
@ -797,7 +754,6 @@ static int gw_error_backend_event(DCB *dcb)
void* rsession;
ROUTER_OBJECT* router;
ROUTER* router_instance;
int rc = 0;
GWBUF* errbuf;
bool succp;
@ -952,7 +908,6 @@ gw_backend_hangup(DCB *dcb)
void* rsession;
ROUTER_OBJECT* router;
ROUTER* router_instance;
int rc = 0;
bool succp;
GWBUF* errbuf;
@ -1010,7 +965,6 @@ gw_backend_close(DCB *dcb)
DCB* client_dcb;
SESSION* session;
GWBUF* quitbuf;
bool succp;
CHK_DCB(dcb);
session = dcb->session;
@ -1095,7 +1049,6 @@ static int backend_write_delayqueue(DCB *dcb)
ROUTER *router_instance = NULL;
void *rsession = NULL;
SESSION *session = dcb->session;
int receive_rc = 0;
CHK_SESSION(session);
@ -1240,3 +1193,138 @@ static int gw_session(DCB *backend_dcb, void *data) {
return 1;
}
*/
/**
* Move packets or parts of packets from redbuf to outbuf as the packet headers
* and lengths have been noticed and counted.
* Session commands need to be marked so that they can be handled properly in
* the router's clientReply.
* Return the pointer to outbuf.
*/
static GWBUF* process_response_data (
DCB* dcb,
GWBUF* readbuf,
int nbytes_to_process) /*< number of new bytes read */
{
int npackets_left = 0; /*< response's packet count */
size_t nbytes_left = 0; /*< nbytes to be read for the packet */
MySQLProtocol* p;
GWBUF* outbuf = NULL;
/** Get command which was stored in gw_MySQLWrite_backend */
p = DCB_PROTOCOL(dcb, MySQLProtocol);
CHK_PROTOCOL(p);
/** All buffers processed here are sescmd responses */
gwbuf_set_type(readbuf, GWBUF_TYPE_SESCMD_RESPONSE);
/**
* Now it is known how many packets there should be and how much
* is read earlier.
*/
while (nbytes_to_process != 0)
{
mysql_server_cmd_t srvcmd;
bool succp;
srvcmd = protocol_get_srv_command(p, false);
/**
* Read values from protocol structure, fails if values are
* uninitialized.
*/
if (npackets_left == 0)
{
succp = protocol_get_response_status(p, &npackets_left, &nbytes_left);
if (!succp || npackets_left == 0)
{
/**
* Examine command type and the readbuf. Conclude response
* packet count from the command type or from the first
* packet content. Fails if read buffer doesn't include
* enough data to read the packet length.
*/
init_response_status(readbuf, srvcmd, &npackets_left, &nbytes_left);
}
}
/** Only session commands with responses should be processed */
ss_dassert(npackets_left > 0);
/** Read incomplete packet. */
if (nbytes_left > nbytes_to_process)
{
/** Includes length info so it can be processed */
if (nbytes_to_process >= 5)
{
/** discard source buffer */
readbuf = gwbuf_consume(readbuf, GWBUF_LENGTH(readbuf));
nbytes_left -= nbytes_to_process;
}
nbytes_to_process = 0;
}
/** Packet was read. All bytes belonged to the last packet. */
else if (nbytes_left == nbytes_to_process)
{
nbytes_left = 0;
nbytes_to_process = 0;
ss_dassert(npackets_left > 0);
npackets_left -= 1;
outbuf = gwbuf_append(outbuf, readbuf);
readbuf = NULL;
}
/**
* Packet was read. There should be more since bytes were
* left over.
* Move the next packet to its own buffer and add that next
* to the prev packet's buffer.
*/
else /*< nbytes_left < nbytes_to_process */
{
nbytes_to_process -= nbytes_left;
/** Move the prefix of the buffer to outbuf from redbuf */
outbuf = gwbuf_append(outbuf, gwbuf_clone_portion(readbuf, 0, nbytes_left));
readbuf = gwbuf_consume(readbuf, nbytes_left);
ss_dassert(npackets_left > 0);
npackets_left -= 1;
nbytes_left = 0;
}
/** Store new status to protocol structure */
protocol_set_response_status(p, npackets_left, nbytes_left);
/** A complete packet was read */
if (nbytes_left == 0)
{
/** No more packets in this response */
if (npackets_left == 0)
{
GWBUF* b = outbuf;
while (b->next != NULL)
{
b = b->next;
}
/** Mark last as end of response */
gwbuf_set_type(b, GWBUF_TYPE_RESPONSE_END);
/** Archive the command */
protocol_archive_srv_command(p);
}
/** Read next packet */
else
{
uint8_t* data;
/** Read next packet length */
data = GWBUF_DATA(readbuf);
nbytes_left = MYSQL_GET_PACKET_LEN(data)+MYSQL_HEADER_LEN;
/** Store new status to protocol structure */
protocol_set_response_status(p, npackets_left, nbytes_left);
}
}
}
return outbuf;
}

View File

@ -574,9 +574,8 @@ int gw_read_client_event(
else
{
uint8_t* data = (uint8_t *)GWBUF_DATA(read_buffer);
size_t packetlen = MYSQL_GET_PACKET_LEN(data)+4;
if (nbytes_read < 3 || nbytes_read < packetlen)
if (nbytes_read < 3 || nbytes_read < MYSQL_GET_PACKET_LEN(data)+4)
{
gwbuf_append(dcb->dcb_readqueue, read_buffer);
rc = 0;
@ -1291,11 +1290,12 @@ return_rc:
static int gw_error_client_event(
DCB* dcb)
{
int rc;
SESSION* session;
CHK_DCB(dcb);
session = dcb->session;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_error_client_event] Error event handling for DCB %p "
@ -1362,7 +1362,6 @@ gw_client_close(DCB *dcb)
static int
gw_client_hangup_event(DCB *dcb)
{
int rc;
SESSION* session;
CHK_DCB(dcb);

View File

@ -41,6 +41,9 @@ extern int gw_write_backend_event(DCB *dcb);
extern int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue);
extern int gw_error_backend_event(DCB *dcb);
static server_command_t* server_command_init(server_command_t* srvcmd,
mysql_server_cmd_t cmd);
/**
* Creates MySQL protocol structure
@ -77,7 +80,9 @@ MySQLProtocol* mysql_protocol_init(
goto return_p;
}
p->protocol_auth_state = MYSQL_ALLOC;
p->protocol_command.cmd = MYSQL_COM_UNDEFINED;
p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED;
p->protocol_command.scom_nresponse_packets = 0;
p->protocol_command.scom_nbytes_to_read = 0;
#if defined(SS_DEBUG)
p->protocol_chk_top = CHK_NUM_PROTOCOL;
p->protocol_chk_tail = CHK_NUM_PROTOCOL;
@ -1557,6 +1562,94 @@ GWBUF* gw_MySQL_get_packets(
}
static server_command_t* server_command_init(
server_command_t* srvcmd,
mysql_server_cmd_t cmd)
{
server_command_t* c;
if (srvcmd != NULL)
{
c = srvcmd;
}
else
{
c = (server_command_t *)malloc(sizeof(server_command_t));
}
c->scom_cmd = cmd;
c->scom_nresponse_packets = -1;
c->scom_nbytes_to_read = 0;
c->scom_next = NULL;
return c;
}
static server_command_t* server_command_copy(
server_command_t* srvcmd)
{
server_command_t* c =
(server_command_t *)malloc(sizeof(server_command_t));
*c = *srvcmd;
return c;
}
#define MAX_CMD_HISTORY 10
void protocol_archive_srv_command(
MySQLProtocol* p)
{
server_command_t* s1;
server_command_t** s2;
int len;
spinlock_acquire(&p->protocol_lock);
s1 = &p->protocol_command;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Move command %s from fd %d to command history.",
STRPACKETTYPE(s1->scom_cmd),
p->owner_dcb->fd)));
/** Copy to history list */
s2 = &p->protocol_cmd_history;
if (*s2 != NULL)
{
len = 0;
while ((*s2)->scom_next != NULL)
{
*s2 = (*s2)->scom_next;
len += 1;
}
}
*s2 = server_command_copy(s1);
/** Keep history limits, remove oldest */
if (len > MAX_CMD_HISTORY)
{
server_command_t* c = p->protocol_cmd_history;
p->protocol_cmd_history = p->protocol_cmd_history->scom_next;
free(c);
}
/** Remove from command list */
if (s1->scom_next == NULL)
{
p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED;
}
else
{
p->protocol_command = *(s1->scom_next);
free(s1->scom_next);
}
spinlock_release(&p->protocol_lock);
}
/**
* If router expects to get separate, complete statements, add MySQL command
* to MySQLProtocol structure. It is removed when response has arrived.
@ -1565,49 +1658,42 @@ void protocol_add_srv_command(
MySQLProtocol* p,
mysql_server_cmd_t cmd)
{
spinlock_acquire(&p->protocol_lock);
server_command_t* c;
if (p->protocol_command.cmd == MYSQL_COM_UNDEFINED)
spinlock_acquire(&p->protocol_lock);
/** this is the only server command in protocol */
if (p->protocol_command.scom_cmd == MYSQL_COM_UNDEFINED)
{
p->protocol_command.cmd = cmd;
p->protocol_command.nresponse_packets = 0;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Added command %s to fd %d.",
STRPACKETTYPE(cmd),
p->owner_dcb->fd)));
/** write into structure */
server_command_init(&p->protocol_command, cmd);
}
else
{
server_command_t* c =
(server_command_t *)malloc(sizeof(server_command_t));
c->cmd = cmd;
c->nresponse_packets = 0;
c->next = NULL;
p->protocol_command.next = c;
/** add to the end of list */
p->protocol_command.scom_next = server_command_init(NULL, cmd);
}
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Added command %s to fd %d.",
STRPACKETTYPE(cmd),
p->owner_dcb->fd)));
#if defined(SS_DEBUG)
c = &p->protocol_command;
while (c != NULL && c->scom_cmd != MYSQL_COM_UNDEFINED)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Added another command %s to fd %d.",
STRPACKETTYPE(cmd),
p->owner_dcb->fd)));
#if defined(SS_DEBUG)
c = &p->protocol_command;
while (c != NULL && c->cmd != MYSQL_COM_UNDEFINED)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"fd %d : %d %s",
p->owner_dcb->fd,
c->cmd,
STRPACKETTYPE(c->cmd))));
c = c->next;
}
#endif
"fd %d : %d %s",
p->owner_dcb->fd,
c->scom_cmd,
STRPACKETTYPE(c->scom_cmd))));
c = c->scom_next;
}
#endif
spinlock_release(&p->protocol_lock);
}
@ -1628,17 +1714,17 @@ void protocol_remove_srv_command(
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Removed command %s from fd %d.",
STRPACKETTYPE(s->cmd),
STRPACKETTYPE(s->scom_cmd),
p->owner_dcb->fd)));
if (s->next == NULL)
if (s->scom_next == NULL)
{
p->protocol_command.cmd = MYSQL_COM_UNDEFINED;
p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED;
}
else
{
p->protocol_command = *(s->next);
free(s->next);
p->protocol_command = *(s->scom_next);
free(s->scom_next);
}
spinlock_release(&p->protocol_lock);
@ -1650,97 +1736,133 @@ mysql_server_cmd_t protocol_get_srv_command(
{
mysql_server_cmd_t cmd;
cmd = p->protocol_command.cmd;
cmd = p->protocol_command.scom_cmd;
if (removep)
{
protocol_remove_srv_command(p);
}
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Read command %s for fd %d.",
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [protocol_get_srv_command] Read command %s for fd %d.",
pthread_self(),
STRPACKETTYPE(cmd),
p->owner_dcb->fd)));
return cmd;
}
/**
* Return how many packets are included in the server's response.
/**
* Examine command type and the readbuf. Conclude response
* packet count from the command type or from the first packet
* content.
* Fails if read buffer doesn't include enough data to read the
* packet length.
*/
int get_stmt_nresponse_packets(
void init_response_status (
GWBUF* buf,
mysql_server_cmd_t cmd)
mysql_server_cmd_t cmd,
int* npackets,
size_t* nbytes_left)
{
int npackets;
uint8_t* packet;
int nparam;
int nattr;
uint8_t* data;
switch (cmd) {
case MYSQL_COM_STMT_PREPARE:
data = (uint8_t *)buf->start;
if (data[4] == 0xff)
{
npackets = 1; /*< error packet */
}
else
{
ss_dassert(gwbuf_length(buf) >= 3);
data = (uint8_t *)buf->start;
if (data[4] == 0xff) /*< error */
{
*npackets = 1;
}
else
{
switch (cmd) {
case MYSQL_COM_STMT_PREPARE:
packet = (uint8_t *)GWBUF_DATA(buf);
/** ok + nparam + eof + nattr + eof */
nparam = MYSQL_GET_STMTOK_NPARAM(packet);
nattr = MYSQL_GET_STMTOK_NATTR(packet);
npackets = 1 + nparam + MIN(1, nparam) +
nattr + MIN(nattr, 1);
ss_dassert(npackets<128);
}
break;
default:
npackets = 1;
break;
*npackets = 1 + nparam + MIN(1, nparam) +
nattr + MIN(nattr, 1);
break;
case MYSQL_COM_QUIT:
case MYSQL_COM_STMT_SEND_LONG_DATA:
case MYSQL_COM_STMT_CLOSE:
*npackets = 0; /*< these don't reply anything */
break;
default:
/**
* assume that other session commands respond
* OK or ERR
*/
*npackets = 1;
break;
}
}
ss_dassert(npackets<128);
return npackets;
*nbytes_left = MYSQL_GET_PACKET_LEN(data) + MYSQL_HEADER_LEN;
/**
* There is at least one complete packet in the buffer so buffer is bigger
* than packet
*/
ss_dassert(*nbytes_left > 0);
ss_dassert(*npackets > 0);
ss_dassert(*npackets<128);
}
int protocol_get_nresponse_packets (
MySQLProtocol* p)
{
int rval;
CHK_PROTOCOL(p);
spinlock_acquire(&p->protocol_lock);
rval = p->protocol_command.nresponse_packets;
spinlock_release(&p->protocol_lock);
ss_dassert(rval<128);
return rval;
}
bool protocol_set_nresponse_packets (
/**
* Read how many packets are left from current response and how many bytes there
* is still to be read from the current packet.
*/
bool protocol_get_response_status (
MySQLProtocol* p,
int nresponse_packets)
int* npackets,
size_t* nbytes)
{
bool succp;
CHK_PROTOCOL(p);
spinlock_acquire(&p->protocol_lock);
if (p->protocol_command.nresponse_packets > 0 &&
nresponse_packets > p->protocol_command.nresponse_packets)
*npackets = p->protocol_command.scom_nresponse_packets;
*nbytes = p->protocol_command.scom_nbytes_to_read;
spinlock_release(&p->protocol_lock);
if (*npackets < 0 && *nbytes == 0)
{
succp = false;
}
else
{
p->protocol_command.nresponse_packets = nresponse_packets;
ss_dassert(nresponse_packets<128);
succp = true;
}
spinlock_release(&p->protocol_lock);
return succp;
}
void protocol_set_response_status (
MySQLProtocol* p,
int npackets_left,
size_t nbytes)
{
CHK_PROTOCOL(p);
spinlock_acquire(&p->protocol_lock);
p->protocol_command.scom_nbytes_to_read = nbytes;
ss_dassert(p->protocol_command.scom_nbytes_to_read >= 0);
p->protocol_command.scom_nresponse_packets = npackets_left;
spinlock_release(&p->protocol_lock);
}

View File

@ -115,12 +115,18 @@ int bref_cmp_behind_master(
const void* bref1,
const void* bref2);
int bref_cmp_current_load(
const void* bref1,
const void* bref2);
int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)=
{
NULL,
bref_cmp_global_conn,
bref_cmp_router_conn,
bref_cmp_behind_master
bref_cmp_behind_master,
bref_cmp_current_load
};
static bool select_connect_backend_servers(
@ -210,10 +216,7 @@ static mysql_sescmd_t* sescmd_cursor_get_command(
static bool sescmd_cursor_next(
sescmd_cursor_t* scur);
static GWBUF* sescmd_cursor_process_replies(
DCB* client_dcb,
GWBUF* replybuf,
backend_ref_t* bref);
static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, backend_ref_t* bref);
static void tracelog_routed_query(
ROUTER_CLIENT_SES* rses,
@ -464,7 +467,6 @@ static void* newSession(
int max_nslaves; /*< max # of slaves used in this session */
int i;
const int min_nservers = 1; /*< hard-coded for now */
static uint64_t router_client_ses_seq; /*< ID for client session */
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
@ -1115,13 +1117,15 @@ static int routeQuery(
{
if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1)
{
backend_ref_t* bref;
atomic_add(&inst->stats.n_slave, 1);
/**
* This backend_ref waits resultset, flag it.
* Add one query response waiter to backend reference
*/
bref_set_state(get_bref_from_dcb(router_cli_ses,
slave_dcb),
BREF_WAITING_RESULT);
bref = get_bref_from_dcb(router_cli_ses, slave_dcb);
bref_set_state(bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT);
}
else
{
@ -1166,18 +1170,21 @@ static int routeQuery(
{
succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER);
}
if (succp)
{
if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1)
{
backend_ref_t* bref;
atomic_add(&inst->stats.n_master, 1);
/**
* This backend_ref waits reply to write stmt,
* flag it.
* Add one write response waiter to backend reference
*/
bref_set_state(get_bref_from_dcb(router_cli_ses, master_dcb),
BREF_WAITING_RESULT);
bref = get_bref_from_dcb(router_cli_ses, master_dcb);
bref_set_state(bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT);
}
}
rses_end_locked_router_action(router_cli_ses);
@ -1380,8 +1387,7 @@ static void clientReply (
scur = &bref->bref_sescmd_cur;
/**
* Active cursor means that reply is from session command
* execution. Majority of the time there are no session commands
* being executed.
* execution.
*/
if (sescmd_cursor_is_active(scur))
{
@ -1392,8 +1398,7 @@ static void clientReply (
(uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf));
size_t len = MYSQL_GET_PACKET_LEN(buf);
char* cmdstr = (char *)malloc(len+1);
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
snprintf(cmdstr, len+1, "%s", &buf[5]);
LOGIF(LE, (skygw_log_write_flush(
@ -1405,30 +1410,51 @@ static void clientReply (
free(cmdstr);
/** Inform the client */
handle_error_reply_client(ses,writebuf);
handle_error_reply_client(ses, writebuf);
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
goto lock_failed;
}
else
else if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf))
{
/**
* Discard all those responses that have already been sent to
* the client. Return with buffer including response that
* needs to be sent to client or NULL.
*/
writebuf = sescmd_cursor_process_replies(client_dcb,
writebuf,
bref);
writebuf = sescmd_cursor_process_replies(writebuf, bref);
}
else
{
ss_dassert(false);
}
/**
* If response will be sent to client, decrease waiter count.
* This applies to session commands only. Counter decrement
* for other type of queries is done outside this block.
*/
if (writebuf != NULL && client_dcb != NULL)
{
/** Set response status as replied */
bref_clear_state(bref, BREF_WAITING_RESULT);
}
}
/**
* Clear BREF_QUERY_ACTIVE flag and decrease waiter counter.
* This applies for queries other than session commands.
*/
else if (BREF_IS_QUERY_ACTIVE(bref))
{
bref_clear_state(bref, BREF_QUERY_ACTIVE);
/** Set response status as replied */
bref_clear_state(bref, BREF_WAITING_RESULT);
}
if (writebuf != NULL && client_dcb != NULL)
{
/** Write reply to client DCB */
SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
bref_clear_state(bref, BREF_WAITING_RESULT);
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
@ -1439,7 +1465,7 @@ static void clientReply (
/** Log to debug that router was closed */
goto lock_failed;
}
/** There is one pending session command to be xexecuted. */
/** There is one pending session command to be executed. */
if (sescmd_cursor_is_active(scur))
{
bool succp;
@ -1452,6 +1478,8 @@ static void clientReply (
bref->bref_backend->backend_server->port)));
succp = execute_sescmd_in_backend(bref);
ss_dassert(succp);
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
@ -1491,21 +1519,84 @@ int bref_cmp_behind_master(
return 1;
}
int bref_cmp_current_load(
const void* bref1,
const void* bref2)
{
SERVER* s1 = ((backend_ref_t *)bref1)->bref_backend->backend_server;
SERVER* s2 = ((backend_ref_t *)bref2)->bref_backend->backend_server;
return ((s1->stats.n_current_ops < s2->stats.n_current_ops) ? -1 :
((s1->stats.n_current_ops > s2->stats.n_current_ops) ? 1 : 0));
}
static void bref_clear_state(
backend_ref_t* bref,
bref_state_t state)
{
bref->bref_state &= ~state;
if (state != BREF_WAITING_RESULT)
{
bref->bref_state &= ~state;
}
else
{
int prev1;
int prev2;
/** Decrease waiter count */
prev1 = atomic_add(&bref->bref_num_result_wait, -1);
ss_dassert(prev1 > 0);
/** Decrease global operation count */
prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Current waiters %d and ops %d after in %s:%d",
prev1-1,
prev2-1,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
}
}
static void bref_set_state(
backend_ref_t* bref,
bref_state_t state)
{
bref->bref_state |= state;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Set state %d for %s:%d fd %d",
if (state != BREF_WAITING_RESULT)
{
bref->bref_state |= state;
}
else
{
int prev1;
int prev2;
/** Increase waiter count */
prev1 = atomic_add(&bref->bref_num_result_wait, 1);
ss_dassert(prev1 >= 0);
/** Increase global operation count */
prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Current waiters %d and ops %d before in %s:%d",
prev1,
prev2,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
}
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [bref_set_state] Set state %d for %s:%d fd %d",
pthread_self(),
bref->bref_state,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port,
@ -1680,6 +1771,13 @@ static bool select_connect_backend_servers(
b->backend_conn_count)));
break;
case LEAST_CURRENT_OPERATIONS:
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"%s %d:%d",
b->backend_server->name,
b->backend_server->port,
b->backend_server->stats.n_current_ops)));
break;
default:
break;
}
@ -1699,13 +1797,11 @@ static bool select_connect_backend_servers(
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Examine server "
"%s:%d %s with %d connections. "
"router->bitvalue is %d",
"%s:%d %s with %d active operations.",
b->backend_server->name,
b->backend_server->port,
STRSRVSTATUS(b->backend_server),
b->backend_conn_count,
router->bitmask)));
b->backend_server->stats.n_current_ops)));
if (SERVER_IS_RUNNING(b->backend_server) &&
((b->backend_server->status & router->bitmask) ==
@ -2191,13 +2287,9 @@ static void mysql_sescmd_done(
* 9. s+q+
*/
static GWBUF* sescmd_cursor_process_replies(
DCB* client_dcb,
GWBUF* replybuf,
backend_ref_t* bref)
{
const size_t headerlen = 4; /*< mysql packet header */
size_t packetlen;
uint8_t* packet;
mysql_sescmd_t* scmd;
sescmd_cursor_t* scur;
@ -2205,7 +2297,6 @@ static GWBUF* sescmd_cursor_process_replies(
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
scmd = sescmd_cursor_get_command(scur);
CHK_DCB(client_dcb);
CHK_GWBUF(replybuf);
/**
@ -2217,35 +2308,21 @@ static GWBUF* sescmd_cursor_process_replies(
/** Faster backend has already responded to client : discard */
if (scmd->my_sescmd_is_replied)
{
bool last_packet = false;
CHK_GWBUF(replybuf);
packet = (uint8_t *)GWBUF_DATA(replybuf);
/**
* If it is response to MYSQL_COM_STMT_PREPARE, then buffer
* only includes the response.
*/
if (scmd->my_sescmd_prop->rses_prop_data.sescmd.my_sescmd_packet_type ==
MYSQL_COM_STMT_PREPARE)
while (!last_packet)
{
while (replybuf != NULL)
{
#if defined(SS_DEBUG)
int buflen;
buflen = GWBUF_LENGTH(replybuf);
replybuf = gwbuf_consume(replybuf, buflen);
#else
replybuf = gwbuf_consume(
replybuf,
GWBUF_LENGTH(replybuf));
#endif
}
}
/** Only consume the leading packet */
else
{
packetlen = packet[0]+packet[1]*256+packet[2]*256*256;
replybuf = gwbuf_consume(replybuf, packetlen+headerlen);
int buflen;
buflen = GWBUF_LENGTH(replybuf);
last_packet = GWBUF_IS_TYPE_RESPONSE_END(replybuf);
/** discard packet */
replybuf = gwbuf_consume(replybuf, buflen);
}
/** Set response status received */
bref_clear_state(bref, BREF_WAITING_RESULT);
}
/** Response is in the buffer and it will be sent to client. */
else if (replybuf != NULL)
@ -2447,9 +2524,11 @@ static bool execute_sescmd_in_backend(
uint8_t* ptr = GWBUF_DATA(tmpbuf);
unsigned char cmd = MYSQL_GET_COMMAND(ptr);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Just before write, fd %d : cmd %s.",
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [execute_sescmd_in_backend] Just before write, fd "
"%d : cmd %s.",
pthread_self(),
dcb->fd,
STRPACKETTYPE(cmd))));
}
@ -2466,6 +2545,11 @@ static bool execute_sescmd_in_backend(
case MYSQL_COM_QUERY:
case MYSQL_COM_INIT_DB:
default:
/**
* Mark session command buffer, it triggers writing
* MySQL command to protocol
*/
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
rc = dcb->func.write(
dcb,
sescmd_cursor_clone_querybuf(scur));
@ -2745,14 +2829,13 @@ static bool route_session_write(
rses_property_done(prop);
succp = false;
goto return_succp;
}
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
}
/**
* Additional reference is created to querybuf to
* prevent it from being released before properties
* are cleaned up as a part of router sessionclean-up.
*/
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
/** Add sescmd property to router client session */
@ -2767,12 +2850,11 @@ static bool route_session_write(
scur = backend_ref_get_sescmd_cursor(&backend_ref[i]);
/**
* This backend_ref waits reply, flag it.
* Add one waiter to backend reference.
*/
bref_set_state(get_bref_from_dcb(router_cli_ses,
backend_ref[i].bref_dcb),
BREF_WAITING_RESULT);
BREF_WAITING_RESULT);
/**
* Start execution if cursor is not already executing.
* Otherwise, cursor will execute pending commands
@ -2842,6 +2924,7 @@ static void rwsplit_process_options(
c == LEAST_GLOBAL_CONNECTIONS ||
c == LEAST_ROUTER_CONNECTIONS ||
c == LEAST_BEHIND_MASTER ||
c == LEAST_CURRENT_OPERATIONS ||
c == UNDEFINED_CRITERIA);
if (c == UNDEFINED_CRITERIA)
@ -2851,7 +2934,7 @@ static void rwsplit_process_options(
"slave selection criteria \"%s\". "
"Allowed values are \"LEAST_GLOBAL_CONNECTIONS\", "
"LEAST_ROUTER_CONNECTIONS, "
"and \"LEAST_ROUTER_CONNECTIONS\".",
"and \"LEAST_CURRENT_OPERATIONS\".",
STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria))));
}
else
@ -2887,7 +2970,6 @@ static void handleError (
error_action_t action,
bool* succp)
{
DCB* client_dcb;
SESSION* session;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
@ -2979,7 +3061,6 @@ static bool handle_error_new_connection(
/** failed DCB has already been replaced */
if (bref == NULL)
{
rses_end_locked_router_action(rses);
succp = true;
goto return_succp;
}
@ -2990,7 +3071,6 @@ static bool handle_error_new_connection(
*/
if (backend_dcb->state != DCB_STATE_POLLING)
{
rses_end_locked_router_action(rses);
succp = true;
goto return_succp;
}
@ -3002,7 +3082,7 @@ static bool handle_error_new_connection(
DCB* client_dcb;
client_dcb = ses->client;
client_dcb->func.write(client_dcb, errmsg);
bref_clear_state(bref, BREF_WAITING_RESULT);
bref_clear_state(bref, BREF_WAITING_RESULT);
}
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);