Merge branch 'develop' of https://github.com/skysql/MaxScale into develop

This commit is contained in:
VilhoRaatikka
2014-06-30 22:38:15 +03:00
15 changed files with 662 additions and 335 deletions

View File

@ -67,16 +67,16 @@ start() {
sleep 2 sleep 2
my_check=`status -p $MAXSCALE_PIDFILE $MAXSCALE_BIN/maxscale` my_check=`status -p $MAXSCALE_PIDFILE $MAXSCALE_BIN/maxscale`
CHECK_RET=$? CHECK_RET=$?
[ $CHECK_RET -eq 0 ] && echo -n $my_check && success [ $CHECK_RET -eq 0 ] && echo -n $my_check && success || failure
fi fi
echo
# Return rigth code # Return rigth code
if [ $RETVAL -ne 0 ]; then if [ $RETVAL -ne 0 ]; then
failure
RETVAL=$_RETVAL_NOT_RUNNING RETVAL=$_RETVAL_NOT_RUNNING
fi fi
echo
return $RETVAL return $RETVAL
} }

View File

@ -35,6 +35,13 @@ passwd=maxpwd
# Valid options are: # Valid options are:
# #
# router=<name of router module> # router=<name of router module>
# max_slave_connections=<number or percentage>
# router_options=<option[=value]>,<option[=value]>,...
# valid options include:master,slave,synced,
# slave_selection_criteria=
# LEAST_CURRENT_OPERATIONS,
# LEAST_ROUTER_CONNECTIONS,
# LEAST_GLOBAL_CONNECTIONS
# servers=<server name>,<server name>,... # servers=<server name>,<server name>,...
# user=<User to fetch password inforamtion with> # user=<User to fetch password inforamtion with>
# passwd=<Password of the user, plain text currently> # passwd=<Password of the user, plain text currently>

View File

@ -152,6 +152,7 @@ GWBUF *gwbuf_clone_portion(
} }
atomic_add(&buf->sbuf->refcount, 1); atomic_add(&buf->sbuf->refcount, 1);
clonebuf->sbuf = buf->sbuf; clonebuf->sbuf = buf->sbuf;
clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone info bits too */
clonebuf->start = (void *)((char*)buf->start)+start_offset; clonebuf->start = (void *)((char*)buf->start)+start_offset;
clonebuf->end = (void *)((char *)clonebuf->start)+length; clonebuf->end = (void *)((char *)clonebuf->start)+length;
clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone the type for now */ clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone the type for now */
@ -232,12 +233,12 @@ GWBUF *ptr = head;
if (!head) if (!head)
return tail; return tail;
CHK_GWBUF(head); CHK_GWBUF(head);
CHK_GWBUF(tail);
while (ptr->next) while (ptr->next)
{ {
ptr = ptr->next; ptr = ptr->next;
} }
ptr->next = tail; ptr->next = tail;
return head; return head;
} }
@ -316,27 +317,12 @@ gwbuf_trim(GWBUF *buf, unsigned int n_bytes)
return buf; return buf;
} }
bool gwbuf_set_type( void gwbuf_set_type(
GWBUF* buf, GWBUF* buf,
gwbuf_type_t type) gwbuf_type_t type)
{ {
bool succp; CHK_GWBUF(buf);
CHK_GWBUF(buf); buf->gwbuf_type |= type;
switch (type) {
case GWBUF_TYPE_MYSQL:
case GWBUF_TYPE_PLAINSQL:
case GWBUF_TYPE_UNDEFINED:
case GWBUF_TYPE_SINGLE_STMT: /*< buffer contains one stmt */
buf->gwbuf_type |= type;
succp = true;
break;
default:
succp = false;
break;
}
ss_dassert(succp);
return succp;
} }

View File

@ -280,7 +280,8 @@ char *stat;
} }
dcb_printf(dcb, "\tNumber of connections: %d\n", ptr->stats.n_connections); dcb_printf(dcb, "\tNumber of connections: %d\n", ptr->stats.n_connections);
dcb_printf(dcb, "\tCurrent no. of conns: %d\n", ptr->stats.n_current); dcb_printf(dcb, "\tCurrent no. of conns: %d\n", ptr->stats.n_current);
ptr = ptr->next; dcb_printf(dcb, "\tCurrent no. of operations: %d\n", ptr->stats.n_current_ops);
ptr = ptr->next;
} }
spinlock_release(&server_spin); spinlock_release(&server_spin);
} }
@ -342,6 +343,7 @@ SERVER_PARAM *param;
} }
dcb_printf(dcb, "\tNumber of connections: %d\n", server->stats.n_connections); dcb_printf(dcb, "\tNumber of connections: %d\n", server->stats.n_connections);
dcb_printf(dcb, "\tCurrent no. of conns: %d\n", server->stats.n_current); dcb_printf(dcb, "\tCurrent no. of conns: %d\n", server->stats.n_current);
dcb_printf(dcb, "\tCurrent no. of operations: %d\n", server->stats.n_current_ops);
} }
/** /**

View File

@ -46,16 +46,22 @@
typedef enum typedef enum
{ {
GWBUF_TYPE_UNDEFINED = 0x00, GWBUF_TYPE_UNDEFINED = 0x00,
GWBUF_TYPE_PLAINSQL = 0x01, GWBUF_TYPE_PLAINSQL = 0x01,
GWBUF_TYPE_MYSQL = 0x02, GWBUF_TYPE_MYSQL = 0x02,
GWBUF_TYPE_SINGLE_STMT = 0x04 GWBUF_TYPE_SINGLE_STMT = 0x04,
GWBUF_TYPE_SESCMD_RESPONSE = 0x08,
GWBUF_TYPE_RESPONSE_END = 0x10,
GWBUF_TYPE_SESCMD = 0x20
} gwbuf_type_t; } gwbuf_type_t;
#define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0) #define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0)
#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL) #define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL)
#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL) #define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL)
#define GWBUF_IS_TYPE_SINGLE_STMT(b) (b->gwbuf_type & GWBUF_TYPE_SINGLE_STMT) #define GWBUF_IS_TYPE_SINGLE_STMT(b) (b->gwbuf_type & GWBUF_TYPE_SINGLE_STMT)
#define GWBUF_IS_TYPE_SESCMD_RESPONSE(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD_RESPONSE)
#define GWBUF_IS_TYPE_RESPONSE_END(b) (b->gwbuf_type & GWBUF_TYPE_RESPONSE_END)
#define GWBUF_IS_TYPE_SESCMD(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD)
/** /**
* A structure to encapsulate the data in a form that the data itself can be * A structure to encapsulate the data in a form that the data itself can be
@ -114,5 +120,5 @@ extern GWBUF *gwbuf_trim(GWBUF *head, unsigned int length);
extern unsigned int gwbuf_length(GWBUF *head); extern unsigned int gwbuf_length(GWBUF *head);
extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len); extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len);
extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type); extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type);
extern bool gwbuf_set_type(GWBUF *head, gwbuf_type_t type); extern void gwbuf_set_type(GWBUF *head, gwbuf_type_t type);
#endif #endif

View File

@ -297,6 +297,8 @@ bool dcb_set_state(
DCB* dcb, DCB* dcb,
dcb_state_t new_state, dcb_state_t new_state,
dcb_state_t* old_state); dcb_state_t* old_state);
void dcb_call_foreach (DCB_REASON reason);
void dcb_call_foreach ( void dcb_call_foreach (
DCB_REASON reason); DCB_REASON reason);

View File

@ -60,6 +60,7 @@ typedef struct server_params {
typedef struct { typedef struct {
int n_connections; /**< Number of connections */ int n_connections; /**< Number of connections */
int n_current; /**< Current connections */ int n_current; /**< Current connections */
int n_current_ops; /**< Current active operations */
} SERVER_STATS; } SERVER_STATS;
/** /**

View File

@ -66,6 +66,7 @@
#define GW_MYSQL_LOOP_TIMEOUT 300000000 #define GW_MYSQL_LOOP_TIMEOUT 300000000
#define GW_MYSQL_READ 0 #define GW_MYSQL_READ 0
#define GW_MYSQL_WRITE 1 #define GW_MYSQL_WRITE 1
#define MYSQL_HEADER_LEN 4L
#define GW_MYSQL_PROTOCOL_VERSION 10 // version is 10 #define GW_MYSQL_PROTOCOL_VERSION 10 // version is 10
#define GW_MYSQL_HANDSHAKE_FILLER 0x00 #define GW_MYSQL_HANDSHAKE_FILLER 0x00
@ -235,7 +236,8 @@ typedef enum mysql_server_cmd {
MYSQL_COM_STMT_RESET, MYSQL_COM_STMT_RESET,
MYSQL_COM_SET_OPTION, MYSQL_COM_SET_OPTION,
MYSQL_COM_STMT_FETCH, MYSQL_COM_STMT_FETCH,
MYSQL_COM_DAEMON MYSQL_COM_DAEMON,
MYSQL_COM_END /*< Must be the last */
} mysql_server_cmd_t; } mysql_server_cmd_t;
@ -245,9 +247,10 @@ typedef enum mysql_server_cmd {
* one MySQLProtocol and one server command list. * one MySQLProtocol and one server command list.
*/ */
typedef struct server_command_st { typedef struct server_command_st {
mysql_server_cmd_t cmd; mysql_server_cmd_t scom_cmd;
int nresponse_packets; /** filled when reply arrives */ int scom_nresponse_packets; /*< packets in response */
struct server_command_st* next; size_t scom_nbytes_to_read; /*< bytes left to read in current packet */
struct server_command_st* scom_next;
} server_command_t; } server_command_t;
/* /*
@ -262,6 +265,7 @@ typedef struct {
* we are running on */ * we are running on */
SPINLOCK protocol_lock; SPINLOCK protocol_lock;
server_command_t protocol_command; /*< list of active commands */ server_command_t protocol_command; /*< list of active commands */
server_command_t* protocol_cmd_history; /*< command history list */
mysql_auth_state_t protocol_auth_state; /*< Authentication status */ mysql_auth_state_t protocol_auth_state; /*< Authentication status */
uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /*< server scramble, uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /*< server scramble,
* created or received */ * created or received */
@ -285,6 +289,7 @@ typedef struct {
#define MYSQL_GET_STMTOK_NPARAM(payload) (gw_mysql_get_byte2(&payload[9])) #define MYSQL_GET_STMTOK_NPARAM(payload) (gw_mysql_get_byte2(&payload[9]))
#define MYSQL_GET_STMTOK_NATTR(payload) (gw_mysql_get_byte2(&payload[11])) #define MYSQL_GET_STMTOK_NATTR(payload) (gw_mysql_get_byte2(&payload[11]))
#define MYSQL_IS_ERROR_PACKET(payload) (MYSQL_GET_COMMAND(payload)==0xff) #define MYSQL_IS_ERROR_PACKET(payload) (MYSQL_GET_COMMAND(payload)==0xff)
#define MYSQL_GET_NATTR(payload) ((int)payload[4])
#endif /** _MYSQL_PROTOCOL_H */ #endif /** _MYSQL_PROTOCOL_H */
@ -365,8 +370,16 @@ void protocol_add_srv_command(MySQLProtocol* p, mysql_server_cmd_t cmd);
void protocol_remove_srv_command(MySQLProtocol* p); void protocol_remove_srv_command(MySQLProtocol* p);
bool protocol_waits_response(MySQLProtocol* p); bool protocol_waits_response(MySQLProtocol* p);
mysql_server_cmd_t protocol_get_srv_command(MySQLProtocol* p,bool removep); mysql_server_cmd_t protocol_get_srv_command(MySQLProtocol* p,bool removep);
int get_stmt_nresponse_packets(GWBUF* buf, mysql_server_cmd_t cmd); int get_stmt_nresponse_packets(GWBUF* buf, mysql_server_cmd_t cmd);
int protocol_get_nresponse_packets (MySQLProtocol* p); bool protocol_get_response_status (MySQLProtocol* p, int* npackets, size_t* nbytes);
bool protocol_set_nresponse_packets (MySQLProtocol* p, int nresponse_packets); void protocol_set_response_status (MySQLProtocol* p, int npackets, size_t nbytes);
void protocol_archive_srv_command(MySQLProtocol* p);
void init_response_status (
GWBUF* buf,
mysql_server_cmd_t cmd,
int* npackets,
size_t* nbytes);

View File

@ -52,13 +52,15 @@ typedef enum prep_stmt_state {
typedef enum bref_state { typedef enum bref_state {
BREF_IN_USE = 0x01, BREF_IN_USE = 0x01,
BREF_WAITING_RESULT = 0x02, /*< for anything that responds */ BREF_WAITING_RESULT = 0x02, /*< for session commands only */
BREF_CLOSED = 0x04 BREF_QUERY_ACTIVE = 0x04, /*< for other queries */
BREF_CLOSED = 0x08
} bref_state_t; } bref_state_t;
#define BREF_IS_NOT_USED(s) (s->bref_state & ~BREF_IN_USE) #define BREF_IS_NOT_USED(s) (s->bref_state & ~BREF_IN_USE)
#define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE) #define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) (s->bref_state & BREF_WAITING_RESULT) #define BREF_IS_WAITING_RESULT(s) (s->bref_num_result_wait > 0)
#define BREF_IS_QUERY_ACTIVE(s) (s->bref_state & BREF_QUERY_ACTIVE)
#define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED) #define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED)
typedef enum backend_type_t { typedef enum backend_type_t {
@ -90,9 +92,10 @@ typedef enum rses_property_type_t {
typedef enum select_criteria { typedef enum select_criteria {
UNDEFINED_CRITERIA=0, UNDEFINED_CRITERIA=0,
LEAST_GLOBAL_CONNECTIONS, /*< all connections established by MaxScale */ LEAST_GLOBAL_CONNECTIONS, /*< all connections established by MaxScale */
DEFAULT_CRITERIA=LEAST_GLOBAL_CONNECTIONS,
LEAST_ROUTER_CONNECTIONS, /*< connections established by this router */ LEAST_ROUTER_CONNECTIONS, /*< connections established by this router */
LEAST_BEHIND_MASTER, LEAST_BEHIND_MASTER,
LEAST_CURRENT_OPERATIONS,
DEFAULT_CRITERIA=LEAST_CURRENT_OPERATIONS,
LAST_CRITERIA /*< not used except for an index */ LAST_CRITERIA /*< not used except for an index */
} select_criteria_t; } select_criteria_t;
@ -107,7 +110,9 @@ typedef enum select_criteria {
strncmp(s,"LEAST_BEHIND_MASTER", strlen("LEAST_BEHIND_MASTER")) == 0 ? \ strncmp(s,"LEAST_BEHIND_MASTER", strlen("LEAST_BEHIND_MASTER")) == 0 ? \
LEAST_BEHIND_MASTER : ( \ LEAST_BEHIND_MASTER : ( \
strncmp(s,"LEAST_ROUTER_CONNECTIONS", strlen("LEAST_ROUTER_CONNECTIONS")) == 0 ? \ strncmp(s,"LEAST_ROUTER_CONNECTIONS", strlen("LEAST_ROUTER_CONNECTIONS")) == 0 ? \
LEAST_ROUTER_CONNECTIONS : UNDEFINED_CRITERIA))) LEAST_ROUTER_CONNECTIONS : ( \
strncmp(s,"LEAST_CURRENT_OPERATIONS", strlen("LEAST_CURRENT_OPERATIONS")) == 0 ? \
LEAST_CURRENT_OPERATIONS : UNDEFINED_CRITERIA))))
/** /**
* Session variable command * Session variable command
@ -191,6 +196,7 @@ typedef struct backend_ref_st {
BACKEND* bref_backend; BACKEND* bref_backend;
DCB* bref_dcb; DCB* bref_dcb;
bref_state_t bref_state; bref_state_t bref_state;
int bref_num_result_wait;
sescmd_cursor_t bref_sescmd_cur; sescmd_cursor_t bref_sescmd_cur;
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t bref_chk_tail; skygw_chk_t bref_chk_tail;

View File

@ -555,7 +555,12 @@ MONITOR_SERVERS *root_master;
handle->status = MONITOR_STOPPED; handle->status = MONITOR_STOPPED;
return; return;
} }
/* reset num_servers */
num_servers = 0;
/* start from the first server in the list */
ptr = handle->databases; ptr = handle->databases;
while (ptr) while (ptr)
{ {
/* copy server status into monitor pending_status */ /* copy server status into monitor pending_status */
@ -1036,11 +1041,19 @@ static MONITOR_SERVERS *get_replication_tree(MYSQL_MONITOR *handle, int num_serv
ptr = ptr->next; ptr = ptr->next;
} }
/* If root master is in MAINT, return NULL */ /*
if (SERVER_IN_MAINT(handle->master->server)) { * Return the root master
*/
if (handle->master != NULL) {
/* If the root master is in MAINT, return NULL */
if (SERVER_IN_MAINT(handle->master->server)) {
return NULL;
} else {
return handle->master;
}
} else {
return NULL; return NULL;
} else {
return handle->master;
} }
} }

View File

@ -65,6 +65,10 @@ static int gw_backend_hangup(DCB *dcb);
static int backend_write_delayqueue(DCB *dcb); static int backend_write_delayqueue(DCB *dcb);
static void backend_set_delayqueue(DCB *dcb, GWBUF *queue); static void backend_set_delayqueue(DCB *dcb, GWBUF *queue);
static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue); static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue);
static GWBUF* process_response_data (DCB* dcb, GWBUF* readbuf, int nbytes_to_process);
#if defined(NOT_USED) #if defined(NOT_USED)
static int gw_session(DCB *backend_dcb, void *data); static int gw_session(DCB *backend_dcb, void *data);
#endif #endif
@ -416,21 +420,20 @@ static int gw_read_backend_event(DCB *dcb) {
/* reading MySQL command output from backend and writing to the client */ /* reading MySQL command output from backend and writing to the client */
{ {
GWBUF *readbuf = NULL; GWBUF *read_buffer = NULL;
ROUTER_OBJECT *router = NULL; ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL; ROUTER *router_instance = NULL;
void *rsession = NULL; void *rsession = NULL;
SESSION *session = dcb->session; SESSION *session = dcb->session;
int nbytes_read = 0; int nbytes_read = 0;
mysql_server_cmd_t srvcmd = MYSQL_COM_UNDEFINED;
CHK_SESSION(session); CHK_SESSION(session);
router = session->service->router; router = session->service->router;
router_instance = session->service->router_instance; router_instance = session->service->router_instance;
rsession = session->router_session; rsession = session->router_session;
/* read available backend data */ /* read available backend data */
rc = dcb_read(dcb, &readbuf); rc = dcb_read(dcb, &read_buffer);
if (rc < 0) if (rc < 0)
{ {
@ -463,7 +466,7 @@ static int gw_read_backend_event(DCB *dcb) {
rc = 0; rc = 0;
goto return_rc; goto return_rc;
} }
nbytes_read = gwbuf_length(readbuf); nbytes_read = gwbuf_length(read_buffer);
if (nbytes_read == 0) if (nbytes_read == 0)
{ {
@ -471,74 +474,41 @@ static int gw_read_backend_event(DCB *dcb) {
} }
else else
{ {
ss_dassert(readbuf != NULL); ss_dassert(read_buffer != NULL);
} }
/** /** Packet prefix was read earlier */
* ask for next response (1 or more packets) like in if (dcb->dcb_readqueue)
* gw_MySQL_get_next_packet but gw_MySQL_get_next_response
*/
srvcmd = protocol_get_srv_command((MySQLProtocol *)dcb->protocol,
false);
/**
* If backend DCB is waiting for response to COM_STMT_PREPARE,
* it, then only that must be passed to clientReply.
*
* If response consists of ses cmd response and response to
* COM_STMT_PREPARE, there can't be anything after
* COM_STMT_PREPARE response because whole buffer may be
* discarded since router doesn't know the borderlines of MySQL
* packets.
*/
/**
* Read all packets from <readbuf> which belong to STMT PREPARE
* response.
* Move packets not belonging to STMT PREPARE response to
* dcb_readqueue.
* When whole response is read, pass <readbuf> forward to
* clientReply.
*/
if (srvcmd == MYSQL_COM_STMT_PREPARE)
{ {
MySQLProtocol* p; read_buffer = gwbuf_append(dcb->dcb_readqueue, read_buffer);
int nresponse_packets; nbytes_read = gwbuf_length(read_buffer);
GWBUF* tmpbuf;
p = (MySQLProtocol *)dcb->protocol; if (nbytes_read < 5) /*< read at least command type */
nresponse_packets = protocol_get_nresponse_packets(p);
/** count only once per response */
if (nresponse_packets == 0)
{
nresponse_packets = get_stmt_nresponse_packets(
readbuf,
srvcmd);
}
tmpbuf = gw_MySQL_get_packets(&readbuf, &nresponse_packets);
gwbuf_append(dcb->dcb_readqueue, readbuf);
readbuf = tmpbuf;
/** <readbuf> contains incomplete response to STMT PREPARE */
if (nresponse_packets != 0)
{ {
rc = 0; rc = 0;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Backend fd %d read incomplete response packet. "
"Waiting %d more, cmd %s.",
dcb->fd,
nresponse_packets,
STRPACKETTYPE(srvcmd))));
/**
* store the number of how many packets the
* reponse consists of to backend's protocol.
*/
protocol_set_nresponse_packets(p, nresponse_packets);
goto return_rc; goto return_rc;
} }
protocol_remove_srv_command((MySQLProtocol *)dcb->protocol); /** There is at least length and command type. */
else
{
read_buffer = dcb->dcb_readqueue;
dcb->dcb_readqueue = NULL;
}
}
else
{
if (nbytes_read < 5)
{
gwbuf_append(dcb->dcb_readqueue, read_buffer);
rc = 0;
goto return_rc;
}
}
/** If protocol has command set it is session command */
if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) !=
MYSQL_COM_UNDEFINED)
{
read_buffer = process_response_data(dcb, read_buffer, nbytes_read);
} }
/*< /*<
* If dcb->session->client is freed already it may be NULL. * If dcb->session->client is freed already it may be NULL.
@ -554,20 +524,20 @@ static int gw_read_backend_event(DCB *dcb) {
if (client_protocol->protocol_auth_state == if (client_protocol->protocol_auth_state ==
MYSQL_IDLE) MYSQL_IDLE)
{ {
gwbuf_set_type(readbuf, GWBUF_TYPE_MYSQL); gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
router->clientReply(
router->clientReply(router_instance, router_instance,
rsession, rsession,
readbuf, read_buffer,
dcb); dcb);
rc = 1; rc = 1;
} }
goto return_rc; goto return_rc;
} }
else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL)
{ {
gwbuf_set_type(readbuf, GWBUF_TYPE_MYSQL); gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance, rsession, readbuf, dcb); router->clientReply(router_instance, rsession, read_buffer, dcb);
rc = 1; rc = 1;
} }
} }
@ -672,7 +642,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
* If auth failed, return value is 0, write and buffered write * If auth failed, return value is 0, write and buffered write
* return 1. * return 1.
*/ */
switch(backend_protocol->protocol_auth_state) { switch (backend_protocol->protocol_auth_state) {
case MYSQL_AUTH_FAILED: case MYSQL_AUTH_FAILED:
{ {
size_t len; size_t len;
@ -712,22 +682,18 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
dcb, dcb,
dcb->fd, dcb->fd,
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state)))); STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
spinlock_release(&dcb->authlock); spinlock_release(&dcb->authlock);
/** /**
* Statement type is used in readwrite split router.
* Command is *not* set for readconn router.
*
* Server commands are stored to MySQLProtocol structure * Server commands are stored to MySQLProtocol structure
* if buffer always includes a single statement. That * if buffer always includes a single statement.
* information is stored in GWBUF type field
* (GWBUF_TYPE_SINGLE_STMT bit).
*/ */
if (GWBUF_IS_TYPE_SINGLE_STMT(queue)) if (GWBUF_IS_TYPE_SINGLE_STMT(queue) &&
GWBUF_IS_TYPE_SESCMD(queue))
{ {
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Write to backend's DCB fd %d "
"cmd %s protocol state %s.",
dcb->fd,
STRPACKETTYPE(cmd),
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
/** Record the command to backend's protocol */ /** Record the command to backend's protocol */
protocol_add_srv_command(backend_protocol, cmd); protocol_add_srv_command(backend_protocol, cmd);
} }
@ -751,21 +717,12 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
dcb->fd, dcb->fd,
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state)))); STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
/** /**
* Since it is known that buffer contains one complete * In case of session commands, store command to DCB's
* command, store the command to backend's protocol. When * protocol struct.
* backend server responses the command determines how
* response needs to be processed. This is mainly due to
* MYSQL_COM_STMT_PREPARE whose response consists of
* arbitrary number of packets.
*/ */
if (GWBUF_IS_TYPE_SINGLE_STMT(queue)) if (GWBUF_IS_TYPE_SINGLE_STMT(queue) &&
GWBUF_IS_TYPE_SESCMD(queue))
{ {
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Write to backend's delayqueue fd %d "
"protocol state %s.",
dcb->fd,
STRPROTOCOLSTATE(backend_protocol->protocol_auth_state))));
/** Record the command to backend's protocol */ /** Record the command to backend's protocol */
protocol_add_srv_command(backend_protocol, cmd); protocol_add_srv_command(backend_protocol, cmd);
} }
@ -797,7 +754,6 @@ static int gw_error_backend_event(DCB *dcb)
void* rsession; void* rsession;
ROUTER_OBJECT* router; ROUTER_OBJECT* router;
ROUTER* router_instance; ROUTER* router_instance;
int rc = 0;
GWBUF* errbuf; GWBUF* errbuf;
bool succp; bool succp;
@ -952,7 +908,6 @@ gw_backend_hangup(DCB *dcb)
void* rsession; void* rsession;
ROUTER_OBJECT* router; ROUTER_OBJECT* router;
ROUTER* router_instance; ROUTER* router_instance;
int rc = 0;
bool succp; bool succp;
GWBUF* errbuf; GWBUF* errbuf;
@ -1010,7 +965,6 @@ gw_backend_close(DCB *dcb)
DCB* client_dcb; DCB* client_dcb;
SESSION* session; SESSION* session;
GWBUF* quitbuf; GWBUF* quitbuf;
bool succp;
CHK_DCB(dcb); CHK_DCB(dcb);
session = dcb->session; session = dcb->session;
@ -1095,7 +1049,6 @@ static int backend_write_delayqueue(DCB *dcb)
ROUTER *router_instance = NULL; ROUTER *router_instance = NULL;
void *rsession = NULL; void *rsession = NULL;
SESSION *session = dcb->session; SESSION *session = dcb->session;
int receive_rc = 0;
CHK_SESSION(session); CHK_SESSION(session);
@ -1240,3 +1193,138 @@ static int gw_session(DCB *backend_dcb, void *data) {
return 1; return 1;
} }
*/ */
/**
* Move packets or parts of packets from redbuf to outbuf as the packet headers
* and lengths have been noticed and counted.
* Session commands need to be marked so that they can be handled properly in
* the router's clientReply.
* Return the pointer to outbuf.
*/
static GWBUF* process_response_data (
DCB* dcb,
GWBUF* readbuf,
int nbytes_to_process) /*< number of new bytes read */
{
int npackets_left = 0; /*< response's packet count */
size_t nbytes_left = 0; /*< nbytes to be read for the packet */
MySQLProtocol* p;
GWBUF* outbuf = NULL;
/** Get command which was stored in gw_MySQLWrite_backend */
p = DCB_PROTOCOL(dcb, MySQLProtocol);
CHK_PROTOCOL(p);
/** All buffers processed here are sescmd responses */
gwbuf_set_type(readbuf, GWBUF_TYPE_SESCMD_RESPONSE);
/**
* Now it is known how many packets there should be and how much
* is read earlier.
*/
while (nbytes_to_process != 0)
{
mysql_server_cmd_t srvcmd;
bool succp;
srvcmd = protocol_get_srv_command(p, false);
/**
* Read values from protocol structure, fails if values are
* uninitialized.
*/
if (npackets_left == 0)
{
succp = protocol_get_response_status(p, &npackets_left, &nbytes_left);
if (!succp || npackets_left == 0)
{
/**
* Examine command type and the readbuf. Conclude response
* packet count from the command type or from the first
* packet content. Fails if read buffer doesn't include
* enough data to read the packet length.
*/
init_response_status(readbuf, srvcmd, &npackets_left, &nbytes_left);
}
}
/** Only session commands with responses should be processed */
ss_dassert(npackets_left > 0);
/** Read incomplete packet. */
if (nbytes_left > nbytes_to_process)
{
/** Includes length info so it can be processed */
if (nbytes_to_process >= 5)
{
/** discard source buffer */
readbuf = gwbuf_consume(readbuf, GWBUF_LENGTH(readbuf));
nbytes_left -= nbytes_to_process;
}
nbytes_to_process = 0;
}
/** Packet was read. All bytes belonged to the last packet. */
else if (nbytes_left == nbytes_to_process)
{
nbytes_left = 0;
nbytes_to_process = 0;
ss_dassert(npackets_left > 0);
npackets_left -= 1;
outbuf = gwbuf_append(outbuf, readbuf);
readbuf = NULL;
}
/**
* Packet was read. There should be more since bytes were
* left over.
* Move the next packet to its own buffer and add that next
* to the prev packet's buffer.
*/
else /*< nbytes_left < nbytes_to_process */
{
nbytes_to_process -= nbytes_left;
/** Move the prefix of the buffer to outbuf from redbuf */
outbuf = gwbuf_append(outbuf, gwbuf_clone_portion(readbuf, 0, nbytes_left));
readbuf = gwbuf_consume(readbuf, nbytes_left);
ss_dassert(npackets_left > 0);
npackets_left -= 1;
nbytes_left = 0;
}
/** Store new status to protocol structure */
protocol_set_response_status(p, npackets_left, nbytes_left);
/** A complete packet was read */
if (nbytes_left == 0)
{
/** No more packets in this response */
if (npackets_left == 0)
{
GWBUF* b = outbuf;
while (b->next != NULL)
{
b = b->next;
}
/** Mark last as end of response */
gwbuf_set_type(b, GWBUF_TYPE_RESPONSE_END);
/** Archive the command */
protocol_archive_srv_command(p);
}
/** Read next packet */
else
{
uint8_t* data;
/** Read next packet length */
data = GWBUF_DATA(readbuf);
nbytes_left = MYSQL_GET_PACKET_LEN(data)+MYSQL_HEADER_LEN;
/** Store new status to protocol structure */
protocol_set_response_status(p, npackets_left, nbytes_left);
}
}
}
return outbuf;
}

View File

@ -574,9 +574,8 @@ int gw_read_client_event(
else else
{ {
uint8_t* data = (uint8_t *)GWBUF_DATA(read_buffer); uint8_t* data = (uint8_t *)GWBUF_DATA(read_buffer);
size_t packetlen = MYSQL_GET_PACKET_LEN(data)+4;
if (nbytes_read < 3 || nbytes_read < packetlen) if (nbytes_read < 3 || nbytes_read < MYSQL_GET_PACKET_LEN(data)+4)
{ {
gwbuf_append(dcb->dcb_readqueue, read_buffer); gwbuf_append(dcb->dcb_readqueue, read_buffer);
rc = 0; rc = 0;
@ -1291,11 +1290,12 @@ return_rc:
static int gw_error_client_event( static int gw_error_client_event(
DCB* dcb) DCB* dcb)
{ {
int rc;
SESSION* session; SESSION* session;
CHK_DCB(dcb); CHK_DCB(dcb);
session = dcb->session;
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [gw_error_client_event] Error event handling for DCB %p " "%lu [gw_error_client_event] Error event handling for DCB %p "
@ -1362,7 +1362,6 @@ gw_client_close(DCB *dcb)
static int static int
gw_client_hangup_event(DCB *dcb) gw_client_hangup_event(DCB *dcb)
{ {
int rc;
SESSION* session; SESSION* session;
CHK_DCB(dcb); CHK_DCB(dcb);

View File

@ -41,6 +41,9 @@ extern int gw_write_backend_event(DCB *dcb);
extern int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue); extern int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue);
extern int gw_error_backend_event(DCB *dcb); extern int gw_error_backend_event(DCB *dcb);
static server_command_t* server_command_init(server_command_t* srvcmd,
mysql_server_cmd_t cmd);
/** /**
* Creates MySQL protocol structure * Creates MySQL protocol structure
@ -77,7 +80,9 @@ MySQLProtocol* mysql_protocol_init(
goto return_p; goto return_p;
} }
p->protocol_auth_state = MYSQL_ALLOC; p->protocol_auth_state = MYSQL_ALLOC;
p->protocol_command.cmd = MYSQL_COM_UNDEFINED; p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED;
p->protocol_command.scom_nresponse_packets = 0;
p->protocol_command.scom_nbytes_to_read = 0;
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
p->protocol_chk_top = CHK_NUM_PROTOCOL; p->protocol_chk_top = CHK_NUM_PROTOCOL;
p->protocol_chk_tail = CHK_NUM_PROTOCOL; p->protocol_chk_tail = CHK_NUM_PROTOCOL;
@ -1557,6 +1562,94 @@ GWBUF* gw_MySQL_get_packets(
} }
static server_command_t* server_command_init(
server_command_t* srvcmd,
mysql_server_cmd_t cmd)
{
server_command_t* c;
if (srvcmd != NULL)
{
c = srvcmd;
}
else
{
c = (server_command_t *)malloc(sizeof(server_command_t));
}
c->scom_cmd = cmd;
c->scom_nresponse_packets = -1;
c->scom_nbytes_to_read = 0;
c->scom_next = NULL;
return c;
}
static server_command_t* server_command_copy(
server_command_t* srvcmd)
{
server_command_t* c =
(server_command_t *)malloc(sizeof(server_command_t));
*c = *srvcmd;
return c;
}
#define MAX_CMD_HISTORY 10
void protocol_archive_srv_command(
MySQLProtocol* p)
{
server_command_t* s1;
server_command_t** s2;
int len;
spinlock_acquire(&p->protocol_lock);
s1 = &p->protocol_command;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Move command %s from fd %d to command history.",
STRPACKETTYPE(s1->scom_cmd),
p->owner_dcb->fd)));
/** Copy to history list */
s2 = &p->protocol_cmd_history;
if (*s2 != NULL)
{
len = 0;
while ((*s2)->scom_next != NULL)
{
*s2 = (*s2)->scom_next;
len += 1;
}
}
*s2 = server_command_copy(s1);
/** Keep history limits, remove oldest */
if (len > MAX_CMD_HISTORY)
{
server_command_t* c = p->protocol_cmd_history;
p->protocol_cmd_history = p->protocol_cmd_history->scom_next;
free(c);
}
/** Remove from command list */
if (s1->scom_next == NULL)
{
p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED;
}
else
{
p->protocol_command = *(s1->scom_next);
free(s1->scom_next);
}
spinlock_release(&p->protocol_lock);
}
/** /**
* If router expects to get separate, complete statements, add MySQL command * If router expects to get separate, complete statements, add MySQL command
* to MySQLProtocol structure. It is removed when response has arrived. * to MySQLProtocol structure. It is removed when response has arrived.
@ -1565,49 +1658,42 @@ void protocol_add_srv_command(
MySQLProtocol* p, MySQLProtocol* p,
mysql_server_cmd_t cmd) mysql_server_cmd_t cmd)
{ {
spinlock_acquire(&p->protocol_lock); server_command_t* c;
if (p->protocol_command.cmd == MYSQL_COM_UNDEFINED) spinlock_acquire(&p->protocol_lock);
/** this is the only server command in protocol */
if (p->protocol_command.scom_cmd == MYSQL_COM_UNDEFINED)
{ {
p->protocol_command.cmd = cmd; /** write into structure */
p->protocol_command.nresponse_packets = 0; server_command_init(&p->protocol_command, cmd);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Added command %s to fd %d.",
STRPACKETTYPE(cmd),
p->owner_dcb->fd)));
} }
else else
{ {
server_command_t* c = /** add to the end of list */
(server_command_t *)malloc(sizeof(server_command_t)); p->protocol_command.scom_next = server_command_init(NULL, cmd);
c->cmd = cmd; }
c->nresponse_packets = 0;
c->next = NULL; LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
p->protocol_command.next = c; "Added command %s to fd %d.",
STRPACKETTYPE(cmd),
p->owner_dcb->fd)));
#if defined(SS_DEBUG)
c = &p->protocol_command;
while (c != NULL && c->scom_cmd != MYSQL_COM_UNDEFINED)
{
LOGIF(LT, (skygw_log_write( LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"Added another command %s to fd %d.", "fd %d : %d %s",
STRPACKETTYPE(cmd), p->owner_dcb->fd,
p->owner_dcb->fd))); c->scom_cmd,
#if defined(SS_DEBUG) STRPACKETTYPE(c->scom_cmd))));
c = &p->protocol_command; c = c->scom_next;
while (c != NULL && c->cmd != MYSQL_COM_UNDEFINED)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"fd %d : %d %s",
p->owner_dcb->fd,
c->cmd,
STRPACKETTYPE(c->cmd))));
c = c->next;
}
#endif
} }
#endif
spinlock_release(&p->protocol_lock); spinlock_release(&p->protocol_lock);
} }
@ -1628,17 +1714,17 @@ void protocol_remove_srv_command(
LOGIF(LT, (skygw_log_write( LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"Removed command %s from fd %d.", "Removed command %s from fd %d.",
STRPACKETTYPE(s->cmd), STRPACKETTYPE(s->scom_cmd),
p->owner_dcb->fd))); p->owner_dcb->fd)));
if (s->next == NULL) if (s->scom_next == NULL)
{ {
p->protocol_command.cmd = MYSQL_COM_UNDEFINED; p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED;
} }
else else
{ {
p->protocol_command = *(s->next); p->protocol_command = *(s->scom_next);
free(s->next); free(s->scom_next);
} }
spinlock_release(&p->protocol_lock); spinlock_release(&p->protocol_lock);
@ -1650,97 +1736,133 @@ mysql_server_cmd_t protocol_get_srv_command(
{ {
mysql_server_cmd_t cmd; mysql_server_cmd_t cmd;
cmd = p->protocol_command.cmd; cmd = p->protocol_command.scom_cmd;
if (removep) if (removep)
{ {
protocol_remove_srv_command(p); protocol_remove_srv_command(p);
} }
LOGIF(LT, (skygw_log_write( LOGIF(LD, (skygw_log_write(
LOGFILE_TRACE, LOGFILE_DEBUG,
"Read command %s for fd %d.", "%lu [protocol_get_srv_command] Read command %s for fd %d.",
pthread_self(),
STRPACKETTYPE(cmd), STRPACKETTYPE(cmd),
p->owner_dcb->fd))); p->owner_dcb->fd)));
return cmd; return cmd;
} }
/**
* Return how many packets are included in the server's response. /**
* Examine command type and the readbuf. Conclude response
* packet count from the command type or from the first packet
* content.
* Fails if read buffer doesn't include enough data to read the
* packet length.
*/ */
int get_stmt_nresponse_packets( void init_response_status (
GWBUF* buf, GWBUF* buf,
mysql_server_cmd_t cmd) mysql_server_cmd_t cmd,
int* npackets,
size_t* nbytes_left)
{ {
int npackets;
uint8_t* packet; uint8_t* packet;
int nparam; int nparam;
int nattr; int nattr;
uint8_t* data; uint8_t* data;
switch (cmd) { ss_dassert(gwbuf_length(buf) >= 3);
case MYSQL_COM_STMT_PREPARE:
data = (uint8_t *)buf->start; data = (uint8_t *)buf->start;
if (data[4] == 0xff) if (data[4] == 0xff) /*< error */
{ {
npackets = 1; /*< error packet */ *npackets = 1;
} }
else else
{ {
switch (cmd) {
case MYSQL_COM_STMT_PREPARE:
packet = (uint8_t *)GWBUF_DATA(buf); packet = (uint8_t *)GWBUF_DATA(buf);
/** ok + nparam + eof + nattr + eof */ /** ok + nparam + eof + nattr + eof */
nparam = MYSQL_GET_STMTOK_NPARAM(packet); nparam = MYSQL_GET_STMTOK_NPARAM(packet);
nattr = MYSQL_GET_STMTOK_NATTR(packet); nattr = MYSQL_GET_STMTOK_NATTR(packet);
npackets = 1 + nparam + MIN(1, nparam) + *npackets = 1 + nparam + MIN(1, nparam) +
nattr + MIN(nattr, 1); nattr + MIN(nattr, 1);
ss_dassert(npackets<128); break;
}
break; case MYSQL_COM_QUIT:
case MYSQL_COM_STMT_SEND_LONG_DATA:
default: case MYSQL_COM_STMT_CLOSE:
npackets = 1; *npackets = 0; /*< these don't reply anything */
break; break;
default:
/**
* assume that other session commands respond
* OK or ERR
*/
*npackets = 1;
break;
}
} }
ss_dassert(npackets<128); *nbytes_left = MYSQL_GET_PACKET_LEN(data) + MYSQL_HEADER_LEN;
return npackets; /**
* There is at least one complete packet in the buffer so buffer is bigger
* than packet
*/
ss_dassert(*nbytes_left > 0);
ss_dassert(*npackets > 0);
ss_dassert(*npackets<128);
} }
int protocol_get_nresponse_packets (
MySQLProtocol* p)
{
int rval;
CHK_PROTOCOL(p);
spinlock_acquire(&p->protocol_lock);
rval = p->protocol_command.nresponse_packets;
spinlock_release(&p->protocol_lock);
ss_dassert(rval<128);
return rval;
}
bool protocol_set_nresponse_packets (
/**
* Read how many packets are left from current response and how many bytes there
* is still to be read from the current packet.
*/
bool protocol_get_response_status (
MySQLProtocol* p, MySQLProtocol* p,
int nresponse_packets) int* npackets,
size_t* nbytes)
{ {
bool succp; bool succp;
CHK_PROTOCOL(p); CHK_PROTOCOL(p);
spinlock_acquire(&p->protocol_lock); spinlock_acquire(&p->protocol_lock);
if (p->protocol_command.nresponse_packets > 0 && *npackets = p->protocol_command.scom_nresponse_packets;
nresponse_packets > p->protocol_command.nresponse_packets) *nbytes = p->protocol_command.scom_nbytes_to_read;
spinlock_release(&p->protocol_lock);
if (*npackets < 0 && *nbytes == 0)
{ {
succp = false; succp = false;
} }
else else
{ {
p->protocol_command.nresponse_packets = nresponse_packets;
ss_dassert(nresponse_packets<128);
succp = true; succp = true;
} }
spinlock_release(&p->protocol_lock);
return succp; return succp;
} }
void protocol_set_response_status (
MySQLProtocol* p,
int npackets_left,
size_t nbytes)
{
CHK_PROTOCOL(p);
spinlock_acquire(&p->protocol_lock);
p->protocol_command.scom_nbytes_to_read = nbytes;
ss_dassert(p->protocol_command.scom_nbytes_to_read >= 0);
p->protocol_command.scom_nresponse_packets = npackets_left;
spinlock_release(&p->protocol_lock);
}

View File

@ -238,7 +238,7 @@ char *weightby;
} }
inst->servers[n]->server = server; inst->servers[n]->server = server;
inst->servers[n]->current_connection_count = 0; inst->servers[n]->current_connection_count = 0;
inst->servers[n]->weight = 100; inst->servers[n]->weight = 1000;
n++; n++;
} }
inst->servers[n] = NULL; inst->servers[n] = NULL;
@ -267,7 +267,9 @@ char *weightby;
int perc; int perc;
backend = inst->servers[n]; backend = inst->servers[n];
perc = (atoi(serverGetParameter(backend->server, perc = (atoi(serverGetParameter(backend->server,
weightby)) * 100) / total; weightby)) * 1000) / total;
if (perc == 0)
perc = 1;
backend->weight = perc; backend->weight = perc;
if (perc == 0) if (perc == 0)
{ {
@ -453,18 +455,18 @@ BACKEND *master_host = NULL;
candidate = inst->servers[i]; candidate = inst->servers[i];
} }
else if ((inst->servers[i]->current_connection_count else if ((inst->servers[i]->current_connection_count
* 100) / inst->servers[i]->weight < * 1000) / inst->servers[i]->weight <
(candidate->current_connection_count * (candidate->current_connection_count *
100) / candidate->weight) 1000) / candidate->weight)
{ {
/* This running server has fewer /* This running server has fewer
connections, set it as a new candidate */ connections, set it as a new candidate */
candidate = inst->servers[i]; candidate = inst->servers[i];
} }
else if ((inst->servers[i]->current_connection_count else if ((inst->servers[i]->current_connection_count
* 100) / inst->servers[i]->weight == * 1000) / inst->servers[i]->weight ==
(candidate->current_connection_count * (candidate->current_connection_count *
100) / candidate->weight && 1000) / candidate->weight &&
inst->servers[i]->server->stats.n_connections < inst->servers[i]->server->stats.n_connections <
candidate->server->stats.n_connections) candidate->server->stats.n_connections)
{ {
@ -756,9 +758,9 @@ char *weightby;
for (i = 0; router_inst->servers[i]; i++) for (i = 0; router_inst->servers[i]; i++)
{ {
backend = router_inst->servers[i]; backend = router_inst->servers[i];
dcb_printf(dcb, "\t\t%-20s %3d%% %d\n", dcb_printf(dcb, "\t\t%-20s %3.1f%% %d\n",
backend->server->unique_name, backend->server->unique_name,
backend->weight, (float)backend->weight / 10,
backend->current_connection_count); backend->current_connection_count);
} }

View File

@ -116,12 +116,18 @@ int bref_cmp_behind_master(
const void* bref1, const void* bref1,
const void* bref2); const void* bref2);
int bref_cmp_current_load(
const void* bref1,
const void* bref2);
int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)= int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)=
{ {
NULL, NULL,
bref_cmp_global_conn, bref_cmp_global_conn,
bref_cmp_router_conn, bref_cmp_router_conn,
bref_cmp_behind_master bref_cmp_behind_master,
bref_cmp_current_load
}; };
static bool select_connect_backend_servers( static bool select_connect_backend_servers(
@ -212,10 +218,7 @@ static mysql_sescmd_t* sescmd_cursor_get_command(
static bool sescmd_cursor_next( static bool sescmd_cursor_next(
sescmd_cursor_t* scur); sescmd_cursor_t* scur);
static GWBUF* sescmd_cursor_process_replies( static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, backend_ref_t* bref);
DCB* client_dcb,
GWBUF* replybuf,
backend_ref_t* bref);
static void tracelog_routed_query( static void tracelog_routed_query(
ROUTER_CLIENT_SES* rses, ROUTER_CLIENT_SES* rses,
@ -484,7 +487,6 @@ static void* newSession(
int max_slave_rlag; /*< max allowed replication lag for any slave */ int max_slave_rlag; /*< max allowed replication lag for any slave */
int i; int i;
const int min_nservers = 1; /*< hard-coded for now */ const int min_nservers = 1; /*< hard-coded for now */
static uint64_t router_client_ses_seq; /*< ID for client session */
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
@ -1093,13 +1095,15 @@ static int routeQuery(
{ {
if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1) if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1)
{ {
backend_ref_t* bref;
atomic_add(&inst->stats.n_slave, 1); atomic_add(&inst->stats.n_slave, 1);
/** /**
* This backend_ref waits resultset, flag it. * Add one query response waiter to backend reference
*/ */
bref_set_state(get_bref_from_dcb(router_cli_ses, bref = get_bref_from_dcb(router_cli_ses, slave_dcb);
slave_dcb), bref_set_state(bref, BREF_QUERY_ACTIVE);
BREF_WAITING_RESULT); bref_set_state(bref, BREF_WAITING_RESULT);
} }
else else
{ {
@ -1144,18 +1148,21 @@ static int routeQuery(
{ {
succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER); succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER);
} }
if (succp) if (succp)
{ {
if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1) if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1)
{ {
backend_ref_t* bref;
atomic_add(&inst->stats.n_master, 1); atomic_add(&inst->stats.n_master, 1);
/** /**
* This backend_ref waits reply to write stmt, * Add one write response waiter to backend reference
* flag it.
*/ */
bref_set_state(get_bref_from_dcb(router_cli_ses, master_dcb), bref = get_bref_from_dcb(router_cli_ses, master_dcb);
BREF_WAITING_RESULT); bref_set_state(bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT);
} }
} }
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
@ -1358,8 +1365,7 @@ static void clientReply (
scur = &bref->bref_sescmd_cur; scur = &bref->bref_sescmd_cur;
/** /**
* Active cursor means that reply is from session command * Active cursor means that reply is from session command
* execution. Majority of the time there are no session commands * execution.
* being executed.
*/ */
if (sescmd_cursor_is_active(scur)) if (sescmd_cursor_is_active(scur))
{ {
@ -1370,8 +1376,7 @@ static void clientReply (
(uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf)); (uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf));
size_t len = MYSQL_GET_PACKET_LEN(buf); size_t len = MYSQL_GET_PACKET_LEN(buf);
char* cmdstr = (char *)malloc(len+1); char* cmdstr = (char *)malloc(len+1);
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
snprintf(cmdstr, len+1, "%s", &buf[5]); snprintf(cmdstr, len+1, "%s", &buf[5]);
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
@ -1383,30 +1388,51 @@ static void clientReply (
free(cmdstr); free(cmdstr);
/** Inform the client */ /** Inform the client */
handle_error_reply_client(ses,writebuf); handle_error_reply_client(ses, writebuf);
/** Unlock router session */ /** Unlock router session */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
goto lock_failed; goto lock_failed;
} }
else else if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf))
{ {
/** /**
* Discard all those responses that have already been sent to * Discard all those responses that have already been sent to
* the client. Return with buffer including response that * the client. Return with buffer including response that
* needs to be sent to client or NULL. * needs to be sent to client or NULL.
*/ */
writebuf = sescmd_cursor_process_replies(client_dcb, writebuf = sescmd_cursor_process_replies(writebuf, bref);
writebuf, }
bref); else
{
ss_dassert(false);
}
/**
* If response will be sent to client, decrease waiter count.
* This applies to session commands only. Counter decrement
* for other type of queries is done outside this block.
*/
if (writebuf != NULL && client_dcb != NULL)
{
/** Set response status as replied */
bref_clear_state(bref, BREF_WAITING_RESULT);
} }
} }
/**
* Clear BREF_QUERY_ACTIVE flag and decrease waiter counter.
* This applies for queries other than session commands.
*/
else if (BREF_IS_QUERY_ACTIVE(bref))
{
bref_clear_state(bref, BREF_QUERY_ACTIVE);
/** Set response status as replied */
bref_clear_state(bref, BREF_WAITING_RESULT);
}
if (writebuf != NULL && client_dcb != NULL) if (writebuf != NULL && client_dcb != NULL)
{ {
/** Write reply to client DCB */ /** Write reply to client DCB */
SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
bref_clear_state(bref, BREF_WAITING_RESULT);
} }
/** Unlock router session */ /** Unlock router session */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
@ -1417,7 +1443,7 @@ static void clientReply (
/** Log to debug that router was closed */ /** Log to debug that router was closed */
goto lock_failed; goto lock_failed;
} }
/** There is one pending session command to be xexecuted. */ /** There is one pending session command to be executed. */
if (sescmd_cursor_is_active(scur)) if (sescmd_cursor_is_active(scur))
{ {
bool succp; bool succp;
@ -1430,6 +1456,8 @@ static void clientReply (
bref->bref_backend->backend_server->port))); bref->bref_backend->backend_server->port)));
succp = execute_sescmd_in_backend(bref); succp = execute_sescmd_in_backend(bref);
ss_dassert(succp);
} }
/** Unlock router session */ /** Unlock router session */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
@ -1469,21 +1497,84 @@ int bref_cmp_behind_master(
return 1; return 1;
} }
int bref_cmp_current_load(
const void* bref1,
const void* bref2)
{
SERVER* s1 = ((backend_ref_t *)bref1)->bref_backend->backend_server;
SERVER* s2 = ((backend_ref_t *)bref2)->bref_backend->backend_server;
return ((s1->stats.n_current_ops < s2->stats.n_current_ops) ? -1 :
((s1->stats.n_current_ops > s2->stats.n_current_ops) ? 1 : 0));
}
static void bref_clear_state( static void bref_clear_state(
backend_ref_t* bref, backend_ref_t* bref,
bref_state_t state) bref_state_t state)
{ {
bref->bref_state &= ~state; if (state != BREF_WAITING_RESULT)
{
bref->bref_state &= ~state;
}
else
{
int prev1;
int prev2;
/** Decrease waiter count */
prev1 = atomic_add(&bref->bref_num_result_wait, -1);
ss_dassert(prev1 > 0);
/** Decrease global operation count */
prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, -1);
ss_dassert(prev2 > 0);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Current waiters %d and ops %d after in %s:%d",
prev1-1,
prev2-1,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
}
} }
static void bref_set_state( static void bref_set_state(
backend_ref_t* bref, backend_ref_t* bref,
bref_state_t state) bref_state_t state)
{ {
bref->bref_state |= state; if (state != BREF_WAITING_RESULT)
LOGIF(LT, (skygw_log_write( {
LOGFILE_TRACE, bref->bref_state |= state;
"Set state %d for %s:%d fd %d", }
else
{
int prev1;
int prev2;
/** Increase waiter count */
prev1 = atomic_add(&bref->bref_num_result_wait, 1);
ss_dassert(prev1 >= 0);
/** Increase global operation count */
prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Current waiters %d and ops %d before in %s:%d",
prev1,
prev2,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
}
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [bref_set_state] Set state %d for %s:%d fd %d",
pthread_self(),
bref->bref_state, bref->bref_state,
bref->bref_backend->backend_server->name, bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port, bref->bref_backend->backend_server->port,
@ -1662,6 +1753,13 @@ static bool select_connect_backend_servers(
b->backend_conn_count))); b->backend_conn_count)));
break; break;
case LEAST_CURRENT_OPERATIONS:
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"%s %d:%d",
b->backend_server->name,
b->backend_server->port,
b->backend_server->stats.n_current_ops)));
break;
default: default:
break; break;
} }
@ -1681,13 +1779,11 @@ static bool select_connect_backend_servers(
LOGIF(LT, (skygw_log_write_flush( LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE, LOGFILE_TRACE,
"Examine server " "Examine server "
"%s:%d %s with %d connections. " "%s:%d %s with %d active operations.",
"router->bitvalue is %d",
b->backend_server->name, b->backend_server->name,
b->backend_server->port, b->backend_server->port,
STRSRVSTATUS(b->backend_server), STRSRVSTATUS(b->backend_server),
b->backend_conn_count, b->backend_server->stats.n_current_ops)));
router->bitmask)));
if (SERVER_IS_RUNNING(b->backend_server) && if (SERVER_IS_RUNNING(b->backend_server) &&
((b->backend_server->status & router->bitmask) == ((b->backend_server->status & router->bitmask) ==
@ -2174,13 +2270,9 @@ static void mysql_sescmd_done(
* 9. s+q+ * 9. s+q+
*/ */
static GWBUF* sescmd_cursor_process_replies( static GWBUF* sescmd_cursor_process_replies(
DCB* client_dcb,
GWBUF* replybuf, GWBUF* replybuf,
backend_ref_t* bref) backend_ref_t* bref)
{ {
const size_t headerlen = 4; /*< mysql packet header */
size_t packetlen;
uint8_t* packet;
mysql_sescmd_t* scmd; mysql_sescmd_t* scmd;
sescmd_cursor_t* scur; sescmd_cursor_t* scur;
@ -2188,7 +2280,6 @@ static GWBUF* sescmd_cursor_process_replies(
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock))); ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
scmd = sescmd_cursor_get_command(scur); scmd = sescmd_cursor_get_command(scur);
CHK_DCB(client_dcb);
CHK_GWBUF(replybuf); CHK_GWBUF(replybuf);
/** /**
@ -2200,35 +2291,21 @@ static GWBUF* sescmd_cursor_process_replies(
/** Faster backend has already responded to client : discard */ /** Faster backend has already responded to client : discard */
if (scmd->my_sescmd_is_replied) if (scmd->my_sescmd_is_replied)
{ {
bool last_packet = false;
CHK_GWBUF(replybuf); CHK_GWBUF(replybuf);
packet = (uint8_t *)GWBUF_DATA(replybuf);
/** while (!last_packet)
* If it is response to MYSQL_COM_STMT_PREPARE, then buffer
* only includes the response.
*/
if (scmd->my_sescmd_prop->rses_prop_data.sescmd.my_sescmd_packet_type ==
MYSQL_COM_STMT_PREPARE)
{ {
while (replybuf != NULL) int buflen;
{
#if defined(SS_DEBUG) buflen = GWBUF_LENGTH(replybuf);
int buflen; last_packet = GWBUF_IS_TYPE_RESPONSE_END(replybuf);
/** discard packet */
buflen = GWBUF_LENGTH(replybuf); replybuf = gwbuf_consume(replybuf, buflen);
replybuf = gwbuf_consume(replybuf, buflen);
#else
replybuf = gwbuf_consume(
replybuf,
GWBUF_LENGTH(replybuf));
#endif
}
}
/** Only consume the leading packet */
else
{
packetlen = packet[0]+packet[1]*256+packet[2]*256*256;
replybuf = gwbuf_consume(replybuf, packetlen+headerlen);
} }
/** Set response status received */
bref_clear_state(bref, BREF_WAITING_RESULT);
} }
/** Response is in the buffer and it will be sent to client. */ /** Response is in the buffer and it will be sent to client. */
else if (replybuf != NULL) else if (replybuf != NULL)
@ -2430,9 +2507,11 @@ static bool execute_sescmd_in_backend(
uint8_t* ptr = GWBUF_DATA(tmpbuf); uint8_t* ptr = GWBUF_DATA(tmpbuf);
unsigned char cmd = MYSQL_GET_COMMAND(ptr); unsigned char cmd = MYSQL_GET_COMMAND(ptr);
LOGIF(LT, (skygw_log_write( LOGIF(LD, (skygw_log_write(
LOGFILE_TRACE, LOGFILE_DEBUG,
"Just before write, fd %d : cmd %s.", "%lu [execute_sescmd_in_backend] Just before write, fd "
"%d : cmd %s.",
pthread_self(),
dcb->fd, dcb->fd,
STRPACKETTYPE(cmd)))); STRPACKETTYPE(cmd))));
} }
@ -2449,6 +2528,11 @@ static bool execute_sescmd_in_backend(
case MYSQL_COM_QUERY: case MYSQL_COM_QUERY:
case MYSQL_COM_INIT_DB: case MYSQL_COM_INIT_DB:
default: default:
/**
* Mark session command buffer, it triggers writing
* MySQL command to protocol
*/
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
rc = dcb->func.write( rc = dcb->func.write(
dcb, dcb,
sescmd_cursor_clone_querybuf(scur)); sescmd_cursor_clone_querybuf(scur));
@ -2728,14 +2812,13 @@ static bool route_session_write(
rses_property_done(prop); rses_property_done(prop);
succp = false; succp = false;
goto return_succp; goto return_succp;
} }
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
/** /**
* Additional reference is created to querybuf to * Additional reference is created to querybuf to
* prevent it from being released before properties * prevent it from being released before properties
* are cleaned up as a part of router sessionclean-up. * are cleaned up as a part of router sessionclean-up.
*/ */
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses); mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
/** Add sescmd property to router client session */ /** Add sescmd property to router client session */
@ -2750,12 +2833,11 @@ static bool route_session_write(
scur = backend_ref_get_sescmd_cursor(&backend_ref[i]); scur = backend_ref_get_sescmd_cursor(&backend_ref[i]);
/** /**
* This backend_ref waits reply, flag it. * Add one waiter to backend reference.
*/ */
bref_set_state(get_bref_from_dcb(router_cli_ses, bref_set_state(get_bref_from_dcb(router_cli_ses,
backend_ref[i].bref_dcb), backend_ref[i].bref_dcb),
BREF_WAITING_RESULT); BREF_WAITING_RESULT);
/** /**
* Start execution if cursor is not already executing. * Start execution if cursor is not already executing.
* Otherwise, cursor will execute pending commands * Otherwise, cursor will execute pending commands
@ -2825,6 +2907,7 @@ static void rwsplit_process_options(
c == LEAST_GLOBAL_CONNECTIONS || c == LEAST_GLOBAL_CONNECTIONS ||
c == LEAST_ROUTER_CONNECTIONS || c == LEAST_ROUTER_CONNECTIONS ||
c == LEAST_BEHIND_MASTER || c == LEAST_BEHIND_MASTER ||
c == LEAST_CURRENT_OPERATIONS ||
c == UNDEFINED_CRITERIA); c == UNDEFINED_CRITERIA);
if (c == UNDEFINED_CRITERIA) if (c == UNDEFINED_CRITERIA)
@ -2834,7 +2917,7 @@ static void rwsplit_process_options(
"slave selection criteria \"%s\". " "slave selection criteria \"%s\". "
"Allowed values are \"LEAST_GLOBAL_CONNECTIONS\", " "Allowed values are \"LEAST_GLOBAL_CONNECTIONS\", "
"LEAST_ROUTER_CONNECTIONS, " "LEAST_ROUTER_CONNECTIONS, "
"and \"LEAST_ROUTER_CONNECTIONS\".", "and \"LEAST_CURRENT_OPERATIONS\".",
STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria)))); STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria))));
} }
else else
@ -2870,7 +2953,6 @@ static void handleError (
error_action_t action, error_action_t action,
bool* succp) bool* succp)
{ {
DCB* client_dcb;
SESSION* session; SESSION* session;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session; ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
@ -2963,7 +3045,6 @@ static bool handle_error_new_connection(
/** failed DCB has already been replaced */ /** failed DCB has already been replaced */
if (bref == NULL) if (bref == NULL)
{ {
rses_end_locked_router_action(rses);
succp = true; succp = true;
goto return_succp; goto return_succp;
} }
@ -2974,7 +3055,6 @@ static bool handle_error_new_connection(
*/ */
if (backend_dcb->state != DCB_STATE_POLLING) if (backend_dcb->state != DCB_STATE_POLLING)
{ {
rses_end_locked_router_action(rses);
succp = true; succp = true;
goto return_succp; goto return_succp;
} }
@ -2986,7 +3066,7 @@ static bool handle_error_new_connection(
DCB* client_dcb; DCB* client_dcb;
client_dcb = ses->client; client_dcb = ses->client;
client_dcb->func.write(client_dcb, errmsg); client_dcb->func.write(client_dcb, errmsg);
bref_clear_state(bref, BREF_WAITING_RESULT); bref_clear_state(bref, BREF_WAITING_RESULT);
} }
bref_clear_state(bref, BREF_IN_USE); bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED); bref_set_state(bref, BREF_CLOSED);