diff --git a/server/modules/routing/binlogrouter/blr.c b/server/modules/routing/binlogrouter/blr.c index 34a52fdda..4693e44aa 100644 --- a/server/modules/routing/binlogrouter/blr.c +++ b/server/modules/routing/binlogrouter/blr.c @@ -289,7 +289,7 @@ createInstance(SERVICE *service, char **options) inst->m_errno = 0; inst->m_errmsg = NULL; - inst->pending_transaction = 0; + memset(&inst->pending_transaction, '\0', sizeof(PENDING_TRANSACTION)); inst->last_safe_pos = 0; inst->last_event_pos = 0; @@ -1081,7 +1081,6 @@ static void freeSession(MXS_ROUTER* router_instance, MXS_FREE(slave); } - /** * Close a session with the router, this is the mechanism * by which a router may cleanup data structure etc. @@ -1341,7 +1340,7 @@ diagnostics(MXS_ROUTER *router, DCB *dcb) router_inst->current_pos); if (router_inst->trx_safe) { - if (router_inst->pending_transaction) + if (router_inst->pending_transaction.state != BLRM_NO_TRANSACTION) { dcb_printf(dcb, "\tCurrent open transaction pos: %lu\n", router_inst->binlog_position); @@ -2472,7 +2471,8 @@ destroyInstance(MXS_ROUTER *instance) inst->service->dbref->server->port, inst->binlog_name, inst->current_pos, inst->binlog_position); - if (inst->trx_safe && inst->pending_transaction) + if (inst->trx_safe && + inst->pending_transaction.state > BLRM_NO_TRANSACTION) { MXS_WARNING("%s stopped by shutdown: detected mid-transaction in binlog file %s, " "pos %lu, incomplete transaction starts at pos %lu", diff --git a/server/modules/routing/binlogrouter/blr.h b/server/modules/routing/binlogrouter/blr.h index 0b840727c..11cf7084b 100644 --- a/server/modules/routing/binlogrouter/blr.h +++ b/server/modules/routing/binlogrouter/blr.h @@ -447,8 +447,8 @@ typedef struct router_slave blr_thread_role_t lsi_sender_role; /*< Master or slave code sent */ THREAD lsi_sender_tid; /*< Who sent */ char lsi_binlog_name[BINLOG_FNAMELEN + 1]; /*< Which binlog file */ - uint32_t lsi_binlog_pos; /*< What position */ - void *encryption_ctx; /*< Encryption context */ + uint32_t lsi_binlog_pos; /*< What position */ + void *encryption_ctx; /*< Encryption context */ #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; #endif @@ -523,6 +523,24 @@ typedef struct binlog_encryption_setup uint8_t key_id; } BINLOG_ENCRYPTION_SETUP; +/** Transaction States */ +typedef enum +{ + BLRM_NO_TRANSACTION, /*< No transaction */ + BLRM_TRANSACTION_START, /*< A transaction is open*/ + BLRM_COMMIT_SEEN, /*< Received COMMIT event in the current trx */ + BLRM_XID_EVENT_SEEN /*< Received XID event of current transaction */ +} master_transaction_t; + +/** Transaction Details */ +typedef struct pending_transaction +{ + char gtid[GTID_MAX_LEN + 1]; /** MariaDB 10.x GTID */ + master_transaction_t state; /** Transaction state */ + uint64_t start_pos; /** The BEGIN pos */ + uint64_t end_pos; /** The next_pos in COMMIT event*/ +} PENDING_TRANSACTION; + /** * The per instance data for the router. */ @@ -552,7 +570,7 @@ typedef struct router_instance char *binlogdir; /*< The directory with the binlog files */ SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */ int trx_safe; /*< Detect and handle partial transactions */ - int pending_transaction; /*< Pending transaction */ + PENDING_TRANSACTION pending_transaction; /*< Pending transaction */ enum blr_event_state master_event_state; /*< Packet read state */ REP_HEADER stored_header; /*< Relication header of the event the master is sending */ GWBUF *stored_event; /*< Buffer where partial events are stored */ diff --git a/server/modules/routing/binlogrouter/blr_file.c b/server/modules/routing/binlogrouter/blr_file.c index c4d80905d..d0fdee776 100644 --- a/server/modules/routing/binlogrouter/blr_file.c +++ b/server/modules/routing/binlogrouter/blr_file.c @@ -1317,7 +1317,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, unsigned long long last_known_commit = 4; REP_HEADER hdr; - int pending_transaction = 0; + int pending_transaction = BLRM_NO_TRANSACTION; int n; int db_name_len; uint8_t *ptr; @@ -1469,7 +1469,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, router->binlog_position = last_known_commit; router->current_safe_event = last_known_commit; router->current_pos = pos; - router->pending_transaction = 1; + router->pending_transaction.state = BLRM_TRANSACTION_START; MXS_ERROR("Binlog '%s' ends at position %lu and has an incomplete transaction at %lu. ", router->binlog_name, router->current_pos, router->binlog_position); @@ -2062,7 +2062,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, strcpy(router->mariadb_gtid, mariadb_gtid); - if (pending_transaction > 0) + if (pending_transaction > BLRM_NO_TRANSACTION) { MXS_ERROR("Transaction cannot be @ pos %llu: " "Another MariaDB 10 transaction (GTID %u-%u-%lu)" @@ -2119,7 +2119,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, /* A transaction starts with this event */ if (strncmp(statement_sql, "BEGIN", 5) == 0) { - if (pending_transaction > 0) + if (pending_transaction > BLRM_NO_TRANSACTION) { MXS_ERROR("Transaction cannot be @ pos %llu: " "Another transaction was opened at %llu", @@ -2132,7 +2132,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, } else { - pending_transaction = 1; + pending_transaction = BLRM_TRANSACTION_START; transaction_events = 0; event_bytes = 0; @@ -2146,9 +2146,9 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, /* Commit received for non transactional tables, i.e. MyISAM */ if (strncmp(statement_sql, "COMMIT", 6) == 0) { - if (pending_transaction > 0) + if (pending_transaction > BLRM_NO_TRANSACTION) { - pending_transaction = 3; + pending_transaction = BLRM_COMMIT_SEEN; if (!(debug & BLR_CHECK_ONLY)) { @@ -2171,10 +2171,9 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, if (hdr.event_type == XID_EVENT) { /* Commit received for a transactional tables, i.e. InnoDB */ - - if (pending_transaction > 0) + if (pending_transaction > BLRM_NO_TRANSACTION) { - pending_transaction = 2; + pending_transaction = BLRM_XID_EVENT_SEEN; if (!(debug & BLR_CHECK_ONLY)) { @@ -2184,14 +2183,14 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, } } - if (pending_transaction > 1) + if (pending_transaction > BLRM_TRANSACTION_START) { if (!(debug & BLR_CHECK_ONLY)) { MXS_DEBUG("< Transaction @ pos %llu, is now closed @ %llu. %lu events seen", last_known_commit, pos, transaction_events); } - pending_transaction = 0; + pending_transaction = BLRM_NO_TRANSACTION; last_known_commit = pos; /* Reset the event replacing indicator */ @@ -2306,7 +2305,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, router->binlog_position = last_known_commit; router->current_safe_event = last_known_commit; router->current_pos = pos; - router->pending_transaction = 1; + router->pending_transaction.state = BLRM_TRANSACTION_START; MXS_WARNING("an error has been found. " "Setting safe pos to %lu, current pos %lu", diff --git a/server/modules/routing/binlogrouter/blr_master.c b/server/modules/routing/binlogrouter/blr_master.c index 0b5599ec5..14dc2a605 100644 --- a/server/modules/routing/binlogrouter/blr_master.c +++ b/server/modules/routing/binlogrouter/blr_master.c @@ -111,15 +111,6 @@ extern bool blr_notify_waiting_slave(ROUTER_SLAVE *slave); static int keepalive = 1; -/** Transaction-Safety feature */ -typedef enum -{ - BLRM_NO_TRANSACTION, /*< No transaction */ - BLRM_TRANSACTION_START, /*< A transaction is open*/ - BLRM_COMMIT_SEEN, /*< Received COMMIT event in the current transaction */ - BLRM_XID_EVENT_SEEN /*< Received XID event of current transaction */ -} master_transaction_t; - /** Master Semi-Sync capability */ typedef enum { @@ -1355,7 +1346,9 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) */ spinlock_acquire(&router->binlog_lock); - if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == BLRM_NO_TRANSACTION)) + if (router->trx_safe == 0 || + (router->trx_safe && + router->pending_transaction.state == BLRM_NO_TRANSACTION)) { /* no pending transaction: set current_pos to binlog_position */ router->binlog_position = router->current_pos; @@ -1368,14 +1361,14 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) * Only complete transactions should be sent to sleves */ - if (router->mariadb10_compat && hdr.event_type == MARIADB10_GTID_EVENT) + if (router->mariadb10_compat && + hdr.event_type == MARIADB10_GTID_EVENT) { /** * If MariaDB 10 compatibility: * check for MARIADB10_GTID_EVENT with flags: * this is the TRASACTION START detection. * - * Save GTID anyway and if trx_safe is set mark the transaction start */ uint64_t n_sequence; @@ -1395,7 +1388,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu", mariadb_gtid, router->binlog_name, - router->binlog_position); + router->current_pos); spinlock_acquire(&router->binlog_lock); @@ -1407,7 +1400,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) */ if (router->trx_safe) { - if (router->pending_transaction > 0) + if (router->pending_transaction.state > BLRM_NO_TRANSACTION) { MXS_ERROR("A MariaDB 10 transaction " "is already open " @@ -1418,7 +1411,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) router->current_pos); } - router->pending_transaction = BLRM_TRANSACTION_START; + router->pending_transaction.state = BLRM_TRANSACTION_START; } @@ -1455,7 +1448,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) /* Check for BEGIN (it comes for START TRANSACTION too) */ if (strncmp(statement_sql, "BEGIN", 5) == 0) { - if (router->pending_transaction > BLRM_NO_TRANSACTION) + if (router->pending_transaction.state > BLRM_NO_TRANSACTION) { MXS_ERROR("A transaction is already open " "@ %lu and a new one starts @ %lu", @@ -1464,13 +1457,13 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) } - router->pending_transaction = BLRM_TRANSACTION_START; + router->pending_transaction.state = BLRM_TRANSACTION_START; } /* Check for COMMIT in non transactional store engines */ if (strncmp(statement_sql, "COMMIT", 6) == 0) { - router->pending_transaction = BLRM_COMMIT_SEEN; + router->pending_transaction.state = BLRM_COMMIT_SEEN; } spinlock_release(&router->binlog_lock); @@ -1483,9 +1476,9 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) { spinlock_acquire(&router->binlog_lock); - if (router->pending_transaction) + if (router->pending_transaction.state >= BLRM_TRANSACTION_START) { - router->pending_transaction = BLRM_XID_EVENT_SEEN; + router->pending_transaction.state = BLRM_XID_EVENT_SEEN; } spinlock_release(&router->binlog_lock); } @@ -1546,7 +1539,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) router->stats.n_heartbeats++; - if (router->pending_transaction) + if (router->pending_transaction.state > BLRM_NO_TRANSACTION) { router->stats.lastReply = time(0); } @@ -1613,7 +1606,9 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) spinlock_acquire(&router->binlog_lock); - if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == BLRM_NO_TRANSACTION)) + if (router->trx_safe == 0 || + (router->trx_safe && + router->pending_transaction.state == BLRM_NO_TRANSACTION)) { router->binlog_position = router->current_pos; router->current_safe_event = router->last_event_pos; @@ -1634,7 +1629,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) * router->current_pos */ - if (router->pending_transaction > BLRM_TRANSACTION_START) + if (router->pending_transaction.state > BLRM_TRANSACTION_START) { spinlock_release(&router->binlog_lock); @@ -1645,7 +1640,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) spinlock_acquire(&router->binlog_lock); router->binlog_position = router->current_pos; - router->pending_transaction = BLRM_NO_TRANSACTION; + router->pending_transaction.state = BLRM_NO_TRANSACTION; spinlock_release(&router->binlog_lock); } diff --git a/server/modules/routing/binlogrouter/blr_slave.c b/server/modules/routing/binlogrouter/blr_slave.c index 342c4a482..6c60dccfe 100644 --- a/server/modules/routing/binlogrouter/blr_slave.c +++ b/server/modules/routing/binlogrouter/blr_slave.c @@ -1185,7 +1185,8 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) return blr_slave_send_ok(router, slave); } - if (router->trx_safe && router->pending_transaction) + if (router->trx_safe && + router->pending_transaction.state > BLRM_NO_TRANSACTION) { if (strcmp(router->binlog_name, router->prevbinlog) != 0) { @@ -2163,7 +2164,8 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue bool force_disconnect = false; spinlock_acquire(&router->binlog_lock); - if (router->pending_transaction && strcmp(router->binlog_name, slave->binlogfile) == 0 && + if (router->pending_transaction.state > BLRM_NO_TRANSACTION && + strcmp(router->binlog_name, slave->binlogfile) == 0 && (slave->binlog_pos > router->binlog_position)) { force_disconnect = true; @@ -2413,7 +2415,8 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large) do_return = 0; /* check for a pending transaction and safe position */ - if (router->pending_transaction && strcmp(router->binlog_name, slave->binlogfile) == 0 && + if (router->pending_transaction.state > BLRM_NO_TRANSACTION && + strcmp(router->binlog_name, slave->binlogfile) == 0 && (slave->binlog_pos > router->binlog_position)) { do_return = 1; @@ -3596,7 +3599,8 @@ blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) router->service->dbref->server->port, router->binlog_name, router->current_pos, router->binlog_position); - if (router->trx_safe && router->pending_transaction) + if (router->trx_safe && + router->pending_transaction.state > BLRM_NO_TRANSACTION) { char message[BINLOG_ERROR_MSG_LEN + 1] = ""; snprintf(message, BINLOG_ERROR_MSG_LEN, @@ -3650,7 +3654,8 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) /* create a new binlog or just use current one */ if (strlen(router->prevbinlog) && strcmp(router->prevbinlog, router->binlog_name)) { - if (router->trx_safe && router->pending_transaction) + if (router->trx_safe && + router->pending_transaction.state > BLRM_NO_TRANSACTION) { char msg[BINLOG_ERROR_MSG_LEN + 1] = ""; char file[PATH_MAX + 1] = ""; @@ -3694,7 +3699,7 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) spinlock_acquire(&router->lock); - router->pending_transaction = 0; + router->pending_transaction.state = BLRM_NO_TRANSACTION; router->last_safe_pos = 0; router->master_state = BLRM_UNCONNECTED; router->current_pos = 4;