Add support for:
select @@maxscale_version; show variables like "maxscale%" show master status show slave hosts
This commit is contained in:
@ -52,6 +52,7 @@
|
||||
#include <skygw_types.h>
|
||||
#include <skygw_utils.h>
|
||||
#include <log_manager.h>
|
||||
#include <version.h>
|
||||
|
||||
static uint32_t extract_field(uint8_t *src, int bits);
|
||||
static void encode_value(unsigned char *data, unsigned int value, int len);
|
||||
@ -66,6 +67,13 @@ uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
|
||||
int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
|
||||
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||
static void blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||
static int blr_slave_send_maxscale_version(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||
static int blr_slave_send_maxscale_variables(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||
static int blr_slave_send_master_status(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||
static int blr_slave_send_slave_hosts(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||
static int blr_slave_send_fieldcount(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int count);
|
||||
static int blr_slave_send_columndef(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *name, int type, int len, uint8_t seqno);
|
||||
static int blr_slave_send_eof(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int seqno);
|
||||
|
||||
extern int lm_enabled_logfiles_bitmask;
|
||||
extern size_t log_ses_count[];
|
||||
@ -141,6 +149,10 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
|
||||
* when MaxScale registered as a slave. The exception to the rule is the
|
||||
* request to obtain the current timestamp value of the server.
|
||||
*
|
||||
* The original set added for the registration process has been enhanced in
|
||||
* order to support some commands that are useful for monitoring the binlog
|
||||
* router.
|
||||
*
|
||||
* Eight select statements are currently supported:
|
||||
* SELECT UNIX_TIMESTAMP();
|
||||
* SELECT @master_binlog_checksum
|
||||
@ -150,10 +162,14 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
|
||||
* SELECT @@version_comment limit 1
|
||||
* SELECT @@hostname
|
||||
* SELECT @@max_allowed_packet
|
||||
* SELECT @@maxscale_version
|
||||
*
|
||||
* Two show commands are supported:
|
||||
* Five show commands are supported:
|
||||
* SHOW VARIABLES LIKE 'SERVER_ID'
|
||||
* SHOW VARIABLES LIKE 'SERVER_UUID'
|
||||
* SHOW VARIABLES LIKE 'MAXSCALE%
|
||||
* SHOW MASTER STATUS
|
||||
* SHOW SLAVE HOSTS
|
||||
*
|
||||
* Five set commands are supported:
|
||||
* SET @master_binlog_checksum = @@global.binlog_checksum
|
||||
@ -243,6 +259,11 @@ int query_len;
|
||||
free(query_text);
|
||||
return blr_slave_replay(router, slave, router->saved_master.map);
|
||||
}
|
||||
else if (strcasecmp(word, "@@maxscale_version") == 0)
|
||||
{
|
||||
free(query_text);
|
||||
return blr_slave_send_maxscale_version(router, slave);
|
||||
}
|
||||
}
|
||||
else if (strcasecmp(word, "SHOW") == 0)
|
||||
{
|
||||
@ -263,7 +284,7 @@ int query_len;
|
||||
{
|
||||
if ((word = strtok_r(NULL, sep, &brkb)) == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"%s: Missing LIKE clause in SHOW VARIABLES.",
|
||||
router->service->name)));
|
||||
}
|
||||
@ -277,6 +298,44 @@ int query_len;
|
||||
free(query_text);
|
||||
return blr_slave_replay(router, slave, router->saved_master.uuid);
|
||||
}
|
||||
else if (strcasecmp(word, "'MAXSCALE%'") == 0)
|
||||
{
|
||||
free(query_text);
|
||||
return blr_slave_send_maxscale_variables(router, slave);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (strcasecmp(word, "MASTER") == 0)
|
||||
{
|
||||
if ((word = strtok_r(NULL, sep, &brkb)) == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"%s: Expected SHOW MASTER STATUS command",
|
||||
router->service->name)));
|
||||
}
|
||||
else if (strcasecmp(word, "STATUS") == 0)
|
||||
{
|
||||
free(query_text);
|
||||
return blr_slave_send_master_status(router, slave);
|
||||
}
|
||||
}
|
||||
else if (strcasecmp(word, "SLAVE") == 0)
|
||||
{
|
||||
if ((word = strtok_r(NULL, sep, &brkb)) == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"%s: Expected SHOW MASTER STATUS command",
|
||||
router->service->name)));
|
||||
}
|
||||
else if (strcasecmp(word, "STATUS") == 0)
|
||||
{
|
||||
free(query_text);
|
||||
return blr_slave_send_master_status(router, slave);
|
||||
}
|
||||
else if (strcasecmp(word, "HOSTS") == 0)
|
||||
{
|
||||
free(query_text);
|
||||
return blr_slave_send_slave_hosts(router, slave);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -449,6 +508,203 @@ int len, ts_len;
|
||||
return slave->dcb->func.write(slave->dcb, pkt);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a response the the SQL command SELECT @@MAXSCALE_VERSION
|
||||
*
|
||||
* @param router The binlog router instance
|
||||
* @param slave The slave server to which we are sending the response
|
||||
* @return Non-zero if data was sent
|
||||
*/
|
||||
static int
|
||||
blr_slave_send_maxscale_version(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
|
||||
{
|
||||
GWBUF *pkt;
|
||||
char version[40];
|
||||
uint8_t *ptr;
|
||||
int len, vers_len;
|
||||
|
||||
sprintf(version, "%s", MAXSCALE_VERSION);
|
||||
vers_len = strlen(version);
|
||||
blr_slave_send_fieldcount(router, slave, 1);
|
||||
blr_slave_send_columndef(router, slave, "MAXSCALE_VERSION", 0xf, vers_len, 2);
|
||||
blr_slave_send_eof(router, slave, 3);
|
||||
|
||||
len = 5 + vers_len;
|
||||
if ((pkt = gwbuf_alloc(len)) == NULL)
|
||||
return 0;
|
||||
ptr = GWBUF_DATA(pkt);
|
||||
encode_value(ptr, vers_len + 1, 24); // Add length of data packet
|
||||
ptr += 3;
|
||||
*ptr++ = 0x04; // Sequence number in response
|
||||
*ptr++ = vers_len; // Length of result string
|
||||
strncpy((char *)ptr, version, vers_len); // Result string
|
||||
ptr += vers_len;
|
||||
slave->dcb->func.write(slave->dcb, pkt);
|
||||
return blr_slave_send_eof(router, slave, 5);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Send the response to the SQL command "SHOW VARIABLES LIKE 'MAXSCALE%'
|
||||
*
|
||||
* @param router The binlog router instance
|
||||
* @param slave The slave server to which we are sending the response
|
||||
* @return Non-zero if data was sent
|
||||
*/
|
||||
static int
|
||||
blr_slave_send_maxscale_variables(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
|
||||
{
|
||||
GWBUF *pkt;
|
||||
char name[40];
|
||||
char version[40];
|
||||
uint8_t *ptr;
|
||||
int len, vers_len, seqno = 2;
|
||||
|
||||
blr_slave_send_fieldcount(router, slave, 2);
|
||||
blr_slave_send_columndef(router, slave, "Variable_name", 0xf, 40, seqno++);
|
||||
blr_slave_send_columndef(router, slave, "value", 0xf, 40, seqno++);
|
||||
blr_slave_send_eof(router, slave, seqno++);
|
||||
|
||||
sprintf(version, "%s", MAXSCALE_VERSION);
|
||||
vers_len = strlen(version);
|
||||
strcpy(name, "MAXSCALE_VERSION");
|
||||
len = 5 + vers_len + strlen(name) + 1;
|
||||
if ((pkt = gwbuf_alloc(len)) == NULL)
|
||||
return 0;
|
||||
ptr = GWBUF_DATA(pkt);
|
||||
encode_value(ptr, vers_len + 2 + strlen(name), 24); // Add length of data packet
|
||||
ptr += 3;
|
||||
*ptr++ = seqno++; // Sequence number in response
|
||||
*ptr++ = strlen(name); // Length of result string
|
||||
strncpy((char *)ptr, name, strlen(name)); // Result string
|
||||
ptr += strlen(name);
|
||||
*ptr++ = vers_len; // Length of result string
|
||||
strncpy((char *)ptr, version, vers_len); // Result string
|
||||
ptr += vers_len;
|
||||
slave->dcb->func.write(slave->dcb, pkt);
|
||||
|
||||
return blr_slave_send_eof(router, slave, seqno++);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Send the response to the SQL command "SHOW MASTER STATUS"
|
||||
*
|
||||
* @param router The binlog router instance
|
||||
* @param slave The slave server to which we are sending the response
|
||||
* @return Non-zero if data was sent
|
||||
*/
|
||||
static int
|
||||
blr_slave_send_master_status(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
|
||||
{
|
||||
GWBUF *pkt;
|
||||
char file[40];
|
||||
char position[40];
|
||||
uint8_t *ptr;
|
||||
int len, file_len;
|
||||
|
||||
blr_slave_send_fieldcount(router, slave, 5);
|
||||
blr_slave_send_columndef(router, slave, "File", 0xf, 40, 2);
|
||||
blr_slave_send_columndef(router, slave, "Position", 0xf, 40, 3);
|
||||
blr_slave_send_columndef(router, slave, "Binlog_Do_DB", 0xf, 40, 4);
|
||||
blr_slave_send_columndef(router, slave, "Binlog_Ignore_DB", 0xf, 40, 5);
|
||||
blr_slave_send_columndef(router, slave, "Execute_Gtid_Set", 0xf, 40, 6);
|
||||
blr_slave_send_eof(router, slave, 7);
|
||||
|
||||
sprintf(file, "%s", router->binlog_name);
|
||||
file_len = strlen(file);
|
||||
sprintf(position, "%d", router->binlog_position);
|
||||
len = 5 + file_len + strlen(position) + 1 + 3;
|
||||
if ((pkt = gwbuf_alloc(len)) == NULL)
|
||||
return 0;
|
||||
ptr = GWBUF_DATA(pkt);
|
||||
encode_value(ptr, len - 4, 24); // Add length of data packet
|
||||
ptr += 3;
|
||||
*ptr++ = 0x08; // Sequence number in response
|
||||
*ptr++ = strlen(file); // Length of result string
|
||||
strncpy((char *)ptr, file, strlen(file)); // Result string
|
||||
ptr += strlen(file);
|
||||
*ptr++ = strlen(position); // Length of result string
|
||||
strncpy((char *)ptr, position, strlen(position)); // Result string
|
||||
ptr += strlen(position);
|
||||
*ptr++ = 0; // Send 3 empty values
|
||||
*ptr++ = 0;
|
||||
*ptr++ = 0;
|
||||
slave->dcb->func.write(slave->dcb, pkt);
|
||||
return blr_slave_send_eof(router, slave, 9);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the response to the SQL command "SHOW SLAVE HOSTS"
|
||||
*
|
||||
* @param router The binlog router instance
|
||||
* @param slave The slave server to which we are sending the response
|
||||
* @return Non-zero if data was sent
|
||||
*/
|
||||
static int
|
||||
blr_slave_send_slave_hosts(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
|
||||
{
|
||||
GWBUF *pkt;
|
||||
char server_id[40];
|
||||
char host[40];
|
||||
char port[40];
|
||||
char master_id[40];
|
||||
char slave_uuid[40];
|
||||
uint8_t *ptr;
|
||||
int len, seqno;
|
||||
ROUTER_SLAVE *sptr;
|
||||
|
||||
blr_slave_send_fieldcount(router, slave, 5);
|
||||
blr_slave_send_columndef(router, slave, "Server_id", 0xf, 40, 2);
|
||||
blr_slave_send_columndef(router, slave, "Host", 0xf, 40, 3);
|
||||
blr_slave_send_columndef(router, slave, "Port", 0xf, 40, 4);
|
||||
blr_slave_send_columndef(router, slave, "Master_id", 0xf, 40, 5);
|
||||
blr_slave_send_columndef(router, slave, "Slave_UUID", 0xf, 40, 6);
|
||||
blr_slave_send_eof(router, slave, 7);
|
||||
|
||||
seqno = 8;
|
||||
spinlock_acquire(&router->lock);
|
||||
sptr = router->slaves;
|
||||
while (sptr)
|
||||
{
|
||||
if (sptr->state != 0)
|
||||
{
|
||||
sprintf(server_id, "%d", sptr->serverid);
|
||||
sprintf(host, "%s", sptr->hostname ? sptr->hostname : "");
|
||||
sprintf(port, "%d", sptr->port);
|
||||
sprintf(master_id, "%d", router->serverid);
|
||||
sprintf(slave_uuid, "%s", sptr->uuid);
|
||||
len = 5 + strlen(server_id) + strlen(host) + strlen(port)
|
||||
+ strlen(master_id) + strlen(slave_uuid) + 5;
|
||||
if ((pkt = gwbuf_alloc(len)) == NULL)
|
||||
return 0;
|
||||
ptr = GWBUF_DATA(pkt);
|
||||
encode_value(ptr, len - 4, 24); // Add length of data packet
|
||||
ptr += 3;
|
||||
*ptr++ = seqno++; // Sequence number in response
|
||||
*ptr++ = strlen(server_id); // Length of result string
|
||||
strncpy((char *)ptr, server_id, strlen(server_id)); // Result string
|
||||
ptr += strlen(server_id);
|
||||
*ptr++ = strlen(host); // Length of result string
|
||||
strncpy((char *)ptr, host, strlen(host)); // Result string
|
||||
ptr += strlen(host);
|
||||
*ptr++ = strlen(port); // Length of result string
|
||||
strncpy((char *)ptr, port, strlen(port)); // Result string
|
||||
ptr += strlen(port);
|
||||
*ptr++ = strlen(master_id); // Length of result string
|
||||
strncpy((char *)ptr, master_id, strlen(master_id)); // Result string
|
||||
ptr += strlen(master_id);
|
||||
*ptr++ = strlen(slave_uuid); // Length of result string
|
||||
strncpy((char *)ptr, slave_uuid, strlen(slave_uuid)); // Result string
|
||||
ptr += strlen(slave_uuid);
|
||||
slave->dcb->func.write(slave->dcb, pkt);
|
||||
}
|
||||
sptr = sptr->next;
|
||||
}
|
||||
spinlock_release(&router->lock);
|
||||
return blr_slave_send_eof(router, slave, seqno);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a slave replication registration message.
|
||||
*
|
||||
@ -722,7 +978,7 @@ uint8_t *ptr;
|
||||
* call. The paramter "long" control the number of events in the burst. The
|
||||
* short burst is intended to be used when the master receive an event and
|
||||
* needs to put the slave into catchup mode. This prevents the slave taking
|
||||
* too much tiem away from the thread that is processing the master events.
|
||||
* too much time away from the thread that is processing the master events.
|
||||
*
|
||||
* At the end of the burst a fake EPOLLOUT event is added to the poll event
|
||||
* queue. This ensures that the slave callback for processing DCB write drain
|
||||
@ -1184,7 +1440,7 @@ uint8_t *ptr;
|
||||
*ptr++ = 'e';
|
||||
*ptr++ = 'f';
|
||||
*ptr++ = 0; // Schema name length
|
||||
*ptr++ = 0; // virtal table name length
|
||||
*ptr++ = 0; // virtual table name length
|
||||
*ptr++ = 0; // Table name length
|
||||
*ptr++ = strlen(name); // Column name length;
|
||||
while (*name)
|
||||
@ -1206,3 +1462,31 @@ uint8_t *ptr;
|
||||
*ptr++= 0;
|
||||
return slave->dcb->func.write(slave->dcb, pkt);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Send an EOF packet in a response packet sequence.
|
||||
*
|
||||
* @param router The router
|
||||
* @param slave The slave connection
|
||||
* @param seqno The sequence number of the EOF packet
|
||||
* @return Non-zero on success
|
||||
*/
|
||||
static int
|
||||
blr_slave_send_eof(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int seqno)
|
||||
{
|
||||
GWBUF *pkt;
|
||||
uint8_t *ptr;
|
||||
|
||||
if ((pkt = gwbuf_alloc(9)) == NULL)
|
||||
return 0;
|
||||
ptr = GWBUF_DATA(pkt);
|
||||
encode_value(ptr, 5, 24); // Add length of data packet
|
||||
ptr += 3;
|
||||
*ptr++ = seqno; // Sequence number in response
|
||||
*ptr++ = 0xfe; // Length of result string
|
||||
encode_value(ptr, 0, 16); // No errors
|
||||
ptr += 2;
|
||||
encode_value(ptr, 2, 16); // Autocommit enabled
|
||||
return slave->dcb->func.write(slave->dcb, pkt);
|
||||
}
|
||||
|
Reference in New Issue
Block a user