MXS-1266: Standalone events in transaction (no COMMIT event) are now handled

Standalone events in transaction (no COMMIT event) are now handled:

the GTID is saved to gtid_maps storage.

Transaction detection assumes there is only one query_event after GTID
event with MARIADB_FL_STANDALONE flag set.
This commit is contained in:
MassimilianoPinto 2017-05-19 11:43:43 +02:00
parent 12c862a2c2
commit 362824579d
4 changed files with 155 additions and 99 deletions

View File

@ -543,7 +543,8 @@ typedef enum
BLRM_NO_TRANSACTION, /*< No transaction */
BLRM_TRANSACTION_START, /*< A transaction is open*/
BLRM_COMMIT_SEEN, /*< Received COMMIT event in the current trx */
BLRM_XID_EVENT_SEEN /*< Received XID event of current transaction */
BLRM_XID_EVENT_SEEN, /*< Received XID event of current transaction */
BLRM_STANDALONE_SEEN /*< Received a standalone event, ie: a DDL */
} master_transaction_t;
/** MariaDB GTID elements */
@ -562,6 +563,8 @@ typedef struct pending_transaction
uint64_t start_pos; /** The BEGIN pos */
uint64_t end_pos; /** The next_pos in COMMIT event*/
MARIADB_GTID_ELEMS gtid_elms; /* MariaDB 10.x GTID components */
bool standalone; /** Standalone event, such as DDL
* no terminating COMMIT */
} PENDING_TRANSACTION;
/** MariaDB GTID info */

View File

@ -2051,54 +2051,58 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
domainid = extract_field(ptr + 8, 32);
flags = *(ptr + 8 + 4);
if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0)
/**
* Detect whether it's a standalone transaction:
* there is no terminating COMMIT event.
* i.e: a DDL or FLUSH TABLES etc
*/
router->pending_transaction.standalone = flags & MARIADB_FL_STANDALONE;
if (pending_transaction > BLRM_NO_TRANSACTION)
{
if (pending_transaction > BLRM_NO_TRANSACTION)
MXS_ERROR("Transaction cannot be @ pos %llu: "
"Another MariaDB 10 transaction (GTID %u-%u-%lu)"
" was opened at %llu",
pos, domainid, hdr.serverid,
n_sequence, last_known_commit);
gwbuf_free(result);
break;
}
else
{
char mariadb_gtid[GTID_MAX_LEN + 1];
snprintf(mariadb_gtid,
GTID_MAX_LEN,
"%u-%u-%lu",
domainid,
hdr.serverid,
n_sequence);
pending_transaction = BLRM_TRANSACTION_START;
router->pending_transaction.start_pos = pos;
router->pending_transaction.end_pos = 0;
/* Set MariaDB GTID */
if (router->mariadb10_gtid)
{
MXS_ERROR("Transaction cannot be @ pos %llu: "
"Another MariaDB 10 transaction (GTID %u-%u-%lu)"
" was opened at %llu",
pos, domainid, hdr.serverid,
n_sequence, last_known_commit);
strcpy(router->pending_transaction.gtid, mariadb_gtid);
gwbuf_free(result);
break;
/* Save the pending GTID components */
router->pending_transaction.gtid_elms.domain_id = domainid;
router->pending_transaction.gtid_elms.server_id = hdr.serverid;
router->pending_transaction.gtid_elms.seq_no = n_sequence;
}
else
transaction_events = 0;
event_bytes = 0;
if (!(debug & BLR_CHECK_ONLY))
{
char mariadb_gtid[GTID_MAX_LEN + 1];
snprintf(mariadb_gtid,
GTID_MAX_LEN,
"%u-%u-%lu",
domainid,
hdr.serverid,
n_sequence);
pending_transaction = BLRM_TRANSACTION_START;
router->pending_transaction.start_pos = pos;
router->pending_transaction.end_pos = 0;
/* Set MariaDB GTID */
if (router->mariadb10_gtid)
{
strcpy(router->pending_transaction.gtid, mariadb_gtid);
/* Save the pending GTID components */
router->pending_transaction.gtid_elms.domain_id = domainid;
router->pending_transaction.gtid_elms.server_id = hdr.serverid;
router->pending_transaction.gtid_elms.seq_no = n_sequence;
}
transaction_events = 0;
event_bytes = 0;
if (!(debug & BLR_CHECK_ONLY))
{
MXS_DEBUG("> MariaDB 10 Transaction (GTID %u-%u-%lu)"
" starts @ pos %llu",
domainid, hdr.serverid, n_sequence, pos);
}
MXS_DEBUG("> MariaDB 10 Transaction (GTID %u-%u-%lu)"
" starts @ pos %llu",
domainid, hdr.serverid, n_sequence, pos);
}
}
}
@ -2179,6 +2183,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
*
* Check for BEGIN ( ONLY for mysql 5.6, mariadb 5.5 )
* Check for COMMIT (not transactional engines)
* Check for pending standalone transaction
*/
if (hdr.event_type == QUERY_EVENT)
@ -2238,11 +2243,33 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
if (!(debug & BLR_CHECK_ONLY))
{
MXS_DEBUG(" Transaction @ pos %llu, closing @ %llu",
last_known_commit, pos);
MXS_DEBUG(" Transaction @ pos %llu,"
" closing @ %llu",
last_known_commit,
pos);
}
}
}
/**
* If it's a standalone transaction event we're done:
* This query event, only one, terminates the
* transaction.
*/
if (pending_transaction > BLRM_NO_TRANSACTION &&
router->pending_transaction.standalone)
{
pending_transaction = BLRM_STANDALONE_SEEN;
if (!(debug & BLR_CHECK_ONLY))
{
MXS_DEBUG(" Standalone Transaction @ pos %llu,"
" closing @ %llu",
last_known_commit,
pos);
}
}
MXS_FREE(statement_sql);
}
else
@ -2263,8 +2290,10 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
if (!(debug & BLR_CHECK_ONLY))
{
MXS_DEBUG(" Transaction XID @ pos %llu, closing @ %llu",
last_known_commit, pos);
MXS_DEBUG(" Transaction XID @ pos %llu,"
" closing @ %llu",
last_known_commit,
pos);
}
}
}
@ -2273,11 +2302,15 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
{
if (!(debug & BLR_CHECK_ONLY))
{
MXS_DEBUG("< Transaction @ pos %llu, is now closed @ %llu. %lu events seen",
last_known_commit, pos, transaction_events);
MXS_DEBUG("< Transaction @ pos %llu, is now closed @ %llu."
" %lu events seen",
last_known_commit,
pos,
transaction_events);
}
pending_transaction = BLRM_NO_TRANSACTION;
router->pending_transaction.standalone = false;
router->pending_transaction.end_pos = hdr.next_pos;
@ -3485,7 +3518,6 @@ static int gtid_select_cb(void *data, int cols, char** values, char** names)
* @param result The (allocated) ouput data to fill
* @return True if with found GTID or false
*/
bool blr_fetch_mariadb_gtid(ROUTER_SLAVE *slave,
const char *gtid,
MARIADB_GTID_INFO *result)

View File

@ -1041,6 +1041,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
if (router->trx_safe)
{
// MariaDB 10 GTID event check
if (router->mariadb10_compat &&
hdr.event_type == MARIADB10_GTID_EVENT)
{
@ -1057,57 +1058,62 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
domainid = extract_field(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8, 32);
flags = *(ptr + MYSQL_HEADER_LEN + 1 + BINLOG_EVENT_HDR_LEN + 8 + 4);
if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0)
spinlock_acquire(&router->binlog_lock);
/**
* Detect whether it's a standalone transaction:
* there is no terminating COMMIT event.
* i.e: a DDL or FLUSH TABLES etc
*/
router->pending_transaction.standalone = flags & MARIADB_FL_STANDALONE;
/**
* Now mark the new open transaction
*/
if (router->pending_transaction.state > BLRM_NO_TRANSACTION)
{
spinlock_acquire(&router->binlog_lock);
/**
* Now mark the new open transaction
*/
if (router->pending_transaction.state > BLRM_NO_TRANSACTION)
{
MXS_ERROR("A MariaDB 10 transaction "
"is already open "
"@ %lu (GTID %u-%u-%lu) and "
"a new one starts @ %lu",
router->binlog_position,
domainid,
hdr.serverid,
n_sequence,
router->current_pos);
}
router->pending_transaction.state = BLRM_TRANSACTION_START;
/* Handle MariaDB 10 GTID */
if (router->mariadb10_gtid)
{
char mariadb_gtid[GTID_MAX_LEN + 1];
snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu",
domainid,
hdr.serverid,
n_sequence);
MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu",
mariadb_gtid,
router->binlog_name,
router->current_pos);
/* Save the pending GTID string value */
strcpy(router->pending_transaction.gtid, mariadb_gtid);
/* Save the pending GTID components */
router->pending_transaction.gtid_elms.domain_id = domainid;
router->pending_transaction.gtid_elms.server_id = hdr.serverid;
router->pending_transaction.gtid_elms.seq_no = n_sequence;
}
router->pending_transaction.start_pos = router->current_pos;
router->pending_transaction.end_pos = 0;
spinlock_release(&router->binlog_lock);
MXS_ERROR("A MariaDB 10 transaction "
"is already open "
"@ %lu (GTID %u-%u-%lu) and "
"a new one starts @ %lu",
router->binlog_position,
domainid,
hdr.serverid,
n_sequence,
router->current_pos);
}
router->pending_transaction.state = BLRM_TRANSACTION_START;
/* Handle MariaDB 10 GTID */
if (router->mariadb10_gtid)
{
char mariadb_gtid[GTID_MAX_LEN + 1];
snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu",
domainid,
hdr.serverid,
n_sequence);
MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu",
mariadb_gtid,
router->binlog_name,
router->current_pos);
/* Save the pending GTID string value */
strcpy(router->pending_transaction.gtid, mariadb_gtid);
/* Save the pending GTID components */
router->pending_transaction.gtid_elms.domain_id = domainid;
router->pending_transaction.gtid_elms.server_id = hdr.serverid;
router->pending_transaction.gtid_elms.seq_no = n_sequence;
}
router->pending_transaction.start_pos = router->current_pos;
router->pending_transaction.end_pos = 0;
spinlock_release(&router->binlog_lock);
}
// Query Event check
if (hdr.event_type == QUERY_EVENT)
{
char *statement_sql;
@ -1148,6 +1154,17 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
{
router->pending_transaction.state = BLRM_COMMIT_SEEN;
}
/**
* If it's a standalone transaction event we're done:
* this query event, only one, terminates the transaction.
*/
if (router->pending_transaction.state > BLRM_NO_TRANSACTION &&
router->pending_transaction.standalone)
{
router->pending_transaction.state = BLRM_STANDALONE_SEEN;
}
spinlock_release(&router->binlog_lock);
MXS_FREE(statement_sql);
@ -1326,7 +1343,10 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
spinlock_acquire(&router->binlog_lock);
router->binlog_position = router->current_pos;
/* Set no pending transaction and no standalone */
router->pending_transaction.state = BLRM_NO_TRANSACTION;
router->pending_transaction.standalone = false;
spinlock_release(&router->binlog_lock);
}

View File

@ -6826,7 +6826,8 @@ static bool blr_handle_set_stmt(ROUTER_INSTANCE *router,
MXS_ERROR("%s: Incomplete set command.", router->service->name);
return false;
}
else if ((strcasecmp(word, "autocommit") == 0) || (strcasecmp(word, "@@session.autocommit") == 0))
else if ((strcasecmp(word, "autocommit") == 0) ||
(strcasecmp(word, "@@session.autocommit") == 0))
{
blr_slave_send_ok(router, slave);
return true;