diff --git a/server/modules/routing/binlogrouter/blr.c b/server/modules/routing/binlogrouter/blr.c index b25c3154d..553b483d7 100644 --- a/server/modules/routing/binlogrouter/blr.c +++ b/server/modules/routing/binlogrouter/blr.c @@ -315,6 +315,7 @@ createInstance(SERVICE *service, char **options) inst->current_pos = 0; inst->current_safe_event = 0; inst->master_event_state = BLR_EVENT_DONE; + inst->mariadb_gtid[0] = '\0'; strcpy(inst->binlog_name, ""); strcpy(inst->prevbinlog, ""); @@ -1432,6 +1433,11 @@ diagnostics(MXS_ROUTER *router, DCB *dcb) dcb_printf(dcb, "\tLast event from master: 0x%x, %s\n", router_inst->lastEventReceived, (ptr != NULL) ? ptr : "unknown"); + if (router_inst->mariadb_gtid[0]) + { + dcb_printf(dcb, "\tLast seen MariaDB GTID: %s\n", + router_inst->mariadb_gtid); + } } if (router_inst->lastEventTimestamp) diff --git a/server/modules/routing/binlogrouter/blr.h b/server/modules/routing/binlogrouter/blr.h index dbdd336f9..d865314d2 100644 --- a/server/modules/routing/binlogrouter/blr.h +++ b/server/modules/routing/binlogrouter/blr.h @@ -87,6 +87,9 @@ MXS_BEGIN_DECLS #define BLR_REPORT_CHECKSUM_FORMAT "CRC32 0x" #define BLR_REPORT_REP_HEADER 0x02 +/* MariaDB GTID string len */ +#define GTID_MAX_LEN 42 + /** * Supported Encryption algorithms * @@ -600,6 +603,7 @@ typedef struct router_instance int master_semi_sync; /*< Semi-Sync replication status of master server */ BINLOG_ENCRYPTION_SETUP encryption; /*< Binlog encryption setup */ void *encryption_ctx; /*< Encryption context */ + char mariadb_gtid[GTID_MAX_LEN + 1]; /*< MariaDB 10 GTID string value */ struct router_instance *next; } ROUTER_INSTANCE; diff --git a/server/modules/routing/binlogrouter/blr_file.c b/server/modules/routing/binlogrouter/blr_file.c index 2abab36c8..86d5e4175 100644 --- a/server/modules/routing/binlogrouter/blr_file.c +++ b/server/modules/routing/binlogrouter/blr_file.c @@ -1974,7 +1974,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, int fix, int debug) } /* If MariaDB 10 compatibility: - * check for MARIADB10_GTID_EVENT with flags = 0 + * check for MARIADB10_GTID_EVENT with flags * This marks the transaction starts instead of * QUERY_EVENT with "BEGIN" */ @@ -1992,6 +1992,13 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, int fix, int debug) if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0) { + char mariadb_gtid[GTID_MAX_LEN + 1]; + snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu", + domainid, hdr.serverid, + n_sequence); + + strcpy(router->mariadb_gtid, mariadb_gtid); + if (pending_transaction > 0) { MXS_ERROR("Transaction cannot be @ pos %llu: " diff --git a/server/modules/routing/binlogrouter/blr_master.c b/server/modules/routing/binlogrouter/blr_master.c index 4ebda2154..a6cdc420f 100644 --- a/server/modules/routing/binlogrouter/blr_master.c +++ b/server/modules/routing/binlogrouter/blr_master.c @@ -1368,54 +1368,72 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) * Only complete transactions should be sent to sleves */ - /** - * If MariaDB 10 compatibility: - * check for MARIADB10_GTID_EVENT with flags = 0 - * This marks the transaction starts instead of - * QUERY_EVENT with "BEGIN" - */ - if (router->trx_safe) + if (router->mariadb10_compat && hdr.event_type == MARIADB10_GTID_EVENT) { - if (router->mariadb10_compat) - { - if (hdr.event_type == MARIADB10_GTID_EVENT) - { - uint64_t n_sequence; - uint32_t domainid; - unsigned int flags; - n_sequence = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN, 64); - domainid = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8, 32); - flags = *(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 4); - - if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0) - { - spinlock_acquire(&router->binlog_lock); - - if (router->pending_transaction > 0) - { - MXS_ERROR("A MariaDB 10 transaction " - "is already open " - "@ %lu (GTID %u-%u-%lu) and " - "a new one starts @ %lu", - router->binlog_position, - domainid, hdr.serverid, - n_sequence, - router->current_pos); - - // An action should be taken here - } - - router->pending_transaction = BLRM_TRANSACTION_START; - - spinlock_release(&router->binlog_lock); - } - } - } - /** - * look for QUERY_EVENT [BEGIN / COMMIT] and XID_EVENT + * If MariaDB 10 compatibility: + * check for MARIADB10_GTID_EVENT with flags: + * this is the TRASACTION START detection. + * + * Save GTID anyway and if trx_safe is set mark the transaction start */ + uint64_t n_sequence; + uint32_t domainid; + unsigned int flags; + n_sequence = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN, 64); + domainid = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8, 32); + flags = *(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 4); + + if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0) + { + char mariadb_gtid[GTID_MAX_LEN + 1]; + snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu", + domainid, hdr.serverid, + n_sequence); + + MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu", + router->mariadb_gtid, + router->binlog_name, + router->binlog_position); + + spinlock_acquire(&router->binlog_lock); + + /* Save GTID */ + strcpy(router->mariadb_gtid, mariadb_gtid); + + /** + * Now mark the new open transaction + */ + if (router->trx_safe) + { + if (router->pending_transaction > 0) + { + MXS_ERROR("A MariaDB 10 transaction " + "is already open " + "@ %lu (GTID %s) and " + "a new one starts @ %lu", + router->binlog_position, + mariadb_gtid, + router->current_pos); + } + + router->pending_transaction = BLRM_TRANSACTION_START; + + } + + spinlock_release(&router->binlog_lock); + } + } + + /** + * If trx_safe is set then look for: + * - QUERY_EVENT: BEGIN | START TRANSACTION | COMMIT + * - XID_EVENT for transactional storage edngines + */ + + if (router->trx_safe) + { if (hdr.event_type == QUERY_EVENT) { char *statement_sql; @@ -1444,7 +1462,6 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) router->binlog_position, router->current_pos); - // An action should be taken here } router->pending_transaction = BLRM_TRANSACTION_START; diff --git a/server/modules/routing/binlogrouter/blr_slave.c b/server/modules/routing/binlogrouter/blr_slave.c index baa44f0b7..df14182ae 100644 --- a/server/modules/routing/binlogrouter/blr_slave.c +++ b/server/modules/routing/binlogrouter/blr_slave.c @@ -488,7 +488,7 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) } else if (strcmp(query_text, maxwell_binlog_row_image_query) == 0) { - char *binlog_row_image = blr_extract_column(router->saved_master.binlog_vars, 1); + char *binlog_row_image = blr_extract_column(router->saved_master.binlog_vars, 3); blr_slave_send_var_value(router, slave, "Value", binlog_row_image == NULL ? "" : binlog_row_image, BLR_TYPE_STRING); MXS_FREE(binlog_row_image); MXS_FREE(query_text); @@ -644,6 +644,20 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) return blr_slave_send_var_value(router, slave, heading, server_id, BLR_TYPE_INT); } + else if ((strcasecmp(word, "@@gtid_current_pos") == 0) || (strcasecmp(word, "@@global.gtid_current_pos") == 0)) + { + char heading[40]; /* to ensure we match the case in query and response */ + char mariadb_gtid[GTID_MAX_LEN + 1]; + strcpy(heading, word); + MXS_FREE(query_text); + + /* Safely get router->mariadb_gtid */ + spinlock_acquire(&router->binlog_lock); + strcpy(mariadb_gtid, router->mariadb_gtid); + spinlock_release(&router->binlog_lock); + + return blr_slave_send_var_value(router, slave, heading, mariadb_gtid, BLR_TYPE_STRING); + } else if (strcasestr(word, "binlog_gtid_pos")) { unexpected = false;