Reformat binlog router.

This commit is contained in:
Johan Wikman
2016-03-14 09:29:06 +02:00
parent 59f5880898
commit 5070b81473
6 changed files with 9050 additions and 7847 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -92,12 +92,14 @@ void blr_master_close(ROUTER_INSTANCE *);
char *blr_extract_column(GWBUF *buf, int col);
void blr_cache_response(ROUTER_INSTANCE *router, char *response, GWBUF *buf);
void poll_fake_write_event(DCB *dcb);
GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr, unsigned long long pos_end);
GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr,
unsigned long long pos_end);
static void blr_check_last_master_event(void *inst);
extern int blr_check_heartbeat(ROUTER_INSTANCE *router);
extern char * blr_last_event_description(ROUTER_INSTANCE *router);
static void blr_log_identity(ROUTER_INSTANCE *router);
static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code);
static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state,
unsigned int err_code);
int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf);
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf);
@ -122,10 +124,13 @@ DCB *client;
spinlock_acquire(&router->lock);
if (router->master_state != BLRM_UNCONNECTED)
{
if (router->master_state != BLRM_SLAVE_STOPPED) {
if (router->master_state != BLRM_SLAVE_STOPPED)
{
MXS_ERROR("%s: Master Connect: Unexpected master state %s\n",
router->service->name, blrm_states[router->master_state]);
} else {
}
else
{
MXS_NOTICE("%s: Master Connect: binlog state is %s\n",
router->service->name, blrm_states[router->master_state]);
}
@ -159,8 +164,7 @@ DCB *client;
if ((router->master = dcb_connect(router->service->dbref->server, router->session, BLR_PROTOCOL)) == NULL)
{
char *name;
if ((name = malloc(strlen(router->service->name)
+ strlen(" Master") + 1)) != NULL)
if ((name = malloc(strlen(router->service->name) + strlen(" Master") + 1)) != NULL)
{
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
@ -168,7 +172,9 @@ DCB *client;
free(name);
}
if (router->retry_backoff > BLR_MAX_BACKOFF)
{
router->retry_backoff = BLR_MAX_BACKOFF;
}
MXS_ERROR("Binlog router: failed to connect to master server '%s'",
router->service->dbref->server->unique_name);
return;
@ -182,7 +188,9 @@ DCB *client;
router->connect_time = time(0);
if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive )))
{
perror("setsockopt");
}
router->master_state = BLRM_AUTHENTICATED;
router->master->func.write(router->master, blr_make_query("SELECT UNIX_TIMESTAMP()"));
@ -222,8 +230,7 @@ blr_restart_master(ROUTER_INSTANCE *router)
router->master_state = BLRM_UNCONNECTED;
if ((name = malloc(strlen(router->service->name)
+ strlen(" Master")+1)) != NULL)
if ((name = malloc(strlen(router->service->name) + strlen(" Master") + 1)) != NULL)
{
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
@ -231,8 +238,10 @@ blr_restart_master(ROUTER_INSTANCE *router)
free(name);
}
if (router->retry_backoff > BLR_MAX_BACKOFF)
{
router->retry_backoff = BLR_MAX_BACKOFF;
}
}
else
{
router->master_state = BLRM_UNCONNECTED;
@ -256,7 +265,9 @@ blr_master_reconnect(ROUTER_INSTANCE *router)
int do_reconnect = 0;
if (router->master_state == BLRM_SLAVE_STOPPED)
{
return;
}
spinlock_acquire(&router->lock);
if (router->active_logs)
@ -307,8 +318,7 @@ blr_master_delayed_connect(ROUTER_INSTANCE *router)
{
char *name;
if ((name = malloc(strlen(router->service->name)
+ strlen(" Master Recovery")+1)) != NULL)
if ((name = malloc(strlen(router->service->name) + strlen(" Master Recovery") + 1)) != NULL)
{
sprintf(name, "%s Master Recovery", router->service->name);
hktask_oneshot(name, blr_start_master, router, 60);
@ -401,7 +411,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
/* set mysql error message */
if (router->m_errmsg)
{
free(router->m_errmsg);
}
router->m_errmsg = msg_err;
router->active_logs = 0;
@ -432,12 +444,15 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
// Response to fetch of master's server-id
if (router->saved_master.server_id)
{
GWBUF_CONSUME_ALL(router->saved_master.server_id);
}
router->saved_master.server_id = buf;
blr_cache_response(router, "serverid", buf);
// set router->masterid from master server-id if it's not set by the config option
if (router->masterid == 0) {
if (router->masterid == 0)
{
router->masterid = atoi(val);
}
@ -454,7 +469,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
case BLRM_HBPERIOD:
// Response to set the heartbeat period
if (router->saved_master.heartbeat)
{
GWBUF_CONSUME_ALL(router->saved_master.heartbeat);
}
router->saved_master.heartbeat = buf;
blr_cache_response(router, "heartbeat", buf);
buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum");
@ -464,7 +481,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
case BLRM_CHKSUM1:
// Response to set the master binlog checksum
if (router->saved_master.chksum1)
{
GWBUF_CONSUME_ALL(router->saved_master.chksum1);
}
router->saved_master.chksum1 = buf;
blr_cache_response(router, "chksum1", buf);
buf = blr_make_query("SELECT @master_binlog_checksum");
@ -480,17 +499,24 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
router->master_chksum = false;
}
if (val)
{
free(val);
}
// Response to the master_binlog_checksum, should be stored
if (router->saved_master.chksum2)
{
GWBUF_CONSUME_ALL(router->saved_master.chksum2);
}
router->saved_master.chksum2 = buf;
blr_cache_response(router, "chksum2", buf);
if (router->mariadb10_compat) {
if (router->mariadb10_compat)
{
buf = blr_make_query("SET @mariadb_slave_capability=4");
router->master_state = BLRM_MARIADB10;
} else {
}
else
{
buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE");
router->master_state = BLRM_GTIDMODE;
}
@ -500,7 +526,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
case BLRM_MARIADB10:
// Response to the SET @mariadb_slave_capability=4, should be stored
if (router->saved_master.mariadb10)
{
GWBUF_CONSUME_ALL(router->saved_master.mariadb10);
}
router->saved_master.mariadb10 = buf;
blr_cache_response(router, "mariadb10", buf);
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
@ -510,7 +538,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
case BLRM_GTIDMODE:
// Response to the GTID_MODE, should be stored
if (router->saved_master.gtid_mode)
{
GWBUF_CONSUME_ALL(router->saved_master.gtid_mode);
}
router->saved_master.gtid_mode = buf;
blr_cache_response(router, "gtidmode", buf);
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
@ -524,21 +554,30 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
key = blr_extract_column(buf, 1);
if (key && strlen(key))
{
val = blr_extract_column(buf, 2);
}
if (key)
{
free(key);
}
/* set the master_uuid from master if not set by the option */
if (router->set_master_uuid == NULL) {
if (router->set_master_uuid == NULL)
{
free(router->master_uuid);
router->master_uuid = val;
} else {
}
else
{
router->master_uuid = router->set_master_uuid;
}
// Response to the SERVER_UUID, should be stored
if (router->saved_master.uuid)
{
GWBUF_CONSUME_ALL(router->saved_master.uuid);
}
router->saved_master.uuid = buf;
blr_cache_response(router, "uuid", buf);
sprintf(query, "SET @slave_uuid='%s'", router->uuid);
@ -550,7 +589,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
case BLRM_SUUID:
// Response to the SET @server_uuid, should be stored
if (router->saved_master.setslaveuuid)
{
GWBUF_CONSUME_ALL(router->saved_master.setslaveuuid);
}
router->saved_master.setslaveuuid = buf;
blr_cache_response(router, "ssuuid", buf);
buf = blr_make_query("SET NAMES latin1");
@ -560,7 +601,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
case BLRM_LATIN1:
// Response to the SET NAMES latin1, should be stored
if (router->saved_master.setnames)
{
GWBUF_CONSUME_ALL(router->saved_master.setnames);
}
router->saved_master.setnames = buf;
blr_cache_response(router, "setnames", buf);
buf = blr_make_query("SET NAMES utf8");
@ -570,7 +613,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
case BLRM_UTF8:
// Response to the SET NAMES utf8, should be stored
if (router->saved_master.utf8)
{
GWBUF_CONSUME_ALL(router->saved_master.utf8);
}
router->saved_master.utf8 = buf;
blr_cache_response(router, "utf8", buf);
buf = blr_make_query("SELECT 1");
@ -580,7 +625,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
case BLRM_SELECT1:
// Response to the SELECT 1, should be stored
if (router->saved_master.select1)
{
GWBUF_CONSUME_ALL(router->saved_master.select1);
}
router->saved_master.select1 = buf;
blr_cache_response(router, "select1", buf);
buf = blr_make_query("SELECT VERSION()");
@ -590,7 +637,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
case BLRM_SELECTVER:
// Response to SELECT VERSION should be stored
if (router->saved_master.selectver)
{
GWBUF_CONSUME_ALL(router->saved_master.selectver);
}
router->saved_master.selectver = buf;
blr_cache_response(router, "selectver", buf);
buf = blr_make_query("SELECT @@version_comment limit 1");
@ -600,7 +649,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
case BLRM_SELECTVERCOM:
// Response to SELECT @@version_comment should be stored
if (router->saved_master.selectvercom)
{
GWBUF_CONSUME_ALL(router->saved_master.selectvercom);
}
router->saved_master.selectvercom = buf;
blr_cache_response(router, "selectvercom", buf);
buf = blr_make_query("SELECT @@hostname");
@ -610,7 +661,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
case BLRM_SELECTHOSTNAME:
// Response to SELECT @@hostname should be stored
if (router->saved_master.selecthostname)
{
GWBUF_CONSUME_ALL(router->saved_master.selecthostname);
}
router->saved_master.selecthostname = buf;
blr_cache_response(router, "selecthostname", buf);
buf = blr_make_query("SELECT @@max_allowed_packet");
@ -620,7 +673,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
case BLRM_MAP:
// Response to SELECT @@max_allowed_packet should be stored
if (router->saved_master.map)
{
GWBUF_CONSUME_ALL(router->saved_master.map);
}
router->saved_master.map = buf;
blr_cache_response(router, "map", buf);
buf = blr_make_registration(router);
@ -659,7 +714,9 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
}
if (router->reconnect_pending)
{
blr_restart_master(router);
}
spinlock_acquire(&router->lock);
router->active_logs = 0;
spinlock_release(&router->lock);
@ -679,7 +736,9 @@ unsigned char *data;
int len;
if ((buf = gwbuf_alloc(strlen(query) + 5)) == NULL)
{
return NULL;
}
data = GWBUF_DATA(buf);
len = strlen(query) + 1;
encode_value(&data[0], len, 24); // Payload length
@ -706,7 +765,9 @@ unsigned char *data;
int len = 18;
if ((buf = gwbuf_alloc(len + 4)) == NULL)
{
return NULL;
}
data = GWBUF_DATA(buf);
encode_value(&data[0], len, 24); // Payload length
data[3] = 0; // Sequence ID
@ -740,9 +801,10 @@ int binlog_file_len = strlen(router->binlog_name);
/* COM_BINLOG_DUMP needs 11 bytes + binlogname (terminating NULL is not required) */
int len = 11 + binlog_file_len;
if ((buf = gwbuf_alloc(len + 4)) == NULL)
{
return NULL;
}
data = GWBUF_DATA(buf);
encode_value(&data[0], len, 24); // Payload length
@ -874,7 +936,9 @@ int n_bufs = -1, pn_bufs = -1;
remainder -= n;
ptr += n;
if (remainder > 0)
{
p = p->next;
}
n_bufs++;
}
if (remainder)
@ -998,12 +1062,20 @@ int n_bufs = -1, pn_bufs = -1;
/* Remove error message */
if (router->m_errmsg)
{
free(router->m_errmsg);
}
router->m_errmsg = NULL;
spinlock_release(&router->lock);
#ifdef SHOW_EVENTS
printf("blr: len %lu, event type 0x%02x, flags 0x%04x, event size %d, event timestamp %lu\n", (unsigned long)len-4, hdr.event_type, hdr.flags, hdr.event_size, (unsigned long)hdr.timestamp);
printf("blr: len %lu, event type 0x%02x, flags 0x%04x, "
"event size %d, event timestamp %lu\n",
(unsigned long)len - 4,
hdr.event_type,
hdr.flags,
hdr.event_size,
(unsigned long)hdr.timestamp);
#endif
}
}
@ -1136,7 +1208,8 @@ int n_bufs = -1, pn_bufs = -1;
*/
spinlock_acquire(&router->binlog_lock);
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) {
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0))
{
/* no pending transaction: set current_pos to binlog_position */
router->binlog_position = router->current_pos;
router->current_safe_event = router->current_pos;
@ -1153,9 +1226,12 @@ int n_bufs = -1, pn_bufs = -1;
* This marks the transaction starts instead of
* QUERY_EVENT with "BEGIN"
*/
if (router->trx_safe && router->master_event_state == BLR_EVENT_DONE) {
if (router->mariadb10_compat) {
if (hdr.event_type == MARIADB10_GTID_EVENT) {
if (router->trx_safe && router->master_event_state == BLR_EVENT_DONE)
{
if (router->mariadb10_compat)
{
if (hdr.event_type == MARIADB10_GTID_EVENT)
{
uint64_t n_sequence;
uint32_t domainid;
unsigned int flags;
@ -1163,10 +1239,12 @@ int n_bufs = -1, pn_bufs = -1;
domainid = extract_field(ptr + 4 + 20 + 8, 32);
flags = *(ptr + 4 + 20 + 8 + 4);
if (flags == 0) {
if (flags == 0)
{
spinlock_acquire(&router->binlog_lock);
if (router->pending_transaction > 0) {
if (router->pending_transaction > 0)
{
MXS_ERROR("A MariaDB 10 transaction "
"is already open "
"@ %lu (GTID %u-%u-%lu) and "
@ -1190,7 +1268,8 @@ int n_bufs = -1, pn_bufs = -1;
* look for QUERY_EVENT [BEGIN / COMMIT] and XID_EVENT
*/
if(hdr.event_type == QUERY_EVENT) {
if (hdr.event_type == QUERY_EVENT)
{
char *statement_sql;
int db_name_len, var_block_len, statement_len;
db_name_len = ptr[4 + 20 + 4 + 4];
@ -1198,13 +1277,17 @@ int n_bufs = -1, pn_bufs = -1;
statement_len = len - (4 + 20 + 4 + 4 + 1 + 2 + 2 + var_block_len + 1 + db_name_len);
statement_sql = calloc(1, statement_len + 1);
strncpy(statement_sql, (char *)ptr+4+20+4+4+1+2+2+var_block_len+1+db_name_len, statement_len);
strncpy(statement_sql,
(char *)ptr + 4 + 20 + 4 + 4 + 1 + 2 + 2 + var_block_len + 1 + db_name_len,
statement_len);
spinlock_acquire(&router->binlog_lock);
/* Check for BEGIN (it comes for START TRANSACTION too) */
if (strncmp(statement_sql, "BEGIN", 5) == 0) {
if (router->pending_transaction > 0) {
if (strncmp(statement_sql, "BEGIN", 5) == 0)
{
if (router->pending_transaction > 0)
{
MXS_ERROR("A transaction is already open "
"@ %lu and a new one starts @ %lu",
router->binlog_position,
@ -1217,8 +1300,8 @@ int n_bufs = -1, pn_bufs = -1;
}
/* Check for COMMIT in non transactional store engines */
if (strncmp(statement_sql, "COMMIT", 6) == 0) {
if (strncmp(statement_sql, "COMMIT", 6) == 0)
{
router->pending_transaction = 2;
}
@ -1228,10 +1311,12 @@ int n_bufs = -1, pn_bufs = -1;
}
/* Check for COMMIT in Transactional engines, i.e InnoDB */
if(hdr.event_type == XID_EVENT) {
if (hdr.event_type == XID_EVENT)
{
spinlock_acquire(&router->binlog_lock);
if (router->pending_transaction) {
if (router->pending_transaction)
{
router->pending_transaction = 3;
}
spinlock_release(&router->binlog_lock);
@ -1243,7 +1328,9 @@ int n_bufs = -1, pn_bufs = -1;
MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
if (hdr.event_type >= 0 && hdr.event_type <= event_limit)
{
router->stats.events[hdr.event_type]++;
}
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
{
@ -1268,7 +1355,9 @@ int n_bufs = -1, pn_bufs = -1;
{
memcpy(new_fde, ptr + 5, hdr.event_size);
if (router->saved_master.fde_event)
{
free(router->saved_master.fde_event);
}
router->saved_master.fde_event = new_fde;
router->saved_master.fde_len = new_fde_len;
}
@ -1299,8 +1388,10 @@ int n_bufs = -1, pn_bufs = -1;
router->stats.n_heartbeats++;
if (router->pending_transaction)
{
router->stats.lastReply = time(0);
}
}
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
{
ptr = ptr + 4; // Skip header
@ -1334,8 +1425,10 @@ int n_bufs = -1, pn_bufs = -1;
* buffer chain and close the
* connection with the master
*/
while ((pkt = gwbuf_consume(pkt,
GWBUF_LENGTH(pkt))) != NULL);
while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
{
;
}
blr_master_close(router);
blr_master_delayed_connect(router);
return;
@ -1352,8 +1445,10 @@ int n_bufs = -1, pn_bufs = -1;
* buffer chain and close the
* connection with the master
*/
while ((pkt = gwbuf_consume(pkt,
GWBUF_LENGTH(pkt))) != NULL);
while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
{
;
}
blr_master_close(router);
blr_master_delayed_connect(router);
return;
@ -1367,7 +1462,8 @@ int n_bufs = -1, pn_bufs = -1;
spinlock_acquire(&router->binlog_lock);
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) {
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0))
{
router->binlog_position = router->current_pos;
router->current_safe_event = router->current_pos;
@ -1377,7 +1473,9 @@ int n_bufs = -1, pn_bufs = -1;
if (router->master_event_state == BLR_EVENT_COMPLETE)
{
/** Read the complete event from the disk */
GWBUF *record = blr_read_events_from_pos(router, router->last_event_pos, &hdr, hdr.next_pos);
GWBUF *record = blr_read_events_from_pos(router,
router->last_event_pos,
&hdr, hdr.next_pos);
if (record)
{
uint8_t *data = GWBUF_DATA(record);
@ -1396,7 +1494,9 @@ int n_bufs = -1, pn_bufs = -1;
/* Now distribute events */
blr_distribute_binlog_record(router, &hdr, ptr);
}
} else {
}
else
{
/**
* If transaction is closed:
*
@ -1410,7 +1510,8 @@ int n_bufs = -1, pn_bufs = -1;
*
*/
if (router->pending_transaction > 1) {
if (router->pending_transaction > 1)
{
unsigned long long pos;
unsigned long long end_pos;
GWBUF *record;
@ -1423,7 +1524,11 @@ int n_bufs = -1, pn_bufs = -1;
spinlock_release(&router->binlog_lock);
while ((record = blr_read_events_from_pos(router, pos, &new_hdr, end_pos)) != NULL) {
while ((record = blr_read_events_from_pos(router,
pos,
&new_hdr,
end_pos)) != NULL)
{
i++;
raw_data = GWBUF_DATA(record);
@ -1447,13 +1552,15 @@ int n_bufs = -1, pn_bufs = -1;
}
/* Check whether binlog records has been read in previous loop */
if (pos < router->current_pos) {
if (pos < router->current_pos)
{
char err_message[BINLOG_ERROR_MSG_LEN + 1];
err_message[BINLOG_ERROR_MSG_LEN] = '\0';
/* No event has been sent */
if (pos == router->binlog_position) {
if (pos == router->binlog_position)
{
MXS_ERROR("No events distributed to slaves for a pending "
"transaction in %s at %lu. "
"Last event from master at %lu",
@ -1461,8 +1568,11 @@ int n_bufs = -1, pn_bufs = -1;
router->binlog_position,
router->current_pos);
strncpy(err_message, "No transaction events sent", BINLOG_ERROR_MSG_LEN);
} else {
strncpy(err_message,
"No transaction events sent", BINLOG_ERROR_MSG_LEN);
}
else
{
/* Some events have been sent */
MXS_ERROR("Some events were not distributed to slaves for a "
"pending transaction in %s at %lu. Last distributed "
@ -1472,7 +1582,8 @@ int n_bufs = -1, pn_bufs = -1;
pos,
router->current_pos);
strncpy(err_message, "Incomplete transaction events sent", BINLOG_ERROR_MSG_LEN);
strncpy(err_message,
"Incomplete transaction events sent", BINLOG_ERROR_MSG_LEN);
}
/* distribute error message to registered slaves */
@ -1486,7 +1597,9 @@ int n_bufs = -1, pn_bufs = -1;
router->pending_transaction = 0;
spinlock_release(&router->binlog_lock);
} else {
}
else
{
spinlock_release(&router->binlog_lock);
}
}
@ -1516,8 +1629,10 @@ int n_bufs = -1, pn_bufs = -1;
* buffer chain and close the
* connection with the master
*/
while ((pkt = gwbuf_consume(pkt,
GWBUF_LENGTH(pkt))) != NULL);
while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
{
;
}
blr_master_close(router);
blr_master_delayed_connect(router);
return;
@ -1551,7 +1666,9 @@ int n_bufs = -1, pn_bufs = -1;
/* set io error message */
if (router->m_errmsg)
{
free(router->m_errmsg);
}
router->m_errmsg = msg_err;
/* Force stopped state */
@ -1645,16 +1762,22 @@ char file[BINLOG_FNAMELEN+1];
pos |= extract_field(ptr, 32);
slen = len - (8 + 4); // Allow for position and CRC
if (router->master_chksum == 0)
{
slen += 4;
}
if (slen > BINLOG_FNAMELEN)
{
slen = BINLOG_FNAMELEN;
}
memcpy(file, ptr + 8, slen);
file[slen] = 0;
#ifdef VERBOSE_ROTATE
printf("binlog rotate: ");
while (len--)
{
printf("0x%02x ", *ptr++);
}
printf("\n");
printf("New file: %s @ %ld\n", file, pos);
#endif
@ -1694,7 +1817,9 @@ MYSQL_session *auth_info;
}
if ((auth_info = calloc(1, sizeof(MYSQL_session))) == NULL)
{
return NULL;
}
strncpy(auth_info->user, username, MYSQL_USER_MAXLEN);
strncpy(auth_info->db, database, MYSQL_DATABASE_MAXLEN);
gw_sha1_str((const uint8_t *)password, strlen(password), auth_info->client_sha1);
@ -1842,7 +1967,9 @@ unsigned int cstate;
/* set lastReply */
if (router->send_slave_heartbeat)
{
slave->lastReply = time(0);
}
if (hdr->event_type == ROTATE_EVENT)
{
@ -1884,7 +2011,9 @@ unsigned int cstate;
if ((cstate & CS_UPTODATE) == CS_UPTODATE)
{
#ifdef STATE_CHANGE_LOGGING_ENABLED
MXS_NOTICE("%s: Slave %s:%d, server-id %d transition from up-to-date to catch-up in blr_distribute_binlog_record, binlog file '%s', position %lu.",
MXS_NOTICE("%s: Slave %s:%d, server-id %d transition from "
"up-to-date to catch-up in blr_distribute_binlog_record, "
"binlog file '%s', position %lu.",
router->service->name,
slave->dcb->remote,
ntohs((slave->dcb->ipv4).sin_port),
@ -1910,8 +2039,10 @@ unsigned int cstate;
poll_fake_write_event(slave->dcb);
}
else
{
spinlock_release(&slave->catch_lock);
}
}
slave = slave->next;
}
@ -1936,12 +2067,18 @@ int i;
bufp = buf;
bufp += sprintf(bufp, "%s length = %d: ", msg, len);
for (i = 0; i < len && i < 40; i++)
{
bufp += sprintf(bufp, "0x%02x ", ptr[i]);
}
if (i < len)
{
MXS_LOG_MESSAGE(priority, "%s...", buf);
}
else
{
MXS_LOG_MESSAGE(priority, "%s", buf);
}
}
/**
* Check if the master connection is in place and we
@ -1972,18 +2109,24 @@ int len, ncol, collen;
char *rval;
if (buf == NULL)
{
return NULL;
}
ptr = (uint8_t *)GWBUF_DATA(buf);
/* First packet should be the column count */
len = EXTRACT24(ptr);
ptr += 3;
if (*ptr != 1) // Check sequence number is 1
{
return NULL;
}
ptr++;
ncol = *ptr++;
if (ncol < col) // Not that many column in result
{
return NULL;
}
// Now ptr points at the column definition
while (ncol-- > 0)
{
@ -1995,7 +2138,9 @@ char *rval;
len = EXTRACT24(ptr);
ptr += 4; // Skip to payload
if (*ptr != 0xfe)
{
return NULL;
}
ptr += len;
// Finally we have reached the row
@ -2007,7 +2152,9 @@ char *rval;
contains a second EOF packet right after the first one, the result set is empty and
contains no rows. */
if (len == 5 && *ptr == 0xfe)
{
return NULL;
}
while (--col > 0)
{
@ -2016,7 +2163,9 @@ char *rval;
}
collen = *ptr++;
if ((rval = malloc(collen + 1)) == NULL)
{
return NULL;
}
memcpy(rval, ptr, collen);
rval[collen] = 0; // NULL terminate
@ -2032,7 +2181,11 @@ char *rval;
* @return The binlog record wrapped in a GWBUF structure
*/
GWBUF
*blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr, unsigned long long pos_end) {
*blr_read_events_from_pos(ROUTER_INSTANCE *router,
unsigned long long pos,
REP_HEADER *hdr,
unsigned long long pos_end)
{
unsigned long long end_pos = 0;
struct stat statb;
uint8_t hdbuf[19];
@ -2045,7 +2198,8 @@ int event_limit;
end_pos = pos_end;
/* end of file reached, we're done */
if (pos == end_pos) {
if (pos == end_pos)
{
return NULL;
}
@ -2075,11 +2229,13 @@ int event_limit;
pos, strerror_r(errno, err_msg, sizeof(err_msg)));
if (errno == EBADF)
{
MXS_ERROR("Reading saved events: bad file descriptor for file %s"
", descriptor %d.",
router->binlog_name, router->binlog_fd);
break;
}
}
break;
default:
MXS_ERROR("Reading saved events: short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
@ -2131,7 +2287,9 @@ int event_limit;
"%s, expected %d bytes.",
pos, router->binlog_name,
strerror_r(errno, err_msg, sizeof(err_msg)), hdr->event_size - 19);
} else {
}
else
{
MXS_ERROR("Reading saved events: short read when reading "
"the event at %llu in %s. "
"Expected %d bytes got %d bytes.",
@ -2160,10 +2318,12 @@ int event_limit;
* @param router The router instance
*/
void
blr_stop_start_master(ROUTER_INSTANCE *router) {
if (router->master) {
if (router->master->fd != -1 && router->master->state == DCB_STATE_POLLING) {
blr_stop_start_master(ROUTER_INSTANCE *router)
{
if (router->master)
{
if (router->master->fd != -1 && router->master->state == DCB_STATE_POLLING)
{
blr_master_close(router);
}
}
@ -2181,10 +2341,14 @@ blr_stop_start_master(ROUTER_INSTANCE *router) {
*/
if (strcmp(router->binlog_name, router->prevbinlog) != 0)
{
strncpy(router->prevbinlog, router->binlog_name, BINLOG_FNAMELEN);
}
if (router->client) {
if (router->client->fd != -1 && router->client->state == DCB_STATE_POLLING) {
if (router->client)
{
if (router->client->fd != -1 && router->client->state == DCB_STATE_POLLING)
{
dcb_close(router->client);
router->client = NULL;
}
@ -2211,7 +2375,8 @@ blr_stop_start_master(ROUTER_INSTANCE *router) {
*/
static void
blr_check_last_master_event(void *inst) {
blr_check_last_master_event(void *inst)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)inst;
int master_check = 1;
int master_state = BLRM_UNCONNECTED;
@ -2225,7 +2390,8 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
spinlock_release(&router->lock);
if (!master_check) {
if (!master_check)
{
/*
* stop current master connection
* and try a new connection
@ -2233,7 +2399,8 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
blr_stop_start_master(router);
}
if ( (!master_check) || (master_state != BLRM_BINLOGDUMP) ) {
if ( (!master_check) || (master_state != BLRM_BINLOGDUMP) )
{
/*
* Remove the task, it will be added again
* when master state is back to BLRM_BINLOGDUMP
@ -2256,18 +2423,22 @@ char task_name[BLRM_TASK_NAME_LEN + 1] = "";
*/
int
blr_check_heartbeat(ROUTER_INSTANCE *router) {
blr_check_heartbeat(ROUTER_INSTANCE *router)
{
time_t t_now = time(0);
char *event_desc = NULL;
if (router->master_state != BLRM_BINLOGDUMP) {
if (router->master_state != BLRM_BINLOGDUMP)
{
return 1;
}
event_desc = blr_last_event_description(router);
if (router->master_state == BLRM_BINLOGDUMP && router->lastEventReceived > 0) {
if ((t_now - router->stats.lastReply) > (router->heartbeat + BLR_NET_LATENCY_WAIT_TIME)) {
if (router->master_state == BLRM_BINLOGDUMP && router->lastEventReceived > 0)
{
if ((t_now - router->stats.lastReply) > (router->heartbeat + BLR_NET_LATENCY_WAIT_TIME))
{
MXS_ERROR("No event received from master %s:%d in heartbeat period (%lu seconds), "
"last event (%s %d) received %lu seconds ago. Assuming connection is dead "
"and reconnecting.",
@ -2291,27 +2462,36 @@ char *event_desc = NULL;
* @param router The router instance
*/
static void blr_log_identity(ROUTER_INSTANCE *router) {
static void blr_log_identity(ROUTER_INSTANCE *router)
{
char *master_uuid;
char *master_hostname;
char *master_version;
if (router->set_master_version)
{
master_version = router->set_master_version;
else {
}
else
{
master_version = blr_extract_column(router->saved_master.selectver, 1);
}
if (router->set_master_hostname)
{
master_hostname = router->set_master_hostname;
else {
}
else
{
master_hostname = blr_extract_column(router->saved_master.selecthostname, 1);
}
if (router->set_master_uuid)
{
master_uuid = router->master_uuid;
else {
}
else
{
master_uuid = blr_extract_column(router->saved_master.uuid, 2);
}
@ -2324,13 +2504,16 @@ static void blr_log_identity(ROUTER_INSTANCE *router) {
/* Seen by the slaves */
/* MariaDB 5.5 and MariaDB don't have the MASTER_UUID var */
if (master_uuid == NULL) {
if (master_uuid == NULL)
{
MXS_NOTICE("%s: identity seen by the slaves: "
"server_id: %d, hostname: %s, MySQL version: %s",
router->service->name,
router->masterid, (master_hostname == NULL ? "not available" : master_hostname),
(master_version == NULL ? "not available" : master_version));
} else {
}
else
{
MXS_NOTICE("%s: identity seen by the slaves: "
"server_id: %d, uuid: %s, hostname: %s, MySQL version: %s",
router->service->name,
@ -2349,7 +2532,8 @@ static void blr_log_identity(ROUTER_INSTANCE *router) {
* @param err_code The MySQL error code for message
*/
static void
blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code) {
blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code)
{
ROUTER_SLAVE *slave;
spinlock_acquire(&router->lock);

File diff suppressed because it is too large Load Diff

View File

@ -68,7 +68,8 @@ extern uint32_t extract_field(uint8_t *src, int bits);
static void printVersion(const char *progname);
static void printUsage(const char *progname);
static struct option long_options[] = {
static struct option long_options[] =
{
{"debug", no_argument, 0, 'd'},
{"version", no_argument, 0, 'V'},
{"fix", no_argument, 0, 'f'},
@ -85,7 +86,8 @@ MaxScaleUptime()
return 1;
}
int main(int argc, char **argv) {
int main(int argc, char **argv)
{
ROUTER_INSTANCE *inst;
int fd;
int ret;
@ -102,7 +104,8 @@ int main(int argc, char **argv) {
while ((c = getopt_long(argc, argv, "dVfM?", long_options, &option_index)) >= 0)
{
switch (c) {
switch (c)
{
case 'd':
debug_out = 1;
break;
@ -128,7 +131,8 @@ int main(int argc, char **argv) {
mxs_log_set_augmentation(0);
mxs_log_set_priority_enabled(LOG_DEBUG, debug_out);
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL)
{
MXS_ERROR("Memory allocation failed for ROUTER_INSTANCE");
mxs_log_flush_sync();
@ -137,7 +141,8 @@ int main(int argc, char **argv) {
return 1;
}
if (argv[num_args] == NULL) {
if (argv[num_args] == NULL)
{
printf("ERROR: No binlog file was specified\n");
exit(EXIT_FAILURE);
}
@ -145,9 +150,13 @@ int main(int argc, char **argv) {
strncpy(path, argv[num_args], PATH_MAX);
if (fix_file)
{
fd = open(path, O_RDWR, 0666);
}
else
{
fd = open(path, O_RDONLY, 0666);
}
if (fd == -1)
{
@ -165,18 +174,26 @@ int main(int argc, char **argv) {
inst->binlog_fd = fd;
if (mariadb10_compat == 1)
{
inst->mariadb10_compat = 1;
}
ptr = strrchr(path, '/');
if (ptr)
{
strncpy(inst->binlog_name, ptr + 1, BINLOG_FNAMELEN);
}
else
{
strncpy(inst->binlog_name, path, BINLOG_FNAMELEN);
}
MXS_NOTICE("maxbinlogcheck %s", binlog_check_version);
if (fstat(inst->binlog_fd, &statb) == 0)
{
filelen = statb.st_size;
}
MXS_NOTICE("Checking %s (%s), size %lu bytes", path, inst->binlog_name, filelen);