MXS-1075: MariaDB 10 GTID stored and shown only for transaction safety on

MariaDB 10 GTID is detected and stored only if transaction_safety
option is on.

SELECT @@gtid_current_pos and “maxadmin show service $service_name” can
return it
This commit is contained in:
MassimilianoPinto
2017-02-28 15:19:57 +01:00
parent 054ddcb3dd
commit 304b5ced29
2 changed files with 67 additions and 53 deletions

View File

@ -1357,49 +1357,50 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
spinlock_release(&router->binlog_lock); spinlock_release(&router->binlog_lock);
/** /**
* Detect transactions in events * Detect transactions in events if trx_safe is set:
* Only complete transactions should be sent to sleves * Only complete transactions should be sent to sleves
*
* Now looking for:
* - QUERY_EVENT: BEGIN | START TRANSACTION | COMMIT
* - MariadDB 10 GTID_EVENT
* - XID_EVENT for transactional storage engines
*/ */
if (router->mariadb10_compat && if (router->trx_safe)
hdr.event_type == MARIADB10_GTID_EVENT)
{ {
/** if (router->mariadb10_compat &&
* If MariaDB 10 compatibility: hdr.event_type == MARIADB10_GTID_EVENT)
* check for MARIADB10_GTID_EVENT with flags:
* this is the TRASACTION START detection.
*
*/
uint64_t n_sequence;
uint32_t domainid;
unsigned int flags;
n_sequence = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN, 64);
domainid = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8, 32);
flags = *(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 4);
if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0)
{ {
char mariadb_gtid[GTID_MAX_LEN + 1];
snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu",
domainid, hdr.serverid,
n_sequence);
MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu",
mariadb_gtid,
router->binlog_name,
router->current_pos);
spinlock_acquire(&router->binlog_lock);
/* Save GTID */
strcpy(router->mariadb_gtid, mariadb_gtid);
/** /**
* Now mark the new open transaction * If MariaDB 10 compatibility:
* check for MARIADB10_GTID_EVENT with flags:
* this is the TRASACTION START detection.
*/ */
if (router->trx_safe)
uint64_t n_sequence;
uint32_t domainid;
unsigned int flags;
n_sequence = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN, 64);
domainid = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8, 32);
flags = *(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 4);
if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0)
{ {
char mariadb_gtid[GTID_MAX_LEN + 1];
snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu",
domainid, hdr.serverid,
n_sequence);
MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu",
mariadb_gtid,
router->binlog_name,
router->current_pos);
spinlock_acquire(&router->binlog_lock);
/**
* Now mark the new open transaction
*/
if (router->pending_transaction.state > BLRM_NO_TRANSACTION) if (router->pending_transaction.state > BLRM_NO_TRANSACTION)
{ {
MXS_ERROR("A MariaDB 10 transaction " MXS_ERROR("A MariaDB 10 transaction "
@ -1413,20 +1414,15 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
router->pending_transaction.state = BLRM_TRANSACTION_START; router->pending_transaction.state = BLRM_TRANSACTION_START;
/* Save the pending GTID details */
strcpy(router->pending_transaction.gtid, mariadb_gtid);
router->pending_transaction.start_pos = router->current_pos;
router->pending_transaction.end_pos = 0;
} }
spinlock_release(&router->binlog_lock); spinlock_release(&router->binlog_lock);
} }
}
/**
* If trx_safe is set then look for:
* - QUERY_EVENT: BEGIN | START TRANSACTION | COMMIT
* - XID_EVENT for transactional storage edngines
*/
if (router->trx_safe)
{
if (hdr.event_type == QUERY_EVENT) if (hdr.event_type == QUERY_EVENT)
{ {
char *statement_sql; char *statement_sql;
@ -1458,6 +1454,8 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
} }
router->pending_transaction.state = BLRM_TRANSACTION_START; router->pending_transaction.state = BLRM_TRANSACTION_START;
router->pending_transaction.start_pos = router->current_pos;
router->pending_transaction.end_pos = 0;
} }
/* Check for COMMIT in non transactional store engines */ /* Check for COMMIT in non transactional store engines */
@ -1465,7 +1463,6 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
{ {
router->pending_transaction.state = BLRM_COMMIT_SEEN; router->pending_transaction.state = BLRM_COMMIT_SEEN;
} }
spinlock_release(&router->binlog_lock); spinlock_release(&router->binlog_lock);
MXS_FREE(statement_sql); MXS_FREE(statement_sql);
@ -1624,19 +1621,32 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
* If transaction is closed: * If transaction is closed:
* *
* 1) Notify clients events can be read * 1) Notify clients events can be read
* from router->binlog_position * from router->binlog_position
* 2) set router->binlog_position to * 2) Update last seen MariaDB 10 GTID
* 3) set router->binlog_position to
* router->current_pos * router->current_pos
*/ */
if (router->pending_transaction.state > BLRM_TRANSACTION_START) if (router->pending_transaction.state > BLRM_TRANSACTION_START)
{ {
/* Update last seen MariaDB GTID */
if (router->mariadb10_compat)
{
strcpy(router->mariadb_gtid, router->pending_transaction.gtid);
/* The transaction has been saved.
* this poins to end of binlog:
* i.e. the position of a new event
*/
router->pending_transaction.end_pos = router->current_pos;
}
spinlock_release(&router->binlog_lock); spinlock_release(&router->binlog_lock);
/* Notify clients events can be read */ /* Notify clients events can be read */
blr_notify_all_slaves(router); blr_notify_all_slaves(router);
/* update binlog_position and set pending to 0 */ /* update binlog_position and set pending to NO_TRX */
spinlock_acquire(&router->binlog_lock); spinlock_acquire(&router->binlog_lock);
router->binlog_position = router->current_pos; router->binlog_position = router->current_pos;

View File

@ -646,15 +646,19 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
} }
else if ((strcasecmp(word, "@@gtid_current_pos") == 0) || (strcasecmp(word, "@@global.gtid_current_pos") == 0)) else if ((strcasecmp(word, "@@gtid_current_pos") == 0) || (strcasecmp(word, "@@global.gtid_current_pos") == 0))
{ {
char heading[40]; /* to ensure we match the case in query and response */ char heading[40];
char mariadb_gtid[GTID_MAX_LEN + 1]; char mariadb_gtid[GTID_MAX_LEN + 1];
mariadb_gtid[0] = '\0';
strcpy(heading, word); strcpy(heading, word);
MXS_FREE(query_text); MXS_FREE(query_text);
/* Safely get router->mariadb_gtid */ if (router->mariadb10_compat)
spinlock_acquire(&router->binlog_lock); {
strcpy(mariadb_gtid, router->mariadb_gtid); spinlock_acquire(&router->binlog_lock);
spinlock_release(&router->binlog_lock); strcpy(mariadb_gtid, router->mariadb_gtid);
spinlock_release(&router->binlog_lock);
}
return blr_slave_send_var_value(router, slave, heading, mariadb_gtid, BLR_TYPE_STRING); return blr_slave_send_var_value(router, slave, heading, mariadb_gtid, BLR_TYPE_STRING);
} }