MXS-1075: save all received MariaDB GTIds
The GTID saving, which will allow slave to connect with GTID, is done only with transaction_safety = on
This commit is contained in:
@ -117,6 +117,8 @@ static void destroyInstance(MXS_ROUTER *instance);
|
||||
bool blr_extract_key(const char *linebuf, int nline, ROUTER_INSTANCE *router);
|
||||
bool blr_get_encryption_key(ROUTER_INSTANCE *router);
|
||||
int blr_parse_key_file(ROUTER_INSTANCE *router);
|
||||
static MARIADB_GTID_INFO *mariadb_gtid_info_dup(const MARIADB_GTID_INFO *in);
|
||||
static void mariadb_gtid_info_free(MARIADB_GTID_INFO *in);
|
||||
|
||||
static void stats_func(void *);
|
||||
|
||||
@ -192,6 +194,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
{"encrypt_binlog", MXS_MODULE_PARAM_BOOL, "false"},
|
||||
{"encryption_algorithm", MXS_MODULE_PARAM_ENUM, "aes_cbc", MXS_MODULE_OPT_NONE, enc_algo_values},
|
||||
{"encryption_key_file", MXS_MODULE_PARAM_PATH, NULL, MXS_MODULE_OPT_PATH_R_OK},
|
||||
{"mariadb_gtid", MXS_MODULE_PARAM_BOOL, "false"},
|
||||
{"shortburst", MXS_MODULE_PARAM_COUNT, DEF_SHORT_BURST},
|
||||
{"longburst", MXS_MODULE_PARAM_COUNT, DEF_LONG_BURST},
|
||||
{"burstsize", MXS_MODULE_PARAM_SIZE, DEF_BURST_SIZE},
|
||||
@ -312,7 +315,7 @@ createInstance(SERVICE *service, char **options)
|
||||
inst->current_pos = 0;
|
||||
inst->current_safe_event = 0;
|
||||
inst->master_event_state = BLR_EVENT_DONE;
|
||||
inst->mariadb_gtid[0] = '\0';
|
||||
inst->last_mariadb_gtid[0] = '\0';
|
||||
|
||||
strcpy(inst->binlog_name, "");
|
||||
strcpy(inst->prevbinlog, "");
|
||||
@ -350,6 +353,9 @@ createInstance(SERVICE *service, char **options)
|
||||
inst->request_semi_sync = config_get_bool(params, "semisync");
|
||||
inst->master_semi_sync = 0;
|
||||
|
||||
/* Enable MariaDB GTID tracking */
|
||||
inst->mariadb_gtid = config_get_bool(params, "mariadb_gtid");
|
||||
|
||||
/* Binlog encryption */
|
||||
inst->encryption.enabled = config_get_bool(params, "encrypt_binlog");
|
||||
inst->encryption.encryption_algorithm = config_get_enum(params, "encryption_algorithm", enc_algo_values);
|
||||
@ -358,6 +364,10 @@ createInstance(SERVICE *service, char **options)
|
||||
/* Encryption CTX */
|
||||
inst->encryption_ctx = NULL;
|
||||
|
||||
/* MariaDB GTID repo init val */
|
||||
inst->gtid_repo = NULL;
|
||||
|
||||
/* Set router uuid */
|
||||
inst->uuid = config_copy_string(params, "uuid");
|
||||
|
||||
if (inst->uuid == NULL)
|
||||
@ -507,6 +517,10 @@ createInstance(SERVICE *service, char **options)
|
||||
{
|
||||
inst->encryption.enabled = config_truth_value(value);
|
||||
}
|
||||
else if (strcmp(options[i], "mariadb_gtid") == 0)
|
||||
{
|
||||
inst->mariadb_gtid = config_truth_value(value);
|
||||
}
|
||||
else if (strcmp(options[i], "encryption_algorithm") == 0)
|
||||
{
|
||||
int ret = blr_check_encryption_algorithm(value);
|
||||
@ -626,6 +640,38 @@ createInstance(SERVICE *service, char **options)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Enable MariaDB GTID repo */
|
||||
if (inst->mariadb10_compat &&
|
||||
inst->mariadb_gtid)
|
||||
{
|
||||
if (!inst->trx_safe)
|
||||
{
|
||||
MXS_ERROR("MariaDB GTID can be enabled only"
|
||||
" with Transaction Safety feature."
|
||||
" Please enable it with option 'transaction_safety = on'");
|
||||
free_instance(inst);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if ((inst->gtid_repo = hashtable_alloc(1000,
|
||||
hashtable_item_strhash,
|
||||
hashtable_item_strcmp)) == NULL)
|
||||
{
|
||||
MXS_ERROR("Service %s, cannot allocate MariaDB GTID hashtable", service->name);
|
||||
free_instance(inst);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
hashtable_memory_fns(inst->gtid_repo,
|
||||
hashtable_item_strdup,
|
||||
(HASHCOPYFN)mariadb_gtid_info_dup,
|
||||
hashtable_item_free,
|
||||
(HASHFREEFN)mariadb_gtid_info_free);
|
||||
|
||||
MXS_NOTICE("%s: Service has MariaDB GTID otion set to ON",
|
||||
service->name);
|
||||
}
|
||||
|
||||
if (inst->serverid <= 0)
|
||||
{
|
||||
MXS_ERROR("Service %s, server-id is not configured. "
|
||||
@ -1417,10 +1463,12 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
|
||||
|
||||
dcb_printf(dcb, "\tLast event from master: 0x%x, %s\n",
|
||||
router_inst->lastEventReceived, (ptr != NULL) ? ptr : "unknown");
|
||||
if (router_inst->mariadb_gtid[0])
|
||||
|
||||
if (router_inst->mariadb_gtid &&
|
||||
router_inst->last_mariadb_gtid[0])
|
||||
{
|
||||
dcb_printf(dcb, "\tLast seen MariaDB GTID: %s\n",
|
||||
router_inst->mariadb_gtid);
|
||||
router_inst->last_mariadb_gtid);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2479,6 +2527,9 @@ destroyInstance(MXS_ROUTER *instance)
|
||||
inst->service->name, inst->binlog_name, inst->current_pos, inst->binlog_position);
|
||||
}
|
||||
|
||||
/* Free GTID hashtable */
|
||||
hashtable_free(inst->gtid_repo);
|
||||
|
||||
spinlock_release(&inst->lock);
|
||||
}
|
||||
|
||||
@ -2674,3 +2725,40 @@ int blr_parse_key_file(ROUTER_INSTANCE *router)
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Free routine for GTID repo hashtable
|
||||
*
|
||||
* @param in The data to free
|
||||
*/
|
||||
static void mariadb_gtid_info_free(MARIADB_GTID_INFO *in)
|
||||
{
|
||||
if (in)
|
||||
{
|
||||
MXS_FREE(in->gtid);
|
||||
MXS_FREE(in);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy routine for GTID repo hashtable
|
||||
*
|
||||
* @param in The data to copy
|
||||
* @return New allocated value or NULL
|
||||
*/
|
||||
static MARIADB_GTID_INFO *mariadb_gtid_info_dup(const MARIADB_GTID_INFO *in)
|
||||
{
|
||||
MARIADB_GTID_INFO *rval = (MARIADB_GTID_INFO *) MXS_CALLOC(1, sizeof(MARIADB_GTID_INFO));
|
||||
char *gtid = MXS_STRDUP(in->gtid);
|
||||
if (!gtid || !rval)
|
||||
{
|
||||
MXS_FREE(rval);
|
||||
MXS_FREE(gtid);
|
||||
return NULL;
|
||||
}
|
||||
rval->gtid = gtid;
|
||||
rval->start = in-> start;
|
||||
rval->end = in->end;
|
||||
|
||||
return (void *) rval;
|
||||
}
|
||||
|
@ -541,6 +541,14 @@ typedef struct pending_transaction
|
||||
uint64_t end_pos; /** The next_pos in COMMIT event*/
|
||||
} PENDING_TRANSACTION;
|
||||
|
||||
/** MariaDB GTID info */
|
||||
typedef struct mariadb_gtid_info
|
||||
{
|
||||
char *gtid; /** MariaDB 10.x GTID */
|
||||
uint64_t start; /** The BEGIN pos */
|
||||
uint64_t end; /** The next_pos in COMMIT event*/
|
||||
} MARIADB_GTID_INFO;
|
||||
|
||||
/**
|
||||
* The per instance data for the router.
|
||||
*/
|
||||
@ -621,7 +629,11 @@ typedef struct router_instance
|
||||
int master_semi_sync; /*< Semi-Sync replication status of master server */
|
||||
BINLOG_ENCRYPTION_SETUP encryption; /*< Binlog encryption setup */
|
||||
void *encryption_ctx; /*< Encryption context */
|
||||
char mariadb_gtid[GTID_MAX_LEN + 1]; /*< MariaDB 10 GTID string value */
|
||||
char last_mariadb_gtid[GTID_MAX_LEN + 1]; /*< Last seen MariaDB 10 GTID */
|
||||
bool mariadb_gtid; /*< Save received MariaDB GTIDs into repo.
|
||||
* This allows MariaDB 10 slave servers
|
||||
* connecting with GTID */
|
||||
HASHTABLE *gtid_repo; /*< Storage for MariaDB GTIDs */
|
||||
struct router_instance *next;
|
||||
} ROUTER_INSTANCE;
|
||||
|
||||
|
@ -172,6 +172,7 @@ static void blr_format_event_size(double *event_size, char *label);
|
||||
extern int MaxScaleUptime();
|
||||
extern void encode_value(unsigned char *data, unsigned int value, int len);
|
||||
extern void blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr);
|
||||
int blr_save_mariadb_gtid(ROUTER_INSTANCE *inst);
|
||||
|
||||
typedef struct binlog_event_desc
|
||||
{
|
||||
@ -2055,13 +2056,6 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
|
||||
|
||||
if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0)
|
||||
{
|
||||
char mariadb_gtid[GTID_MAX_LEN + 1];
|
||||
snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu",
|
||||
domainid, hdr.serverid,
|
||||
n_sequence);
|
||||
|
||||
strcpy(router->mariadb_gtid, mariadb_gtid);
|
||||
|
||||
if (pending_transaction > BLRM_NO_TRANSACTION)
|
||||
{
|
||||
MXS_ERROR("Transaction cannot be @ pos %llu: "
|
||||
@ -2076,7 +2070,21 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
|
||||
}
|
||||
else
|
||||
{
|
||||
pending_transaction = 1;
|
||||
char mariadb_gtid[GTID_MAX_LEN + 1];
|
||||
snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu",
|
||||
domainid, hdr.serverid,
|
||||
n_sequence);
|
||||
|
||||
pending_transaction = BLRM_TRANSACTION_START;
|
||||
|
||||
router->pending_transaction.start_pos = pos;
|
||||
router->pending_transaction.end_pos = 0;
|
||||
|
||||
/* Set MariaDB GTID */
|
||||
if (router->mariadb_gtid)
|
||||
{
|
||||
strcpy(router->pending_transaction.gtid, mariadb_gtid);
|
||||
}
|
||||
|
||||
transaction_events = 0;
|
||||
event_bytes = 0;
|
||||
@ -2134,6 +2142,9 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
|
||||
{
|
||||
pending_transaction = BLRM_TRANSACTION_START;
|
||||
|
||||
router->pending_transaction.start_pos = pos;
|
||||
router->pending_transaction.end_pos = 0;
|
||||
|
||||
transaction_events = 0;
|
||||
event_bytes = 0;
|
||||
if (!(debug & BLR_CHECK_ONLY))
|
||||
@ -2190,12 +2201,26 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
|
||||
MXS_DEBUG("< Transaction @ pos %llu, is now closed @ %llu. %lu events seen",
|
||||
last_known_commit, pos, transaction_events);
|
||||
}
|
||||
|
||||
pending_transaction = BLRM_NO_TRANSACTION;
|
||||
|
||||
router->pending_transaction.end_pos = hdr.next_pos;
|
||||
|
||||
last_known_commit = pos;
|
||||
|
||||
/* Reset the event replacing indicator */
|
||||
replace_trx_events = false;
|
||||
|
||||
if (router->mariadb10_compat &&
|
||||
router->mariadb_gtid)
|
||||
{
|
||||
/* Update Last Seen MariaDB GTID */
|
||||
strcpy(router->last_mariadb_gtid, router->pending_transaction.gtid);
|
||||
|
||||
/* Save MariaDB 10 GTID */
|
||||
blr_save_mariadb_gtid(router);
|
||||
}
|
||||
|
||||
total_events += transaction_events;
|
||||
|
||||
if (transaction_events > max_events)
|
||||
@ -3273,3 +3298,37 @@ static void blr_report_checksum(REP_HEADER hdr, const uint8_t *buffer, char *out
|
||||
*p = tolower(*p);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Save MariaDB GTID found in complete transaction
|
||||
*
|
||||
* @param inst The router instance
|
||||
* @return 1 on success, 0 otherwise
|
||||
*/
|
||||
int blr_save_mariadb_gtid(ROUTER_INSTANCE *inst)
|
||||
{
|
||||
MARIADB_GTID_INFO gtid_info;
|
||||
|
||||
gtid_info.gtid = inst->pending_transaction.gtid;
|
||||
gtid_info.start = inst->pending_transaction.start_pos;
|
||||
gtid_info.end = inst->pending_transaction.end_pos;
|
||||
|
||||
/* Save GTID into repo */
|
||||
if (!hashtable_add(inst->gtid_repo,
|
||||
inst->pending_transaction.gtid,
|
||||
>id_info))
|
||||
{
|
||||
MXS_ERROR("Service %s: error saving mariadb GTID %s into repo",
|
||||
inst->service->name,
|
||||
inst->pending_transaction.gtid);
|
||||
return 0;
|
||||
}
|
||||
|
||||
MXS_DEBUG("Saved MariaDB GTID '%s', %s:%lu:%lu",
|
||||
gtid_info.gtid,
|
||||
inst->binlog_name,
|
||||
gtid_info.start,
|
||||
gtid_info.end);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
@ -108,6 +108,7 @@ static int blr_get_master_semisync(GWBUF *buf);
|
||||
static void blr_terminate_master_replication(ROUTER_INSTANCE *router, uint8_t* ptr, int len);
|
||||
void blr_notify_all_slaves(ROUTER_INSTANCE *router);
|
||||
extern bool blr_notify_waiting_slave(ROUTER_SLAVE *slave);
|
||||
extern int blr_save_mariadb_gtid(ROUTER_INSTANCE *inst);
|
||||
|
||||
static int keepalive = 1;
|
||||
|
||||
@ -1386,16 +1387,6 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
|
||||
if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0)
|
||||
{
|
||||
char mariadb_gtid[GTID_MAX_LEN + 1];
|
||||
snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu",
|
||||
domainid, hdr.serverid,
|
||||
n_sequence);
|
||||
|
||||
MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu",
|
||||
mariadb_gtid,
|
||||
router->binlog_name,
|
||||
router->current_pos);
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
/**
|
||||
@ -1405,17 +1396,35 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
{
|
||||
MXS_ERROR("A MariaDB 10 transaction "
|
||||
"is already open "
|
||||
"@ %lu (GTID %s) and "
|
||||
"@ %lu (GTID %u-%u-%lu) and "
|
||||
"a new one starts @ %lu",
|
||||
router->binlog_position,
|
||||
mariadb_gtid,
|
||||
domainid,
|
||||
hdr.serverid,
|
||||
n_sequence,
|
||||
router->current_pos);
|
||||
}
|
||||
|
||||
router->pending_transaction.state = BLRM_TRANSACTION_START;
|
||||
|
||||
/* Save the pending GTID details */
|
||||
strcpy(router->pending_transaction.gtid, mariadb_gtid);
|
||||
/* Handle MariaDB GTID */
|
||||
if (router->mariadb_gtid)
|
||||
{
|
||||
char mariadb_gtid[GTID_MAX_LEN + 1];
|
||||
snprintf(mariadb_gtid, GTID_MAX_LEN, "%u-%u-%lu",
|
||||
domainid,
|
||||
hdr.serverid,
|
||||
n_sequence);
|
||||
|
||||
MXS_DEBUG("MariaDB GTID received: (%s). Current file %s, pos %lu",
|
||||
mariadb_gtid,
|
||||
router->binlog_name,
|
||||
router->current_pos);
|
||||
|
||||
/* Save the pending GTID value */
|
||||
strcpy(router->pending_transaction.gtid, mariadb_gtid);
|
||||
}
|
||||
|
||||
router->pending_transaction.start_pos = router->current_pos;
|
||||
router->pending_transaction.end_pos = 0;
|
||||
|
||||
@ -1629,16 +1638,25 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
|
||||
if (router->pending_transaction.state > BLRM_TRANSACTION_START)
|
||||
{
|
||||
/* Update last seen MariaDB GTID */
|
||||
if (router->mariadb10_compat)
|
||||
{
|
||||
strcpy(router->mariadb_gtid, router->pending_transaction.gtid);
|
||||
|
||||
/* The transaction has been saved.
|
||||
/**
|
||||
* The transaction has been saved.
|
||||
* this poins to end of binlog:
|
||||
* i.e. the position of a new event
|
||||
*/
|
||||
router->pending_transaction.end_pos = router->current_pos;
|
||||
|
||||
if (router->mariadb10_compat &&
|
||||
router->mariadb_gtid)
|
||||
{
|
||||
/* Update last seen MariaDB GTID */
|
||||
strcpy(router->last_mariadb_gtid, router->pending_transaction.gtid);
|
||||
/**
|
||||
* Save MariaDB GTID into repo
|
||||
*/
|
||||
blr_save_mariadb_gtid(router);
|
||||
}
|
||||
}
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
@ -653,10 +653,11 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
|
||||
|
||||
MXS_FREE(query_text);
|
||||
|
||||
if (router->mariadb10_compat)
|
||||
if (router->mariadb10_compat &&
|
||||
router->mariadb_gtid)
|
||||
{
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
strcpy(mariadb_gtid, router->mariadb_gtid);
|
||||
strcpy(mariadb_gtid, router->last_mariadb_gtid);
|
||||
spinlock_release(&router->binlog_lock);
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user