Move binlog event processing into a separate file

This clarifies what parts of the router are specific to the binlogrouter
and what are common between the binlogrouter and avrorouter.

Ideally, the two modules would use the same infrastructure to handle the
processing of replication events. This is the first, albeit small, step
towards making the code in the binlogrouter the common infrastructure.
This commit is contained in:
Markus Mäkelä
2018-05-23 17:37:51 +03:00
parent 14a3b0052b
commit 5a3bbf0d15
6 changed files with 516 additions and 483 deletions

View File

@ -57,24 +57,20 @@ static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
void encode_value(unsigned char *data, unsigned int value, int len);
void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
static int blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
static void *CreateMySQLAuthData(const char *username,
const char *password,
const char *database);
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
static void blr_log_packet(int priority, const char *msg, uint8_t *ptr, int len);
void blr_master_close(ROUTER_INSTANCE *);
char *blr_extract_column(GWBUF *buf, int col);
static bool blr_check_last_master_event(void *inst);
extern int blr_check_heartbeat(ROUTER_INSTANCE *router);
static void blr_log_identity(ROUTER_INSTANCE *router);
static void blr_extract_header_semisync(uint8_t *pkt, REP_HEADER *hdr);
static int blr_send_semisync_ack (ROUTER_INSTANCE *router, uint64_t pos);
static int blr_get_master_semisync(GWBUF *buf);
static void blr_terminate_master_replication(ROUTER_INSTANCE *router,
uint8_t* ptr,
int len);
void blr_notify_all_slaves(ROUTER_INSTANCE *router);
extern bool blr_notify_waiting_slave(ROUTER_SLAVE *slave);
extern bool blr_save_mariadb_gtid(ROUTER_INSTANCE *inst);
static void blr_register_serverid(ROUTER_INSTANCE *router, GWBUF *buf);
@ -105,12 +101,6 @@ static void blr_register_cache_response(ROUTER_INSTANCE *router,
static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_mariadb_gtid_request(ROUTER_INSTANCE *router,
GWBUF *buf);
static bool blr_handle_fake_rotate(ROUTER_INSTANCE *router,
REP_HEADER *hdr,
uint8_t *ptr);
static void blr_handle_fake_gtid_list(ROUTER_INSTANCE *router,
REP_HEADER *hdr,
uint8_t *ptr);
extern int blr_write_special_event(ROUTER_INSTANCE *router,
uint32_t file_offset,
uint32_t hole_size,
@ -125,21 +115,6 @@ static int blr_check_connect_retry(ROUTER_INSTANCE *router);
static int keepalive = 1;
/**
 * Semi-Sync replication capability of the master server.
 */
typedef enum
{
    MASTER_SEMISYNC_NOT_AVAILABLE = 0, /**< Semi-Sync replication not available */
    MASTER_SEMISYNC_DISABLED = 1,      /**< Semi-Sync is disabled */
    MASTER_SEMISYNC_ENABLED = 2        /**< Semi-Sync is enabled */
} master_semisync_capability_t;
/*
 * Number of bytes in a packet from the master that precede the event data
 * (presumably the MySQL packet header plus the OK byte - confirm against
 * the code that uses it).
 */
#define MASTER_BYTES_BEFORE_EVENT 5
/*
 * Same count when the two Semi-Sync bytes (bytes 6 and 7 of the packet)
 * are present. The expansion is parenthesized so that the sum is kept
 * intact when the macro is used inside a larger expression.
 */
#define MASTER_BYTES_BEFORE_EVENT_SEMI_SYNC (MASTER_BYTES_BEFORE_EVENT + 2)
/* Semi-Sync indicator in network packet (byte 6) */
#define BLR_MASTER_SEMI_SYNC_INDICATOR 0xef
/* Semi-Sync flag ACK_REQ in network packet (byte 7) */
#define BLR_MASTER_SEMI_SYNC_ACK_REQ 0x01
/**
* blr_start_master - controls the connection of the binlog router to the
* master MySQL server and triggers the slave registration process for
@ -900,11 +875,9 @@ static void reset_errors(ROUTER_INSTANCE *router, REP_HEADER *hdr)
void
blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
{
uint8_t *msg = NULL, *ptr;
uint8_t *msg = NULL;
REP_HEADER hdr;
unsigned int len = 0;
int prev_length = -1;
int n_bufs = -1, pn_bufs = -1;
uint32_t len = 0;
int check_packet_len;
int semisync_bytes;
int semi_sync_send_ack = 0;
@ -916,7 +889,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
*/
while (pkt)
{
ptr = GWBUF_DATA(pkt);
uint8_t* ptr = GWBUF_DATA(pkt);
len = gw_mysql_get_byte3(ptr);
semisync_bytes = 0;
@ -987,19 +960,13 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
(hdr.event_size + (check_packet_len - MYSQL_HEADER_LEN)) < MYSQL_PACKET_LENGTH_MAX)
{
MXS_ERROR("Packet length is %d, but event size is %d, "
"binlog file %s position %lu, "
"length of previous event %d.",
"binlog file %s position %lu.",
len, hdr.event_size,
router->binlog_name,
router->current_pos,
prev_length);
router->current_pos);
blr_log_packet(LOG_ERR, "Packet:", ptr, len);
MXS_ERROR("This event (0x%x) was contained in %d GWBUFs, "
"the previous events was contained in %d GWBUFs",
router->lastEventReceived, n_bufs, pn_bufs);
if (msg)
{
MXS_FREE(msg);
@ -1126,440 +1093,11 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
if (hdr.ok == 0)
{
router->lastEventReceived = hdr.event_type;
router->lastEventTimestamp = hdr.timestamp;
/**
* Check for an open transaction, if the option is set
* Only complete transactions should be sent to slaves
*
* If a transaction is pending router->binlog_position
* won't be updated to router->current_pos
*/
spinlock_acquire(&router->binlog_lock);
if (router->trx_safe == 0 ||
(router->trx_safe &&
router->pending_transaction.state == BLRM_NO_TRANSACTION))
if (!blr_handle_one_event(router, hdr, ptr, len, semi_sync_send_ack))
{
/* no pending transaction: set current_pos to binlog_position */
router->binlog_position = router->current_pos;
router->current_safe_event = router->current_pos;
}
spinlock_release(&router->binlog_lock);
/**
* Detect transactions in events if trx_safe is set:
* Only complete transactions should be sent to slaves
*
* Now looking for:
* - QUERY_EVENT: BEGIN | START TRANSACTION | COMMIT
* - MariaDB 10 GTID_EVENT
* - XID_EVENT for transactional storage engines
*/
if (router->trx_safe)
{
// MariaDB 10 GTID event check
if (router->mariadb10_compat &&
hdr.event_type == MARIADB10_GTID_EVENT)
{
/**
* If MariaDB 10 compatibility:
* check for MARIADB10_GTID_EVENT with flags:
* this is the TRANSACTION START detection.
*/
uint64_t n_sequence;
uint32_t domainid;
unsigned int flags;
n_sequence = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN, 64);
domainid = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8, 32);
flags = *(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 4);
spinlock_acquire(&router->binlog_lock);
/**
* Detect whether it's a standalone transaction:
* there is no terminating COMMIT event.
* i.e: a DDL or FLUSH TABLES etc
*/
router->pending_transaction.standalone = flags & MARIADB_FL_STANDALONE;
/**
* Now mark the new open transaction
*/
if (router->pending_transaction.state > BLRM_NO_TRANSACTION)
{
MXS_ERROR("A MariaDB 10 transaction "
"is already open "
"@ %lu (GTID %u-%u-%lu) and "
"a new one starts @ %lu",
router->binlog_position,
domainid,
hdr.serverid,
n_sequence,
router->current_pos);
}
router->pending_transaction.state = BLRM_TRANSACTION_START;
/* Handle MariaDB 10 GTID */
if (router->mariadb10_gtid)
{
char mariadb_gtid[GTID_MAX_LEN + 1];
snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu",
domainid,
hdr.serverid,
n_sequence);
MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu",
mariadb_gtid,
router->binlog_name,
router->current_pos);
/* Save the pending GTID string value */
strcpy(router->pending_transaction.gtid, mariadb_gtid);
/* Save the pending GTID components */
router->pending_transaction.gtid_elms.domain_id = domainid;
/* This is the master id, no override */
router->pending_transaction.gtid_elms.server_id = hdr.serverid;
router->pending_transaction.gtid_elms.seq_no = n_sequence;
}
router->pending_transaction.start_pos = router->current_pos;
router->pending_transaction.end_pos = 0;
spinlock_release(&router->binlog_lock);
}
// Query Event check
if (hdr.event_type == QUERY_EVENT)
{
char *statement_sql;
int db_name_len, var_block_len, statement_len;
db_name_len = ptr[MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 4 + 4];
var_block_len = ptr[MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 4 + 4 + 1 + 2];
statement_len = len - (MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 4 + 4 + 1 + 2 + 2 \
+ var_block_len + 1 + db_name_len);
statement_sql =
static_cast<char*>(MXS_CALLOC(1, statement_len + 1));
MXS_ABORT_IF_NULL(statement_sql);
memcpy(statement_sql,
(char *)ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 4 + 4 + 1 + 2 + 2 \
+ var_block_len + 1 + db_name_len,
statement_len);
spinlock_acquire(&router->binlog_lock);
/* Check for BEGIN (it comes for START TRANSACTION too) */
if (strncmp(statement_sql, "BEGIN", 5) == 0)
{
if (router->pending_transaction.state > BLRM_NO_TRANSACTION)
{
MXS_ERROR("A transaction is already open "
"@ %lu and a new one starts @ %lu",
router->binlog_position,
router->current_pos);
}
router->pending_transaction.state = BLRM_TRANSACTION_START;
router->pending_transaction.start_pos = router->current_pos;
router->pending_transaction.end_pos = 0;
}
/* Check for COMMIT in non-transactional storage engines */
if (strncmp(statement_sql, "COMMIT", 6) == 0)
{
router->pending_transaction.state = BLRM_COMMIT_SEEN;
}
/**
* If it's a standalone transaction event we're done:
* this query event, only one, terminates the transaction.
*/
if (router->pending_transaction.state > BLRM_NO_TRANSACTION &&
router->pending_transaction.standalone)
{
router->pending_transaction.state = BLRM_STANDALONE_SEEN;
}
spinlock_release(&router->binlog_lock);
MXS_FREE(statement_sql);
}
/* Check for COMMIT in Transactional engines, i.e InnoDB */
if (hdr.event_type == XID_EVENT)
{
spinlock_acquire(&router->binlog_lock);
if (router->pending_transaction.state >= BLRM_TRANSACTION_START)
{
router->pending_transaction.state = BLRM_XID_EVENT_SEEN;
}
spinlock_release(&router->binlog_lock);
}
}
/**
* Check Event Type limit:
* If supported, gather statistics about
* the replication event types
* else stop replication from master
*/
int event_limit = router->mariadb10_compat ?
MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
if (hdr.event_type <= event_limit)
{
router->stats.events[hdr.event_type]++;
}
else
{
char errmsg[BINLOG_ERROR_MSG_LEN + 1];
sprintf(errmsg,
"Event type [%d] not supported yet. "
"Check master server configuration and "
"disable any new feature. "
"Replication from master has been stopped.",
hdr.event_type);
MXS_ERROR("%s", errmsg);
gwbuf_free(pkt);
pkt = NULL;
spinlock_acquire(&router->lock);
/* Handle error messages */
char* old_errmsg = router->m_errmsg;
router->m_errmsg = MXS_STRDUP_A(errmsg);
router->m_errno = 1235;
/* Set state to stopped */
router->master_state = BLRM_SLAVE_STOPPED;
router->stats.n_binlog_errors++;
spinlock_release(&router->lock);
MXS_FREE(old_errmsg);
/* Stop replication */
blr_master_close(router);
return;
}
/*
* FORMAT_DESCRIPTION_EVENT with next_pos = 0
* should not be saved
*/
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
{
router->stats.n_fakeevents++;
MXS_DEBUG("Replication Fake FORMAT_DESCRIPTION_EVENT event. "
"Binlog %s @ %lu.",
router->binlog_name,
router->current_pos);
}
else
{
if (hdr.event_type == HEARTBEAT_EVENT)
{
#ifdef SHOW_EVENTS
printf("Replication heartbeat\n");
#endif
MXS_DEBUG("Replication heartbeat. "
"Binlog %s @ %lu.",
router->binlog_name,
router->current_pos);
router->stats.n_heartbeats++;
if (router->pending_transaction.state > BLRM_NO_TRANSACTION)
{
router->stats.lastReply = time(0);
}
}
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
{
if (hdr.event_type == ROTATE_EVENT)
{
spinlock_acquire(&router->binlog_lock);
router->rotating = 1;
spinlock_release(&router->binlog_lock);
}
uint32_t offset = MYSQL_HEADER_LEN + 1; // Skip header and OK byte
/**
* Write the raw event data to disk without the network
* header or the OK byte
*/
if (blr_write_binlog_record(router, &hdr, len - offset, ptr + offset) == 0)
{
gwbuf_free(pkt);
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
/* Check for rotate event */
if (hdr.event_type == ROTATE_EVENT)
{
if (!blr_rotate_event(router, ptr + offset, &hdr))
{
gwbuf_free(pkt);
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
}
/* Handle semi-sync request from master */
if (router->master_semi_sync != MASTER_SEMISYNC_NOT_AVAILABLE &&
semi_sync_send_ack == BLR_MASTER_SEMI_SYNC_ACK_REQ)
{
MXS_DEBUG("%s: binlog record in file %s, pos %lu has "
"SEMI_SYNC_ACK_REQ and needs a Semi-Sync ACK packet to "
"be sent to the master server [%s]:%d",
router->service->name, router->binlog_name,
router->current_pos,
router->service->dbref->server->address,
router->service->dbref->server->port);
/* Send Semi-Sync ACK packet to master server */
blr_send_semisync_ack(router, hdr.next_pos);
/* Reset ACK sending */
semi_sync_send_ack = 0;
}
/**
* Distributing binlog events to slaves
* may depend on pending transaction
*/
spinlock_acquire(&router->binlog_lock);
if (router->trx_safe == 0 ||
(router->trx_safe &&
router->pending_transaction.state == BLRM_NO_TRANSACTION))
{
router->binlog_position = router->current_pos;
router->current_safe_event = router->last_event_pos;
spinlock_release(&router->binlog_lock);
/* Notify clients events can be read */
blr_notify_all_slaves(router);
}
else
{
/**
* If transaction is closed:
*
* 1) Notify clients events can be read
* from router->binlog_position
* 2) Update last seen MariaDB 10 GTID
* 3) set router->binlog_position to
* router->current_pos
*/
if (router->pending_transaction.state > BLRM_TRANSACTION_START)
{
if (router->mariadb10_compat)
{
/**
* The transaction has been saved.
* this points to the end of the binlog:
* i.e. the position of a new event
*/
router->pending_transaction.end_pos = router->current_pos;
if (router->mariadb10_compat &&
router->mariadb10_gtid)
{
/* Update last seen MariaDB GTID */
strcpy(router->last_mariadb_gtid,
router->pending_transaction.gtid);
/**
* Save MariaDB GTID into repo
*/
blr_save_mariadb_gtid(router);
}
}
spinlock_release(&router->binlog_lock);
/* Notify clients events can be read */
blr_notify_all_slaves(router);
/* update binlog_position and set pending to NO_TRX */
spinlock_acquire(&router->binlog_lock);
router->binlog_position = router->current_pos;
/* Set no pending transaction and no standalone */
router->pending_transaction.state = BLRM_NO_TRANSACTION;
router->pending_transaction.standalone = false;
spinlock_release(&router->binlog_lock);
}
else
{
spinlock_release(&router->binlog_lock);
}
}
}
else
{
/**
* Here we handle Artificial event, the ones with
* LOG_EVENT_ARTIFICIAL_F hdr.flags
*/
router->stats.n_artificial++;
MXS_DEBUG("Artificial event not written "
"to disk or distributed. "
"Type 0x%x, Length %d, Binlog "
"%s @ %lu.",
hdr.event_type,
hdr.event_size,
router->binlog_name,
router->current_pos);
ptr += MYSQL_HEADER_LEN + 1;
// Fake Rotate event is always sent as first packet from master
if (hdr.event_type == ROTATE_EVENT)
{
if (!blr_handle_fake_rotate(router, &hdr, ptr))
{
gwbuf_free(pkt);
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
MXS_INFO("Fake ROTATE_EVENT received: "
"binlog file %s, pos %" PRIu64 "",
router->binlog_name,
router->current_pos);
}
else if (hdr.event_type == MARIADB10_GTID_GTID_LIST_EVENT)
{
/*
* MariaDB10 event:
* it could be sent as part of GTID registration
* before sending change data events.
*/
blr_handle_fake_gtid_list(router, &hdr, ptr);
}
}
}
}
else
{
@ -1610,7 +1148,7 @@ blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr)
* @param hdr The replication message header
* @return 1 if the file could be rotated, 0 otherwise.
*/
static int
int
blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr)
{
int len, slen;
@ -1721,7 +1259,7 @@ CreateMySQLAuthData(const char *username, const char *password, const char *data
}
if ((auth_info =
static_cast<MYSQL_session*>(MXS_CALLOC(1, sizeof(MYSQL_session)))) == NULL)
static_cast<MYSQL_session*>(MXS_CALLOC(1, sizeof(MYSQL_session)))) == NULL)
{
return NULL;
}
@ -2341,8 +1879,7 @@ blr_extract_header_semisync(register uint8_t *ptr, register REP_HEADER *hdr)
* @return 1 if the packet is sent, 0 on errors
*/
static int
blr_send_semisync_ack(ROUTER_INSTANCE *router, uint64_t pos)
int blr_send_semisync_ack(ROUTER_INSTANCE *router, uint64_t pos)
{
int seqno = 0;
int semi_sync_flag = BLR_MASTER_SEMI_SYNC_INDICATOR;
@ -3349,9 +2886,9 @@ static void blr_register_mariadb_gtid_request(ROUTER_INSTANCE *router,
* @return True for successful binlog file rotation,
* False otherwise.
*/
static bool blr_handle_fake_rotate(ROUTER_INSTANCE *router,
REP_HEADER *hdr,
uint8_t *ptr)
bool blr_handle_fake_rotate(ROUTER_INSTANCE *router,
REP_HEADER *hdr,
uint8_t *ptr)
{
ss_dassert(hdr->event_type == ROTATE_EVENT);
@ -3429,9 +2966,9 @@ static bool blr_handle_fake_rotate(ROUTER_INSTANCE *router,
* @param hdr The Replication event header
* @param ptr The packet data
*/
static void blr_handle_fake_gtid_list(ROUTER_INSTANCE *router,
REP_HEADER *hdr,
uint8_t *ptr)
void blr_handle_fake_gtid_list(ROUTER_INSTANCE *router,
REP_HEADER *hdr,
uint8_t *ptr)
{
ss_dassert(hdr->event_type == MARIADB10_GTID_GTID_LIST_EVENT);