Improved error messages

Support for running out of disk space in the binlog router

Support for COM_PING & COM_STATISTICS added in the binlog router

Addition of binlogdir router option
This commit is contained in:
Mark Riddoch 2014-12-16 10:38:09 +00:00
parent df1ff25be4
commit 909518fac7
6 changed files with 277 additions and 10 deletions

View File

@ -81,7 +81,7 @@ setipaddress(struct in_addr *a, char *p) {
if ((rc = getaddrinfo(p, NULL, &hint, &ai)) != 0) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : getaddrinfo failed for [%s] due [%s]",
"Error: Failed to obtain address for host %s, %s",
p,
gai_strerror(rc))));
@ -94,7 +94,7 @@ setipaddress(struct in_addr *a, char *p) {
if ((rc = getaddrinfo(p, NULL, &hint, &ai)) != 0) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : getaddrinfo failed for [%s] due [%s]",
"Error: Failed to obtain address for host %s, %s",
p,
gai_strerror(rc))));

View File

@ -161,6 +161,7 @@ typedef struct router_slave {
int binlog_pos; /*< Binlog position for this slave */
char binlogfile[BINLOG_FNAMELEN+1];
/*< Current binlog file for this slave */
char *uuid; /*< Slave UUID */
BLFILE *file; /*< Currently open binlog file */
int serverid; /*< Server-id of the slave */
char *hostname; /*< Hostname of the slave, if known */
@ -227,6 +228,8 @@ typedef struct {
GWBUF *utf8; /*< Set NAMES utf8 */
GWBUF *select1; /*< select 1 */
GWBUF *selectver; /*< select version() */
GWBUF *selectvercom; /*< select @@version_comment */
GWBUF *selecthostname;/*< select @@hostname */
uint8_t *fde_event; /*< Format Description Event */
int fde_len; /*< Length of fde_event */
} MASTER_RESPONSES;
@ -300,16 +303,19 @@ typedef struct router_instance {
#define BLRM_UTF8 0x000C
#define BLRM_SELECT1 0x000D
#define BLRM_SELECTVER 0x000E
#define BLRM_REGISTER 0x000F
#define BLRM_BINLOGDUMP 0x0010
#define BLRM_SELECTVERCOM 0x000F
#define BLRM_SELECTHOSTNAME 0x0010
#define BLRM_REGISTER 0x0011
#define BLRM_BINLOGDUMP 0x0012
#define BLRM_MAXSTATE 0x0010
#define BLRM_MAXSTATE 0x0012
static char *blrm_states[] = { "Unconnected", "Connecting", "Authenticated", "Timestamp retrieval",
"Server ID retrieval", "HeartBeat Period setup", "binlog checksum config",
"binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval",
"Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1",
"select version()", "Register slave", "Binlog Dump" };
"select version()", "select @@version_comment", "select @@hostname",
"Register slave", "Binlog Dump" };
#define BLRS_CREATED 0x0000
#define BLRS_UNREGISTERED 0x0001
@ -338,6 +344,8 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered",
*/
#define COM_QUIT 0x01
#define COM_QUERY 0x03
#define COM_STATISTICS 0x09
#define COM_PING 0x0e
#define COM_REGISTER_SLAVE 0x15
#define COM_BINLOG_DUMP 0x12

View File

@ -432,6 +432,8 @@ ROUTER_SLAVE *slave;
slave->cstate = 0;
slave->pthread = 0;
slave->overrun = 0;
slave->uuid = NULL;
slave->hostname = NULL;
spinlock_init(&slave->catch_lock);
slave->dcb = session->client;
slave->router = inst;
@ -786,8 +788,10 @@ struct tm tm;
session->serverid);
if (session->hostname)
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
if (session->uuid)
dcb_printf(dcb, "\t\tSlave UUID: %s\n", session->uuid);
dcb_printf(dcb,
"\t\tSlave: %d\n",
"\t\tSlave: %s\n",
session->dcb->remote);
dcb_printf(dcb,
"\t\tSlave DCB: %p\n",
@ -1043,3 +1047,143 @@ ROUTER_SLAVE *slave;
}
spinlock_release(&router->lock);
}
/**
* Return some basic statistics from the router in response to a COM_STATISTICS
* request.
*
* @param router The router instance
* @param slave The "slave" connection that requested the statistics
* @param queue The statistics request
*
* @return non-zero on sucessful send
*/
int
blr_statistics(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
{
char result[1000], *ptr;
GWBUF *ret;
int len;
snprintf(result, 1000,
"Uptime: %u Threads: %u Events: %u Slaves: %u",
time(0) - router->connect_time,
config_threadcount(),
router->stats.n_binlogs_ses,
router->stats.n_slaves);
if ((ret = gwbuf_alloc(4 + strlen(result))) == NULL)
return 0;
len = strlen(result);
ptr = GWBUF_DATA(ret);
*ptr++ = len & 0xff;
*ptr++ = (len & 0xff00) >> 8;
*ptr++ = (len & 0xff0000) >> 16;
*ptr++ = 1;
strncpy(ptr, result, len);
return slave->dcb->func.write(slave->dcb, ret);
}
int
blr_ping(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
{
char *ptr;
GWBUF *ret;
int len;
if ((ret = gwbuf_alloc(5)) == NULL)
return 0;
ptr = GWBUF_DATA(ret);
*ptr++ = 0x01;
*ptr++ = 0;
*ptr++ = 0;
*ptr++ = 1;
*ptr = 0; // OK
return slave->dcb->func.write(slave->dcb, ret);
}
/**
* mysql_send_custom_error
*
* Send a MySQL protocol Generic ERR message, to the dcb
* Note the errno and state are still fixed now
*
* @param dcb Owner_Dcb Control Block for the connection to which the OK is sent
* @param packet_number
* @param in_affected_rows
* @param msg
* @return 1 Non-zero if data was sent
*
*/
int
blr_send_custom_error(DCB *dcb, int packet_number, int affected_rows, char *msg)
{
uint8_t *outbuf = NULL;
uint32_t mysql_payload_size = 0;
uint8_t mysql_packet_header[4];
uint8_t *mysql_payload = NULL;
uint8_t field_count = 0;
uint8_t mysql_err[2];
uint8_t mysql_statemsg[6];
unsigned int mysql_errno = 0;
const char *mysql_error_msg = NULL;
const char *mysql_state = NULL;
GWBUF *errbuf = NULL;
mysql_errno = 2003;
mysql_error_msg = "An errorr occurred ...";
mysql_state = "HY000";
field_count = 0xff;
gw_mysql_set_byte2(mysql_err, mysql_errno);
mysql_statemsg[0]='#';
memcpy(mysql_statemsg+1, mysql_state, 5);
if (msg != NULL) {
mysql_error_msg = msg;
}
mysql_payload_size = sizeof(field_count) +
sizeof(mysql_err) +
sizeof(mysql_statemsg) +
strlen(mysql_error_msg);
/** allocate memory for packet header + payload */
errbuf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size);
ss_dassert(errbuf != NULL);
if (errbuf == NULL)
{
return 0;
}
outbuf = GWBUF_DATA(errbuf);
/** write packet header and packet number */
gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size);
mysql_packet_header[3] = packet_number;
/** write header */
memcpy(outbuf, mysql_packet_header, sizeof(mysql_packet_header));
mysql_payload = outbuf + sizeof(mysql_packet_header);
/** write field */
memcpy(mysql_payload, &field_count, sizeof(field_count));
mysql_payload = mysql_payload + sizeof(field_count);
/** write errno */
memcpy(mysql_payload, mysql_err, sizeof(mysql_err));
mysql_payload = mysql_payload + sizeof(mysql_err);
/** write sqlstate */
memcpy(mysql_payload, mysql_statemsg, sizeof(mysql_statemsg));
mysql_payload = mysql_payload + sizeof(mysql_statemsg);
/** write error message */
memcpy(mysql_payload, mysql_error_msg, strlen(mysql_error_msg));
return dcb->func.write(dcb, errbuf);
}

View File

@ -253,10 +253,11 @@ int n;
hdr->next_pos - hdr->event_size)) != hdr->event_size)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: Failed to write binlog record at %d of %s. "
"%s: Failed to write binlog record at %d of %s, %s. "
"Truncating to previous record.",
router->service->name, hdr->next_pos - hdr->event_size,
router->binlog_name)));
router->binlog_name,
strerror(errno))));
/* Remove any partual event that was written */
ftruncate(router->binlog_fd, hdr->next_pos - hdr->event_size);
return 0;

View File

@ -438,6 +438,20 @@ char query[128];
case BLRM_SELECTVER:
// Response to SELECT VERSION should be stored
router->saved_master.selectver = buf;
buf = blr_make_query("SELECT @@version_comment limit 1;");
router->master_state = BLRM_SELECTVERCOM;
router->master->func.write(router->master, buf);
break;
case BLRM_SELECTVERCOM:
// Response to SELECT @@version_comment should be stored
router->saved_master.selectvercom = buf;
buf = blr_make_query("SELECT @@hostname;");
router->master_state = BLRM_SELECTHOSTNAME;
router->master->func.write(router->master, buf);
break;
case BLRM_SELECTHOSTNAME:
// Response to SELECT @@hostname should be stored
router->saved_master.selecthostname = buf;
buf = blr_make_registration(router);
router->master_state = BLRM_REGISTER;
router->master->func.write(router->master, buf);

View File

@ -111,12 +111,20 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
case COM_BINLOG_DUMP:
return blr_slave_binlog_dump(router, slave, queue);
break;
case COM_STATISTICS:
return blr_statistics(router, slave, queue);
break;
case COM_PING:
return blr_ping(router, slave, queue);
break;
case COM_QUIT:
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"COM_QUIT received from slave with server_id %d",
slave->serverid)));
break;
default:
blr_send_custom_error(slave->dcb, 1, 0,
"MySQL command not supported by the binlog router.");
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Unexpected MySQL Command (%d) received from slave",
@ -133,12 +141,14 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
* when MaxScale registered as a slave. The exception to the rule is the
* request to obtain the current timestamp value of the server.
*
* Five select statements are currently supported:
* Seven select statements are currently supported:
* SELECT UNIX_TIMESTAMP();
* SELECT @master_binlog_checksum
* SELECT @@GLOBAL.GTID_MODE
* SELECT VERSION()
* SELECT 1
* SELECT @@version_comment limit 1
* SELECT @@hostname
*
* Two show commands are supported:
* SHOW VARIABLES LIKE 'SERVER_ID'
@ -208,6 +218,16 @@ int query_len;
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.selectver);
}
else if (strcasecmp(word, "@@version_comment") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.selectvercom);
}
else if (strcasecmp(word, "@@hostname") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.selecthostname);
}
}
else if (strcasecmp(word, "SHOW") == 0)
{
@ -251,6 +271,8 @@ int query_len;
}
else if (strcasecmp(word, "@slave_uuid") == 0)
{
if ((word = strtok_r(NULL, sep, &brkb)) != NULL)
slave->uuid = strdup(word);
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.setslaveuuid);
}
@ -1075,3 +1097,81 @@ uint32_t chksum;
encode_value(ptr, chksum, 32);
slave->dcb->func.write(slave->dcb, head);
}
/**
* Send the field count packet in a response packet sequence.
*
* @param router The router
* @param slave The slave connection
* @param count Number of columns in the result set
* @return Non-zero on success
*/
static int
blr_slave_send_fieldcount(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int count)
{
GWBUF *pkt;
uint8_t *ptr;
if ((pkt = gwbuf_alloc(5)) == NULL)
return 0;
ptr = GWBUF_DATA(pkt);
encode_value(ptr, 1, 24); // Add length of data packet
ptr += 3;
*ptr++ = 0x01; // Sequence number in response
*ptr++ = count; // Length of result string
return slave->dcb->func.write(slave->dcb, pkt);
}
/**
* Send the column definition packet in a response packet sequence.
*
* @param router The router
* @param slave The slave connection
* @param name Name of the column
* @param type Column type
* @param len Column length
* @param seqno Packet sequence number
* @return Non-zero on success
*/
static int
blr_slave_send_columndef(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *name, int type, int len, uint8_t seqno)
{
GWBUF *pkt;
uint8_t *ptr;
if ((pkt = gwbuf_alloc(26 + strlen(name))) == NULL)
return 0;
ptr = GWBUF_DATA(pkt);
encode_value(ptr, 22 + strlen(name), 24); // Add length of data packet
ptr += 3;
*ptr++ = seqno; // Sequence number in response
*ptr++ = 3; // Catalog is always def
*ptr++ = 'd';
*ptr++ = 'e';
*ptr++ = 'f';
*ptr++ = 0; // Schema name length
*ptr++ = 0; // virtal table name length
*ptr++ = 0; // Table name length
*ptr++ = strlen(name); // Column name length;
while (*name)
*ptr++ = *name++; // Copy the column name
*ptr++ = 0; // Orginal column name
*ptr++ = 0x0c; // Length of next fields always 12
*ptr++ = 0x3f; // Character set
*ptr++ = 0;
encode_value(ptr, len, 32); // Add length of column
ptr += 4;
*ptr++ = type;
*ptr++ = 0x81; // Two bytes of flags
if (type == 0xfd)
*ptr++ = 0x1f;
else
*ptr++ = 0x00;
*ptr++= 0;
*ptr++= 0;
*ptr++= 0;
return slave->dcb->func.write(slave->dcb, pkt);
}