MXS-1075: pending_transaction struct added
New pending_transaction added and master_transaction_t values are now used in all files
This commit is contained in:
@ -289,7 +289,7 @@ createInstance(SERVICE *service, char **options)
|
||||
inst->m_errno = 0;
|
||||
inst->m_errmsg = NULL;
|
||||
|
||||
inst->pending_transaction = 0;
|
||||
memset(&inst->pending_transaction, '\0', sizeof(PENDING_TRANSACTION));
|
||||
inst->last_safe_pos = 0;
|
||||
inst->last_event_pos = 0;
|
||||
|
||||
@ -1081,7 +1081,6 @@ static void freeSession(MXS_ROUTER* router_instance,
|
||||
MXS_FREE(slave);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Close a session with the router, this is the mechanism
|
||||
* by which a router may cleanup data structure etc.
|
||||
@ -1341,7 +1340,7 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
|
||||
router_inst->current_pos);
|
||||
if (router_inst->trx_safe)
|
||||
{
|
||||
if (router_inst->pending_transaction)
|
||||
if (router_inst->pending_transaction.state != BLRM_NO_TRANSACTION)
|
||||
{
|
||||
dcb_printf(dcb, "\tCurrent open transaction pos: %lu\n",
|
||||
router_inst->binlog_position);
|
||||
@ -2472,7 +2471,8 @@ destroyInstance(MXS_ROUTER *instance)
|
||||
inst->service->dbref->server->port,
|
||||
inst->binlog_name, inst->current_pos, inst->binlog_position);
|
||||
|
||||
if (inst->trx_safe && inst->pending_transaction)
|
||||
if (inst->trx_safe &&
|
||||
inst->pending_transaction.state > BLRM_NO_TRANSACTION)
|
||||
{
|
||||
MXS_WARNING("%s stopped by shutdown: detected mid-transaction in binlog file %s, "
|
||||
"pos %lu, incomplete transaction starts at pos %lu",
|
||||
|
@ -523,6 +523,24 @@ typedef struct binlog_encryption_setup
|
||||
uint8_t key_id;
|
||||
} BINLOG_ENCRYPTION_SETUP;
|
||||
|
||||
/** Transaction States */
|
||||
typedef enum
|
||||
{
|
||||
BLRM_NO_TRANSACTION, /*< No transaction */
|
||||
BLRM_TRANSACTION_START, /*< A transaction is open*/
|
||||
BLRM_COMMIT_SEEN, /*< Received COMMIT event in the current trx */
|
||||
BLRM_XID_EVENT_SEEN /*< Received XID event of current transaction */
|
||||
} master_transaction_t;
|
||||
|
||||
/** Transaction Details */
|
||||
typedef struct pending_transaction
|
||||
{
|
||||
char gtid[GTID_MAX_LEN + 1]; /** MariaDB 10.x GTID */
|
||||
master_transaction_t state; /** Transaction state */
|
||||
uint64_t start_pos; /** The BEGIN pos */
|
||||
uint64_t end_pos; /** The next_pos in COMMIT event*/
|
||||
} PENDING_TRANSACTION;
|
||||
|
||||
/**
|
||||
* The per instance data for the router.
|
||||
*/
|
||||
@ -552,7 +570,7 @@ typedef struct router_instance
|
||||
char *binlogdir; /*< The directory with the binlog files */
|
||||
SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */
|
||||
int trx_safe; /*< Detect and handle partial transactions */
|
||||
int pending_transaction; /*< Pending transaction */
|
||||
PENDING_TRANSACTION pending_transaction; /*< Pending transaction */
|
||||
enum blr_event_state master_event_state; /*< Packet read state */
|
||||
REP_HEADER stored_header; /*< Relication header of the event the master is sending */
|
||||
GWBUF *stored_event; /*< Buffer where partial events are stored */
|
||||
|
@ -1317,7 +1317,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
|
||||
unsigned long long last_known_commit = 4;
|
||||
|
||||
REP_HEADER hdr;
|
||||
int pending_transaction = 0;
|
||||
int pending_transaction = BLRM_NO_TRANSACTION;
|
||||
int n;
|
||||
int db_name_len;
|
||||
uint8_t *ptr;
|
||||
@ -1469,7 +1469,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
|
||||
router->binlog_position = last_known_commit;
|
||||
router->current_safe_event = last_known_commit;
|
||||
router->current_pos = pos;
|
||||
router->pending_transaction = 1;
|
||||
router->pending_transaction.state = BLRM_TRANSACTION_START;
|
||||
|
||||
MXS_ERROR("Binlog '%s' ends at position %lu and has an incomplete transaction at %lu. ",
|
||||
router->binlog_name, router->current_pos, router->binlog_position);
|
||||
@ -2062,7 +2062,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
|
||||
|
||||
strcpy(router->mariadb_gtid, mariadb_gtid);
|
||||
|
||||
if (pending_transaction > 0)
|
||||
if (pending_transaction > BLRM_NO_TRANSACTION)
|
||||
{
|
||||
MXS_ERROR("Transaction cannot be @ pos %llu: "
|
||||
"Another MariaDB 10 transaction (GTID %u-%u-%lu)"
|
||||
@ -2119,7 +2119,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
|
||||
/* A transaction starts with this event */
|
||||
if (strncmp(statement_sql, "BEGIN", 5) == 0)
|
||||
{
|
||||
if (pending_transaction > 0)
|
||||
if (pending_transaction > BLRM_NO_TRANSACTION)
|
||||
{
|
||||
MXS_ERROR("Transaction cannot be @ pos %llu: "
|
||||
"Another transaction was opened at %llu",
|
||||
@ -2132,7 +2132,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
|
||||
}
|
||||
else
|
||||
{
|
||||
pending_transaction = 1;
|
||||
pending_transaction = BLRM_TRANSACTION_START;
|
||||
|
||||
transaction_events = 0;
|
||||
event_bytes = 0;
|
||||
@ -2146,9 +2146,9 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
|
||||
/* Commit received for non transactional tables, i.e. MyISAM */
|
||||
if (strncmp(statement_sql, "COMMIT", 6) == 0)
|
||||
{
|
||||
if (pending_transaction > 0)
|
||||
if (pending_transaction > BLRM_NO_TRANSACTION)
|
||||
{
|
||||
pending_transaction = 3;
|
||||
pending_transaction = BLRM_COMMIT_SEEN;
|
||||
|
||||
if (!(debug & BLR_CHECK_ONLY))
|
||||
{
|
||||
@ -2171,10 +2171,9 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
|
||||
if (hdr.event_type == XID_EVENT)
|
||||
{
|
||||
/* Commit received for a transactional tables, i.e. InnoDB */
|
||||
|
||||
if (pending_transaction > 0)
|
||||
if (pending_transaction > BLRM_NO_TRANSACTION)
|
||||
{
|
||||
pending_transaction = 2;
|
||||
pending_transaction = BLRM_XID_EVENT_SEEN;
|
||||
|
||||
if (!(debug & BLR_CHECK_ONLY))
|
||||
{
|
||||
@ -2184,14 +2183,14 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
|
||||
}
|
||||
}
|
||||
|
||||
if (pending_transaction > 1)
|
||||
if (pending_transaction > BLRM_TRANSACTION_START)
|
||||
{
|
||||
if (!(debug & BLR_CHECK_ONLY))
|
||||
{
|
||||
MXS_DEBUG("< Transaction @ pos %llu, is now closed @ %llu. %lu events seen",
|
||||
last_known_commit, pos, transaction_events);
|
||||
}
|
||||
pending_transaction = 0;
|
||||
pending_transaction = BLRM_NO_TRANSACTION;
|
||||
last_known_commit = pos;
|
||||
|
||||
/* Reset the event replacing indicator */
|
||||
@ -2306,7 +2305,7 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
|
||||
router->binlog_position = last_known_commit;
|
||||
router->current_safe_event = last_known_commit;
|
||||
router->current_pos = pos;
|
||||
router->pending_transaction = 1;
|
||||
router->pending_transaction.state = BLRM_TRANSACTION_START;
|
||||
|
||||
MXS_WARNING("an error has been found. "
|
||||
"Setting safe pos to %lu, current pos %lu",
|
||||
|
@ -111,15 +111,6 @@ extern bool blr_notify_waiting_slave(ROUTER_SLAVE *slave);
|
||||
|
||||
static int keepalive = 1;
|
||||
|
||||
/** Transaction-Safety feature */
|
||||
typedef enum
|
||||
{
|
||||
BLRM_NO_TRANSACTION, /*< No transaction */
|
||||
BLRM_TRANSACTION_START, /*< A transaction is open*/
|
||||
BLRM_COMMIT_SEEN, /*< Received COMMIT event in the current transaction */
|
||||
BLRM_XID_EVENT_SEEN /*< Received XID event of current transaction */
|
||||
} master_transaction_t;
|
||||
|
||||
/** Master Semi-Sync capability */
|
||||
typedef enum
|
||||
{
|
||||
@ -1355,7 +1346,9 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
*/
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == BLRM_NO_TRANSACTION))
|
||||
if (router->trx_safe == 0 ||
|
||||
(router->trx_safe &&
|
||||
router->pending_transaction.state == BLRM_NO_TRANSACTION))
|
||||
{
|
||||
/* no pending transaction: set current_pos to binlog_position */
|
||||
router->binlog_position = router->current_pos;
|
||||
@ -1368,14 +1361,14 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
* Only complete transactions should be sent to sleves
|
||||
*/
|
||||
|
||||
if (router->mariadb10_compat && hdr.event_type == MARIADB10_GTID_EVENT)
|
||||
if (router->mariadb10_compat &&
|
||||
hdr.event_type == MARIADB10_GTID_EVENT)
|
||||
{
|
||||
/**
|
||||
* If MariaDB 10 compatibility:
|
||||
* check for MARIADB10_GTID_EVENT with flags:
|
||||
* this is the TRASACTION START detection.
|
||||
*
|
||||
* Save GTID anyway and if trx_safe is set mark the transaction start
|
||||
*/
|
||||
|
||||
uint64_t n_sequence;
|
||||
@ -1395,7 +1388,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu",
|
||||
mariadb_gtid,
|
||||
router->binlog_name,
|
||||
router->binlog_position);
|
||||
router->current_pos);
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
@ -1407,7 +1400,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
*/
|
||||
if (router->trx_safe)
|
||||
{
|
||||
if (router->pending_transaction > 0)
|
||||
if (router->pending_transaction.state > BLRM_NO_TRANSACTION)
|
||||
{
|
||||
MXS_ERROR("A MariaDB 10 transaction "
|
||||
"is already open "
|
||||
@ -1418,7 +1411,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
router->current_pos);
|
||||
}
|
||||
|
||||
router->pending_transaction = BLRM_TRANSACTION_START;
|
||||
router->pending_transaction.state = BLRM_TRANSACTION_START;
|
||||
|
||||
}
|
||||
|
||||
@ -1455,7 +1448,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
/* Check for BEGIN (it comes for START TRANSACTION too) */
|
||||
if (strncmp(statement_sql, "BEGIN", 5) == 0)
|
||||
{
|
||||
if (router->pending_transaction > BLRM_NO_TRANSACTION)
|
||||
if (router->pending_transaction.state > BLRM_NO_TRANSACTION)
|
||||
{
|
||||
MXS_ERROR("A transaction is already open "
|
||||
"@ %lu and a new one starts @ %lu",
|
||||
@ -1464,13 +1457,13 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
|
||||
}
|
||||
|
||||
router->pending_transaction = BLRM_TRANSACTION_START;
|
||||
router->pending_transaction.state = BLRM_TRANSACTION_START;
|
||||
}
|
||||
|
||||
/* Check for COMMIT in non transactional store engines */
|
||||
if (strncmp(statement_sql, "COMMIT", 6) == 0)
|
||||
{
|
||||
router->pending_transaction = BLRM_COMMIT_SEEN;
|
||||
router->pending_transaction.state = BLRM_COMMIT_SEEN;
|
||||
}
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
@ -1483,9 +1476,9 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
{
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
if (router->pending_transaction)
|
||||
if (router->pending_transaction.state >= BLRM_TRANSACTION_START)
|
||||
{
|
||||
router->pending_transaction = BLRM_XID_EVENT_SEEN;
|
||||
router->pending_transaction.state = BLRM_XID_EVENT_SEEN;
|
||||
}
|
||||
spinlock_release(&router->binlog_lock);
|
||||
}
|
||||
@ -1546,7 +1539,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
|
||||
router->stats.n_heartbeats++;
|
||||
|
||||
if (router->pending_transaction)
|
||||
if (router->pending_transaction.state > BLRM_NO_TRANSACTION)
|
||||
{
|
||||
router->stats.lastReply = time(0);
|
||||
}
|
||||
@ -1613,7 +1606,9 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == BLRM_NO_TRANSACTION))
|
||||
if (router->trx_safe == 0 ||
|
||||
(router->trx_safe &&
|
||||
router->pending_transaction.state == BLRM_NO_TRANSACTION))
|
||||
{
|
||||
router->binlog_position = router->current_pos;
|
||||
router->current_safe_event = router->last_event_pos;
|
||||
@ -1634,7 +1629,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
* router->current_pos
|
||||
*/
|
||||
|
||||
if (router->pending_transaction > BLRM_TRANSACTION_START)
|
||||
if (router->pending_transaction.state > BLRM_TRANSACTION_START)
|
||||
{
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
@ -1645,7 +1640,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
router->binlog_position = router->current_pos;
|
||||
router->pending_transaction = BLRM_NO_TRANSACTION;
|
||||
router->pending_transaction.state = BLRM_NO_TRANSACTION;
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
}
|
||||
|
@ -1185,7 +1185,8 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
|
||||
return blr_slave_send_ok(router, slave);
|
||||
}
|
||||
|
||||
if (router->trx_safe && router->pending_transaction)
|
||||
if (router->trx_safe &&
|
||||
router->pending_transaction.state > BLRM_NO_TRANSACTION)
|
||||
{
|
||||
if (strcmp(router->binlog_name, router->prevbinlog) != 0)
|
||||
{
|
||||
@ -2163,7 +2164,8 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
|
||||
bool force_disconnect = false;
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
if (router->pending_transaction && strcmp(router->binlog_name, slave->binlogfile) == 0 &&
|
||||
if (router->pending_transaction.state > BLRM_NO_TRANSACTION &&
|
||||
strcmp(router->binlog_name, slave->binlogfile) == 0 &&
|
||||
(slave->binlog_pos > router->binlog_position))
|
||||
{
|
||||
force_disconnect = true;
|
||||
@ -2413,7 +2415,8 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
|
||||
do_return = 0;
|
||||
|
||||
/* check for a pending transaction and safe position */
|
||||
if (router->pending_transaction && strcmp(router->binlog_name, slave->binlogfile) == 0 &&
|
||||
if (router->pending_transaction.state > BLRM_NO_TRANSACTION &&
|
||||
strcmp(router->binlog_name, slave->binlogfile) == 0 &&
|
||||
(slave->binlog_pos > router->binlog_position))
|
||||
{
|
||||
do_return = 1;
|
||||
@ -3596,7 +3599,8 @@ blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
|
||||
router->service->dbref->server->port,
|
||||
router->binlog_name, router->current_pos, router->binlog_position);
|
||||
|
||||
if (router->trx_safe && router->pending_transaction)
|
||||
if (router->trx_safe &&
|
||||
router->pending_transaction.state > BLRM_NO_TRANSACTION)
|
||||
{
|
||||
char message[BINLOG_ERROR_MSG_LEN + 1] = "";
|
||||
snprintf(message, BINLOG_ERROR_MSG_LEN,
|
||||
@ -3650,7 +3654,8 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
|
||||
/* create a new binlog or just use current one */
|
||||
if (strlen(router->prevbinlog) && strcmp(router->prevbinlog, router->binlog_name))
|
||||
{
|
||||
if (router->trx_safe && router->pending_transaction)
|
||||
if (router->trx_safe &&
|
||||
router->pending_transaction.state > BLRM_NO_TRANSACTION)
|
||||
{
|
||||
char msg[BINLOG_ERROR_MSG_LEN + 1] = "";
|
||||
char file[PATH_MAX + 1] = "";
|
||||
@ -3694,7 +3699,7 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
router->pending_transaction = 0;
|
||||
router->pending_transaction.state = BLRM_NO_TRANSACTION;
|
||||
router->last_safe_pos = 0;
|
||||
router->master_state = BLRM_UNCONNECTED;
|
||||
router->current_pos = 4;
|
||||
|
Reference in New Issue
Block a user