Addition of DISCONNECT ALL and DISCONNECT SERVER

Addition of DISCONNECT ALL and DISCONNECT SERVER server_id
Added dbc_close in closeSession
Changed slave mode output display in diagnostics
This commit is contained in:
MassimilianoPinto 2015-02-17 18:29:03 +01:00
parent dc65c9e1cc
commit 517524a8b4
2 changed files with 237 additions and 5 deletions

View File

@ -34,6 +34,7 @@
* Date Who Description
* 02/04/2014 Mark Riddoch Initial implementation
* 17/02/2015 Massimiliano Pinto Addition of slave port and username in diagnostics
* 18/02/2015 Massimiliano Pinto Addition of dcb_close in closeSession
*
* @endverbatim
*/
@ -626,6 +627,14 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
/* Unlock */
rses_end_locked_router_action(slave);
/**
* Close the slave server connection
*/
if (slave->dcb != NULL) {
CHK_DCB(slave->dcb);
dcb_close(slave->dcb);
}
}
}
@ -915,18 +924,21 @@ struct tm tm;
dcb_printf(dcb, "\t\tSeconds behind master %u\n", router_inst->lastEventTimestamp - session->lastEventTimestamp);
}
if ((session->cstate & CS_UPTODATE) == 0)
if (session->state == 0)
{
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s%s\n",
dcb_printf(dcb, "\t\tSlave_mode: connected\n");
}
else if ((session->cstate & CS_UPTODATE) == 0)
{
dcb_printf(dcb, "\t\tSlave_mode: catchup. %s%s\n",
((session->cstate & CS_EXPECTCB) == 0 ? "" :
"Waiting for DCB queue to drain."),
((session->cstate & CS_BUSY) == 0 ? "" :
" Busy in slave catchup."));
}
else
{
dcb_printf(dcb, "\t\tSlave is in normal mode.\n");
dcb_printf(dcb, "\t\tSlave_mode: follow\n");
if (session->binlog_pos != router_inst->binlog_position)
{
dcb_printf(dcb, "\t\tSlave reports up to date however "

View File

@ -31,8 +31,9 @@
* @verbatim
* Revision History
*
* Date Who Description
* Date Who Description
* 14/04/2014 Mark Riddoch Initial implementation
* 18/02/2015 Massimiliano Pinto Addition of DISCONNECT ALL and DISCONNECT SERVER server_id
*
* @endverbatim
*/
@ -76,6 +77,9 @@ static int blr_slave_send_slave_hosts(ROUTER_INSTANCE *router, ROUTER_SLAVE *sla
static int blr_slave_send_fieldcount(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int count);
static int blr_slave_send_columndef(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *name, int type, int len, uint8_t seqno);
static int blr_slave_send_eof(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int seqno);
static int blr_slave_send_disconnected_server(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int server_id, int found);
static int blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_disconnect_server(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int server_id);
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
@ -395,6 +399,35 @@ int query_len;
}
}
}
else if (strcasecmp(query_text, "DISCONNECT") == 0)
{
if ((word = strtok_r(NULL, sep, &brkb)) == NULL)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "%s: Incomplete DISCONNECT command.",
router->service->name)));
}
else if (strcasecmp(word, "ALL") == 0)
{
free(query_text);
spinlock_release(&router->lock);
return blr_slave_disconnect_all(router, slave);
}
else if (strcasecmp(word, "SERVER") == 0)
{
if ((word = strtok_r(NULL, sep, &brkb)) == NULL)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: Expected DISCONNECT SERVER $server_id",
router->service->name)));
} else {
free(query_text);
return blr_slave_disconnect_server(router, slave, atoi(word));
}
}
}
free(query_text);
query_text = strndup(qtext, query_len);
@ -1837,3 +1870,190 @@ uint8_t *ptr;
encode_value(ptr, 2, 16); // Autocommit enabled
return slave->dcb->func.write(slave->dcb, pkt);
}
/**
* Send the reply only to the SQL command "DISCONNECT SERVER $server_id'
*
* @param router The binlog router instance
* @param slave The slave server to which we are sending the response
* @return Non-zero if data was sent
*/
static int
blr_slave_send_disconnected_server(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int server_id, int found)
{
GWBUF *pkt;
char state[40];
char serverid[40];
uint8_t *ptr;
int len, id_len, seqno = 2;
blr_slave_send_fieldcount(router, slave, 2);
blr_slave_send_columndef(router, slave, "server_id", 0x03, 40, seqno++);
blr_slave_send_columndef(router, slave, "state", 0xf, 40, seqno++);
blr_slave_send_eof(router, slave, seqno++);
sprintf(serverid, "%d", server_id);
id_len = strlen(serverid);
if (found)
strcpy(state, "disconnected");
else
strcpy(state, "not found");
len = 5 + id_len + strlen(state) + 1;
if ((pkt = gwbuf_alloc(len)) == NULL)
return 0;
ptr = GWBUF_DATA(pkt);
encode_value(ptr, id_len + 2 + strlen(state), 24); // Add length of data packet
ptr += 3;
*ptr++ = seqno++; // Sequence number in response
*ptr++ = id_len; // Length of result string
strncpy((char *)ptr, serverid, id_len); // Result string
ptr += id_len;
*ptr++ = strlen(state); // Length of result string
strncpy((char *)ptr, state, strlen(state)); // Result string
ptr += strlen(state);
slave->dcb->func.write(slave->dcb, pkt);
return blr_slave_send_eof(router, slave, seqno++);
}
/**
* Send the response to the SQL command "DISCONNECT SERVER $server_id'
* and close the connection to that server
*
* @param router The binlog router instance
* @param slave The slave server to which we are sending the response
* @param server_id The slave server_id to disconnect
* @return Non-zero if data was sent to the client
*/
static int
blr_slave_disconnect_server(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int server_id)
{
ROUTER_OBJECT *router_obj= router->service->router;
ROUTER_SLAVE *sptr;
int n;
int server_found = 0;
spinlock_acquire(&router->lock);
sptr = router->slaves;
/* look for server_id among all registered slaves */
while (sptr)
{
/* don't examine slaves with state = 0 */
if (sptr->state != 0 && sptr->serverid == server_id)
{
/* server_id found */
server_found = 1;
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "DISCONNECT SERVER: closing [%s], server id [%d]",
sptr->dcb->remote, server_id)));
/* send server_id with disconnect state to client */
n = blr_slave_send_disconnected_server(router, slave, server_id, 1);
/* force session close for matched slave */
router_obj->closeSession(router->service->router_instance, sptr);
break;
} else {
sptr = sptr->next;
}
}
spinlock_release(&router->lock);
/** server id was not found
* send server_id with not found state to the client
*/
if (!server_found)
{
n = blr_slave_send_disconnected_server(router, slave, server_id, 0);
}
return n;
}
/**
* Send the response to the SQL command "DISCONNECT ALL'
* and close the connection to all slave servers
*
* @param router The binlog router instance
* @param slave The slave server to which we are sending the response
* @return Non-zero if data was sent to the client
*/
static int
blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
ROUTER_OBJECT *router_obj= router->service->router;
ROUTER_SLAVE *sptr;
char server_id[40];
char state[40];
uint8_t *ptr;
int len, seqno;
GWBUF *pkt;
int n = 0;
/* preparing output result */
blr_slave_send_fieldcount(router, slave, 2);
blr_slave_send_columndef(router, slave, "server_id", 0x03, 40, 2);
blr_slave_send_columndef(router, slave, "state", 0xf, 40, 3);
blr_slave_send_eof(router, slave, 4);
seqno = 5;
spinlock_acquire(&router->lock);
sptr = router->slaves;
while (sptr)
{
/* skip servers with state = 0 */
if (sptr->state != 0)
{
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "DISCONNECT ALL: closing [%s], server_id [%d]",
sptr->dcb->remote, sptr->serverid)));
sprintf(server_id, "%d", sptr->serverid);
sprintf(state, "disconnected");
len = 5 + strlen(server_id) + strlen(state) + 1;
if ((pkt = gwbuf_alloc(len)) == NULL) {
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Error: gwbuf memory allocation in "
"DISCONNECT ALL for [%s], server_id [%d]",
sptr->dcb->remote, sptr->serverid)));
spinlock_release(&router->lock);
return 0;
}
ptr = GWBUF_DATA(pkt);
encode_value(ptr, len - 4, 24); // Add length of data packet
ptr += 3;
*ptr++ = seqno++; // Sequence number in response
*ptr++ = strlen(server_id); // Length of result string
strncpy((char *)ptr, server_id, strlen(server_id)); // Result string
ptr += strlen(server_id);
*ptr++ = strlen(state); // Length of result string
strncpy((char *)ptr, state, strlen(state)); // Result string
ptr += strlen(state);
n = slave->dcb->func.write(slave->dcb, pkt);
/* force session close*/
router_obj->closeSession(router->service->router_instance, sptr);
}
sptr = sptr->next;
}
spinlock_release(&router->lock);
blr_slave_send_eof(router, slave, seqno);
return n;
}