Detect and store MariaDB GTID

If Binlog Server is running with MariaDB 10 compatibility then the
found GTID is stored in router->mariadb_gtid
This commit is contained in:
MassimilianoPinto
2017-02-15 08:28:47 +01:00
parent 71707c8505
commit a0b599730c
5 changed files with 95 additions and 47 deletions

View File

@ -315,6 +315,7 @@ createInstance(SERVICE *service, char **options)
inst->current_pos = 0;
inst->current_safe_event = 0;
inst->master_event_state = BLR_EVENT_DONE;
inst->mariadb_gtid[0] = '\0';
strcpy(inst->binlog_name, "");
strcpy(inst->prevbinlog, "");
@ -1432,6 +1433,11 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
dcb_printf(dcb, "\tLast event from master: 0x%x, %s\n",
router_inst->lastEventReceived, (ptr != NULL) ? ptr : "unknown");
if (router_inst->mariadb_gtid[0])
{
dcb_printf(dcb, "\tLast seen MariaDB GTID: %s\n",
router_inst->mariadb_gtid);
}
}
if (router_inst->lastEventTimestamp)

View File

@ -87,6 +87,9 @@ MXS_BEGIN_DECLS
#define BLR_REPORT_CHECKSUM_FORMAT "CRC32 0x"
#define BLR_REPORT_REP_HEADER 0x02
/* MariaDB GTID string len */
#define GTID_MAX_LEN 42
/**
* Supported Encryption algorithms
*
@ -600,6 +603,7 @@ typedef struct router_instance
int master_semi_sync; /*< Semi-Sync replication status of master server */
BINLOG_ENCRYPTION_SETUP encryption; /*< Binlog encryption setup */
void *encryption_ctx; /*< Encryption context */
char mariadb_gtid[GTID_MAX_LEN + 1]; /*< MariaDB 10 GTID string value */
struct router_instance *next;
} ROUTER_INSTANCE;

View File

@ -1974,7 +1974,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, int fix, int debug)
}
/* If MariaDB 10 compatibility:
* check for MARIADB10_GTID_EVENT with flags = 0
* check for MARIADB10_GTID_EVENT with flags
* This marks the transaction starts instead of
* QUERY_EVENT with "BEGIN"
*/
@ -1992,6 +1992,13 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, int fix, int debug)
if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0)
{
char mariadb_gtid[GTID_MAX_LEN + 1];
snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu",
domainid, hdr.serverid,
n_sequence);
strcpy(router->mariadb_gtid, mariadb_gtid);
if (pending_transaction > 0)
{
MXS_ERROR("Transaction cannot be @ pos %llu: "

View File

@ -1368,54 +1368,72 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
* Only complete transactions should be sent to sleves
*/
/**
* If MariaDB 10 compatibility:
* check for MARIADB10_GTID_EVENT with flags = 0
* This marks the transaction starts instead of
* QUERY_EVENT with "BEGIN"
*/
if (router->trx_safe)
if (router->mariadb10_compat && hdr.event_type == MARIADB10_GTID_EVENT)
{
if (router->mariadb10_compat)
{
if (hdr.event_type == MARIADB10_GTID_EVENT)
{
uint64_t n_sequence;
uint32_t domainid;
unsigned int flags;
n_sequence = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN, 64);
domainid = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8, 32);
flags = *(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 4);
if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0)
{
spinlock_acquire(&router->binlog_lock);
if (router->pending_transaction > 0)
{
MXS_ERROR("A MariaDB 10 transaction "
"is already open "
"@ %lu (GTID %u-%u-%lu) and "
"a new one starts @ %lu",
router->binlog_position,
domainid, hdr.serverid,
n_sequence,
router->current_pos);
// An action should be taken here
}
router->pending_transaction = BLRM_TRANSACTION_START;
spinlock_release(&router->binlog_lock);
}
}
}
/**
* look for QUERY_EVENT [BEGIN / COMMIT] and XID_EVENT
* If MariaDB 10 compatibility:
* check for MARIADB10_GTID_EVENT with flags:
* this is the TRASACTION START detection.
*
* Save GTID anyway and if trx_safe is set mark the transaction start
*/
uint64_t n_sequence;
uint32_t domainid;
unsigned int flags;
n_sequence = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN, 64);
domainid = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8, 32);
flags = *(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 4);
if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0)
{
char mariadb_gtid[GTID_MAX_LEN + 1];
snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu",
domainid, hdr.serverid,
n_sequence);
MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu",
router->mariadb_gtid,
router->binlog_name,
router->binlog_position);
spinlock_acquire(&router->binlog_lock);
/* Save GTID */
strcpy(router->mariadb_gtid, mariadb_gtid);
/**
* Now mark the new open transaction
*/
if (router->trx_safe)
{
if (router->pending_transaction > 0)
{
MXS_ERROR("A MariaDB 10 transaction "
"is already open "
"@ %lu (GTID %s) and "
"a new one starts @ %lu",
router->binlog_position,
mariadb_gtid,
router->current_pos);
}
router->pending_transaction = BLRM_TRANSACTION_START;
}
spinlock_release(&router->binlog_lock);
}
}
/**
* If trx_safe is set then look for:
* - QUERY_EVENT: BEGIN | START TRANSACTION | COMMIT
* - XID_EVENT for transactional storage edngines
*/
if (router->trx_safe)
{
if (hdr.event_type == QUERY_EVENT)
{
char *statement_sql;
@ -1444,7 +1462,6 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
router->binlog_position,
router->current_pos);
// An action should be taken here
}
router->pending_transaction = BLRM_TRANSACTION_START;

View File

@ -488,7 +488,7 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
}
else if (strcmp(query_text, maxwell_binlog_row_image_query) == 0)
{
char *binlog_row_image = blr_extract_column(router->saved_master.binlog_vars, 1);
char *binlog_row_image = blr_extract_column(router->saved_master.binlog_vars, 3);
blr_slave_send_var_value(router, slave, "Value", binlog_row_image == NULL ? "" : binlog_row_image, BLR_TYPE_STRING);
MXS_FREE(binlog_row_image);
MXS_FREE(query_text);
@ -644,6 +644,20 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
return blr_slave_send_var_value(router, slave, heading, server_id, BLR_TYPE_INT);
}
else if ((strcasecmp(word, "@@gtid_current_pos") == 0) || (strcasecmp(word, "@@global.gtid_current_pos") == 0))
{
char heading[40]; /* to ensure we match the case in query and response */
char mariadb_gtid[GTID_MAX_LEN + 1];
strcpy(heading, word);
MXS_FREE(query_text);
/* Safely get router->mariadb_gtid */
spinlock_acquire(&router->binlog_lock);
strcpy(mariadb_gtid, router->mariadb_gtid);
spinlock_release(&router->binlog_lock);
return blr_slave_send_var_value(router, slave, heading, mariadb_gtid, BLR_TYPE_STRING);
}
else if (strcasestr(word, "binlog_gtid_pos"))
{
unexpected = false;