diff --git a/server/modules/routing/binlogrouter/blr.c b/server/modules/routing/binlogrouter/blr.c index 4693e44aa..ea16eb88b 100644 --- a/server/modules/routing/binlogrouter/blr.c +++ b/server/modules/routing/binlogrouter/blr.c @@ -117,6 +117,8 @@ static void destroyInstance(MXS_ROUTER *instance); bool blr_extract_key(const char *linebuf, int nline, ROUTER_INSTANCE *router); bool blr_get_encryption_key(ROUTER_INSTANCE *router); int blr_parse_key_file(ROUTER_INSTANCE *router); +static MARIADB_GTID_INFO *mariadb_gtid_info_dup(const MARIADB_GTID_INFO *in); +static void mariadb_gtid_info_free(MARIADB_GTID_INFO *in); static void stats_func(void *); @@ -192,6 +194,7 @@ MXS_MODULE* MXS_CREATE_MODULE() {"encrypt_binlog", MXS_MODULE_PARAM_BOOL, "false"}, {"encryption_algorithm", MXS_MODULE_PARAM_ENUM, "aes_cbc", MXS_MODULE_OPT_NONE, enc_algo_values}, {"encryption_key_file", MXS_MODULE_PARAM_PATH, NULL, MXS_MODULE_OPT_PATH_R_OK}, + {"mariadb_gtid", MXS_MODULE_PARAM_BOOL, "false"}, {"shortburst", MXS_MODULE_PARAM_COUNT, DEF_SHORT_BURST}, {"longburst", MXS_MODULE_PARAM_COUNT, DEF_LONG_BURST}, {"burstsize", MXS_MODULE_PARAM_SIZE, DEF_BURST_SIZE}, @@ -312,7 +315,7 @@ createInstance(SERVICE *service, char **options) inst->current_pos = 0; inst->current_safe_event = 0; inst->master_event_state = BLR_EVENT_DONE; - inst->mariadb_gtid[0] = '\0'; + inst->last_mariadb_gtid[0] = '\0'; strcpy(inst->binlog_name, ""); strcpy(inst->prevbinlog, ""); @@ -350,6 +353,9 @@ createInstance(SERVICE *service, char **options) inst->request_semi_sync = config_get_bool(params, "semisync"); inst->master_semi_sync = 0; + /* Enable MariaDB GTID tracking */ + inst->mariadb_gtid = config_get_bool(params, "mariadb_gtid"); + /* Binlog encryption */ inst->encryption.enabled = config_get_bool(params, "encrypt_binlog"); inst->encryption.encryption_algorithm = config_get_enum(params, "encryption_algorithm", enc_algo_values); @@ -358,6 +364,10 @@ createInstance(SERVICE *service, char **options) /* Encryption CTX */ inst->encryption_ctx = NULL; + /* MariaDB GTID repo init val */ + inst->gtid_repo = NULL; + + /* Set router uuid */ inst->uuid = config_copy_string(params, "uuid"); if (inst->uuid == NULL) @@ -507,6 +517,10 @@ createInstance(SERVICE *service, char **options) { inst->encryption.enabled = config_truth_value(value); } + else if (strcmp(options[i], "mariadb_gtid") == 0) + { + inst->mariadb_gtid = config_truth_value(value); + } else if (strcmp(options[i], "encryption_algorithm") == 0) { int ret = blr_check_encryption_algorithm(value); @@ -626,6 +640,38 @@ createInstance(SERVICE *service, char **options) return NULL; } + /* Enable MariaDB GTID repo */ + if (inst->mariadb10_compat && + inst->mariadb_gtid) + { + if (!inst->trx_safe) + { + MXS_ERROR("MariaDB GTID can be enabled only" + " with Transaction Safety feature." + " Please enable it with option 'transaction_safety = on'"); + free_instance(inst); + return NULL; + } + + if ((inst->gtid_repo = hashtable_alloc(1000, + hashtable_item_strhash, + hashtable_item_strcmp)) == NULL) + { + MXS_ERROR("Service %s, cannot allocate MariaDB GTID hashtable", service->name); + free_instance(inst); + return NULL; + } + + hashtable_memory_fns(inst->gtid_repo, + hashtable_item_strdup, + (HASHCOPYFN)mariadb_gtid_info_dup, + hashtable_item_free, + (HASHFREEFN)mariadb_gtid_info_free); + + MXS_NOTICE("%s: Service has MariaDB GTID otion set to ON", + service->name); + } + if (inst->serverid <= 0) { MXS_ERROR("Service %s, server-id is not configured. " @@ -1417,10 +1463,12 @@ diagnostics(MXS_ROUTER *router, DCB *dcb) dcb_printf(dcb, "\tLast event from master: 0x%x, %s\n", router_inst->lastEventReceived, (ptr != NULL) ? ptr : "unknown"); - if (router_inst->mariadb_gtid[0]) + + if (router_inst->mariadb_gtid && + router_inst->last_mariadb_gtid[0]) { dcb_printf(dcb, "\tLast seen MariaDB GTID: %s\n", - router_inst->mariadb_gtid); + router_inst->last_mariadb_gtid); } } @@ -2479,6 +2527,9 @@ destroyInstance(MXS_ROUTER *instance) inst->service->name, inst->binlog_name, inst->current_pos, inst->binlog_position); } + /* Free GTID hashtable */ + hashtable_free(inst->gtid_repo); + spinlock_release(&inst->lock); } @@ -2674,3 +2725,40 @@ int blr_parse_key_file(ROUTER_INSTANCE *router) return 0; } } + +/** + * Free routine for GTID repo hashtable + * + * @param in The data to free + */ +static void mariadb_gtid_info_free(MARIADB_GTID_INFO *in) +{ + if (in) + { + MXS_FREE(in->gtid); + MXS_FREE(in); + } +} + +/** + * Copy routine for GTID repo hashtable + * + * @param in The data to copy + * @return New allocated value or NULL + */ +static MARIADB_GTID_INFO *mariadb_gtid_info_dup(const MARIADB_GTID_INFO *in) +{ + MARIADB_GTID_INFO *rval = (MARIADB_GTID_INFO *) MXS_CALLOC(1, sizeof(MARIADB_GTID_INFO)); + char *gtid = MXS_STRDUP(in->gtid); + if (!gtid || !rval) + { + MXS_FREE(rval); + MXS_FREE(gtid); + return NULL; + } + rval->gtid = gtid; + rval->start = in-> start; + rval->end = in->end; + + return (void *) rval; +} diff --git a/server/modules/routing/binlogrouter/blr.h b/server/modules/routing/binlogrouter/blr.h index 11cf7084b..f94889618 100644 --- a/server/modules/routing/binlogrouter/blr.h +++ b/server/modules/routing/binlogrouter/blr.h @@ -541,6 +541,14 @@ typedef struct pending_transaction uint64_t end_pos; /** The next_pos in COMMIT event*/ } PENDING_TRANSACTION; +/** MariaDB GTID info */ +typedef struct mariadb_gtid_info +{ + char *gtid; /** MariaDB 10.x GTID */ + uint64_t start; /** The BEGIN pos */ + uint64_t end; /** The next_pos in COMMIT event*/ +} MARIADB_GTID_INFO; + /** * The per instance data for the router. */ @@ -621,7 +629,11 @@ typedef struct router_instance int master_semi_sync; /*< Semi-Sync replication status of master server */ BINLOG_ENCRYPTION_SETUP encryption; /*< Binlog encryption setup */ void *encryption_ctx; /*< Encryption context */ - char mariadb_gtid[GTID_MAX_LEN + 1]; /*< MariaDB 10 GTID string value */ + char last_mariadb_gtid[GTID_MAX_LEN + 1]; /*< Last seen MariaDB 10 GTID */ + bool mariadb_gtid; /*< Save received MariaDB GTIDs into repo. + * This allows MariaDB 10 slave servers + * connecting with GTID */ + HASHTABLE *gtid_repo; /*< Storage for MariaDB GTIDs */ struct router_instance *next; } ROUTER_INSTANCE; diff --git a/server/modules/routing/binlogrouter/blr_file.c b/server/modules/routing/binlogrouter/blr_file.c index d0fdee776..248c96e9a 100644 --- a/server/modules/routing/binlogrouter/blr_file.c +++ b/server/modules/routing/binlogrouter/blr_file.c @@ -172,6 +172,7 @@ static void blr_format_event_size(double *event_size, char *label); extern int MaxScaleUptime(); extern void encode_value(unsigned char *data, unsigned int value, int len); extern void blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr); +int blr_save_mariadb_gtid(ROUTER_INSTANCE *inst); typedef struct binlog_event_desc { @@ -2055,13 +2056,6 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0) { - char mariadb_gtid[GTID_MAX_LEN + 1]; - snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu", - domainid, hdr.serverid, - n_sequence); - - strcpy(router->mariadb_gtid, mariadb_gtid); - if (pending_transaction > BLRM_NO_TRANSACTION) { MXS_ERROR("Transaction cannot be @ pos %llu: " @@ -2076,7 +2070,21 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, } else { - pending_transaction = 1; + char mariadb_gtid[GTID_MAX_LEN + 1]; + snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu", + domainid, hdr.serverid, + n_sequence); + + pending_transaction = BLRM_TRANSACTION_START; + + router->pending_transaction.start_pos = pos; + router->pending_transaction.end_pos = 0; + + /* Set MariaDB GTID */ + if (router->mariadb_gtid) + { + strcpy(router->pending_transaction.gtid, mariadb_gtid); + } transaction_events = 0; event_bytes = 0; @@ -2134,6 +2142,9 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, { pending_transaction = BLRM_TRANSACTION_START; + router->pending_transaction.start_pos = pos; + router->pending_transaction.end_pos = 0; + transaction_events = 0; event_bytes = 0; if (!(debug & BLR_CHECK_ONLY)) @@ -2190,12 +2201,26 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, MXS_DEBUG("< Transaction @ pos %llu, is now closed @ %llu. %lu events seen", last_known_commit, pos, transaction_events); } + pending_transaction = BLRM_NO_TRANSACTION; + + router->pending_transaction.end_pos = hdr.next_pos; + last_known_commit = pos; /* Reset the event replacing indicator */ replace_trx_events = false; + if (router->mariadb10_compat && + router->mariadb_gtid) + { + /* Update Last Seen MariaDB GTID */ + strcpy(router->last_mariadb_gtid, router->pending_transaction.gtid); + + /* Save MariaDB 10 GTID */ + blr_save_mariadb_gtid(router); + } + total_events += transaction_events; if (transaction_events > max_events) @@ -3273,3 +3298,37 @@ static void blr_report_checksum(REP_HEADER hdr, const uint8_t *buffer, char *out *p = tolower(*p); } } + +/** + * Save MariaDB GTID found in complete transaction + * + * @param inst The router instance + * @return 1 on success, 0 otherwise + */ +int blr_save_mariadb_gtid(ROUTER_INSTANCE *inst) +{ + MARIADB_GTID_INFO gtid_info; + + gtid_info.gtid = inst->pending_transaction.gtid; + gtid_info.start = inst->pending_transaction.start_pos; + gtid_info.end = inst->pending_transaction.end_pos; + + /* Save GTID into repo */ + if (!hashtable_add(inst->gtid_repo, + inst->pending_transaction.gtid, + >id_info)) + { + MXS_ERROR("Service %s: error saving mariadb GTID %s into repo", + inst->service->name, + inst->pending_transaction.gtid); + return 0; + } + + MXS_DEBUG("Saved MariaDB GTID '%s', %s:%lu:%lu", + gtid_info.gtid, + inst->binlog_name, + gtid_info.start, + gtid_info.end); + + return 1; +} diff --git a/server/modules/routing/binlogrouter/blr_master.c b/server/modules/routing/binlogrouter/blr_master.c index 9e18c2de8..a4f3401b9 100644 --- a/server/modules/routing/binlogrouter/blr_master.c +++ b/server/modules/routing/binlogrouter/blr_master.c @@ -108,6 +108,7 @@ static int blr_get_master_semisync(GWBUF *buf); static void blr_terminate_master_replication(ROUTER_INSTANCE *router, uint8_t* ptr, int len); void blr_notify_all_slaves(ROUTER_INSTANCE *router); extern bool blr_notify_waiting_slave(ROUTER_SLAVE *slave); +extern int blr_save_mariadb_gtid(ROUTER_INSTANCE *inst); static int keepalive = 1; @@ -1386,16 +1387,6 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0) { - char mariadb_gtid[GTID_MAX_LEN + 1]; - snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu", - domainid, hdr.serverid, - n_sequence); - - MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu", - mariadb_gtid, - router->binlog_name, - router->current_pos); - spinlock_acquire(&router->binlog_lock); /** @@ -1405,17 +1396,35 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) { MXS_ERROR("A MariaDB 10 transaction " "is already open " - "@ %lu (GTID %s) and " + "@ %lu (GTID %u-%u-%lu) and " "a new one starts @ %lu", router->binlog_position, - mariadb_gtid, + domainid, + hdr.serverid, + n_sequence, router->current_pos); } router->pending_transaction.state = BLRM_TRANSACTION_START; - /* Save the pending GTID details */ - strcpy(router->pending_transaction.gtid, mariadb_gtid); + /* Handle MariaDB GTID */ + if (router->mariadb_gtid) + { + char mariadb_gtid[GTID_MAX_LEN + 1]; + snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu", + domainid, + hdr.serverid, + n_sequence); + + MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu", + mariadb_gtid, + router->binlog_name, + router->current_pos); + + /* Save the pending GTID value */ + strcpy(router->pending_transaction.gtid, mariadb_gtid); + } + router->pending_transaction.start_pos = router->current_pos; router->pending_transaction.end_pos = 0; @@ -1629,16 +1638,25 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) if (router->pending_transaction.state > BLRM_TRANSACTION_START) { - /* Update last seen MariaDB GTID */ if (router->mariadb10_compat) { - strcpy(router->mariadb_gtid, router->pending_transaction.gtid); - - /* The transaction has been saved. + /** + * The transaction has been saved. * this poins to end of binlog: * i.e. the position of a new event */ router->pending_transaction.end_pos = router->current_pos; + + if (router->mariadb10_compat && + router->mariadb_gtid) + { + /* Update last seen MariaDB GTID */ + strcpy(router->last_mariadb_gtid, router->pending_transaction.gtid); + /** + * Save MariaDB GTID into repo + */ + blr_save_mariadb_gtid(router); + } } spinlock_release(&router->binlog_lock); diff --git a/server/modules/routing/binlogrouter/blr_slave.c b/server/modules/routing/binlogrouter/blr_slave.c index 45eb8ee4c..df38af69e 100644 --- a/server/modules/routing/binlogrouter/blr_slave.c +++ b/server/modules/routing/binlogrouter/blr_slave.c @@ -653,10 +653,11 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) MXS_FREE(query_text); - if (router->mariadb10_compat) + if (router->mariadb10_compat && + router->mariadb_gtid) { spinlock_acquire(&router->binlog_lock); - strcpy(mariadb_gtid, router->mariadb_gtid); + strcpy(mariadb_gtid, router->last_mariadb_gtid); spinlock_release(&router->binlog_lock); }