diff --git a/server/modules/routing/binlogrouter/blr_master.c b/server/modules/routing/binlogrouter/blr_master.c index 14dc2a605..79b3b972e 100644 --- a/server/modules/routing/binlogrouter/blr_master.c +++ b/server/modules/routing/binlogrouter/blr_master.c @@ -1357,49 +1357,50 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) spinlock_release(&router->binlog_lock); /** - * Detect transactions in events + * Detect transactions in events if trx_safe is set: * Only complete transactions should be sent to sleves + * + * Now looking for: + * - QUERY_EVENT: BEGIN | START TRANSACTION | COMMIT + * - MariadDB 10 GTID_EVENT + * - XID_EVENT for transactional storage engines */ - if (router->mariadb10_compat && - hdr.event_type == MARIADB10_GTID_EVENT) + if (router->trx_safe) { - /** - * If MariaDB 10 compatibility: - * check for MARIADB10_GTID_EVENT with flags: - * this is the TRASACTION START detection. - * - */ - - uint64_t n_sequence; - uint32_t domainid; - unsigned int flags; - n_sequence = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN, 64); - domainid = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8, 32); - flags = *(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 4); - - if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0) + if (router->mariadb10_compat && + hdr.event_type == MARIADB10_GTID_EVENT) { - char mariadb_gtid[GTID_MAX_LEN + 1]; - snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu", - domainid, hdr.serverid, - n_sequence); - - MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu", - mariadb_gtid, - router->binlog_name, - router->current_pos); - - spinlock_acquire(&router->binlog_lock); - - /* Save GTID */ - strcpy(router->mariadb_gtid, mariadb_gtid); - /** - * Now mark the new open transaction + * If MariaDB 10 compatibility: + * check for MARIADB10_GTID_EVENT with flags: + * this is the TRASACTION START detection. */ - if (router->trx_safe) + + uint64_t n_sequence; + uint32_t domainid; + unsigned int flags; + n_sequence = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN, 64); + domainid = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8, 32); + flags = *(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 4); + + if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0) { + char mariadb_gtid[GTID_MAX_LEN + 1]; + snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu", + domainid, hdr.serverid, + n_sequence); + + MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu", + mariadb_gtid, + router->binlog_name, + router->current_pos); + + spinlock_acquire(&router->binlog_lock); + + /** + * Now mark the new open transaction + */ if (router->pending_transaction.state > BLRM_NO_TRANSACTION) { MXS_ERROR("A MariaDB 10 transaction " @@ -1413,20 +1414,15 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) router->pending_transaction.state = BLRM_TRANSACTION_START; + /* Save the pending GTID details */ + strcpy(router->pending_transaction.gtid, mariadb_gtid); + router->pending_transaction.start_pos = router->current_pos; + router->pending_transaction.end_pos = 0; } spinlock_release(&router->binlog_lock); } - } - /** - * If trx_safe is set then look for: - * - QUERY_EVENT: BEGIN | START TRANSACTION | COMMIT - * - XID_EVENT for transactional storage edngines - */ - - if (router->trx_safe) - { if (hdr.event_type == QUERY_EVENT) { char *statement_sql; @@ -1458,6 +1454,8 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) } router->pending_transaction.state = BLRM_TRANSACTION_START; + router->pending_transaction.start_pos = router->current_pos; + router->pending_transaction.end_pos = 0; } /* Check for COMMIT in non transactional store engines */ @@ -1465,7 +1463,6 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) { router->pending_transaction.state = BLRM_COMMIT_SEEN; } - spinlock_release(&router->binlog_lock); MXS_FREE(statement_sql); @@ -1624,19 +1621,32 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) * If transaction is closed: * * 1) Notify clients events can be read - * from router->binlog_position - * 2) set router->binlog_position to + * from router->binlog_position + * 2) Update last seen MariaDB 10 GTID + * 3) set router->binlog_position to * router->current_pos */ if (router->pending_transaction.state > BLRM_TRANSACTION_START) { + /* Update last seen MariaDB GTID */ + if (router->mariadb10_compat) + { + strcpy(router->mariadb_gtid, router->pending_transaction.gtid); + + /* The transaction has been saved. + * this poins to end of binlog: + * i.e. the position of a new event + */ + router->pending_transaction.end_pos = router->current_pos; + } + spinlock_release(&router->binlog_lock); /* Notify clients events can be read */ blr_notify_all_slaves(router); - /* update binlog_position and set pending to 0 */ + /* update binlog_position and set pending to NO_TRX */ spinlock_acquire(&router->binlog_lock); router->binlog_position = router->current_pos; diff --git a/server/modules/routing/binlogrouter/blr_slave.c b/server/modules/routing/binlogrouter/blr_slave.c index 6c60dccfe..45eb8ee4c 100644 --- a/server/modules/routing/binlogrouter/blr_slave.c +++ b/server/modules/routing/binlogrouter/blr_slave.c @@ -646,15 +646,19 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) } else if ((strcasecmp(word, "@@gtid_current_pos") == 0) || (strcasecmp(word, "@@global.gtid_current_pos") == 0)) { - char heading[40]; /* to ensure we match the case in query and response */ + char heading[40]; char mariadb_gtid[GTID_MAX_LEN + 1]; + mariadb_gtid[0] = '\0'; strcpy(heading, word); + MXS_FREE(query_text); - /* Safely get router->mariadb_gtid */ - spinlock_acquire(&router->binlog_lock); - strcpy(mariadb_gtid, router->mariadb_gtid); - spinlock_release(&router->binlog_lock); + if (router->mariadb10_compat) + { + spinlock_acquire(&router->binlog_lock); + strcpy(mariadb_gtid, router->mariadb_gtid); + spinlock_release(&router->binlog_lock); + } return blr_slave_send_var_value(router, slave, heading, mariadb_gtid, BLR_TYPE_STRING); }