MXS-1266: saving GTID components into gtid_maps storage. part1

Saving GTID components into gtid_maps storage will allow to create a
hiearchical binlog cache dir.

Empty GTID  for master registration can be specified with SET
@@global.gtid_slave_pos = ‘’
This commit is contained in:
MassimilianoPinto
2017-05-19 10:23:35 +02:00
parent 56cf06ee08
commit 6c86e1ef2f
5 changed files with 262 additions and 116 deletions

View File

@ -3116,12 +3116,18 @@ static bool blr_open_gtid_maps_storage(ROUTER_INSTANCE *inst)
char* errmsg;
/* Create the gtid_maps table */
int rc = sqlite3_exec(inst->gtid_maps,
"CREATE TABLE IF NOT EXISTS "
"gtid_maps(gtid varchar(255), "
"binlog_file varchar(255), "
"start_pos bigint, "
"end_pos bigint, "
"primary key(gtid));",
"BEGIN;"
"CREATE TABLE IF NOT EXISTS gtid_maps("
"id INTEGER PRIMARY KEY AUTOINCREMENT, "
"rep_domain INT, "
"server_id INT, "
"sequence BIGINT, "
"binlog_file VARCHAR(255), "
"start_pos BIGINT, "
"end_pos BIGINT);"
"CREATE UNIQUE INDEX IF NOT EXISTS gtid_index "
"ON gtid_maps(rep_domain, server_id, sequence);"
"COMMIT;",
NULL, NULL, &errmsg);
if (rc != SQLITE_OK)
{

View File

@ -546,24 +546,6 @@ typedef enum
BLRM_XID_EVENT_SEEN /*< Received XID event of current transaction */
} master_transaction_t;
/** Transaction Details */
typedef struct pending_transaction
{
char gtid[GTID_MAX_LEN + 1]; /** MariaDB 10.x GTID */
master_transaction_t state; /** Transaction state */
uint64_t start_pos; /** The BEGIN pos */
uint64_t end_pos; /** The next_pos in COMMIT event*/
} PENDING_TRANSACTION;
/** MariaDB GTID info */
typedef struct mariadb_gtid_info
{
char *gtid; /** MariaDB 10.x GTID */
char *file; /** The binlog file */
uint64_t start; /** The BEGIN pos */
uint64_t end; /** The next_pos in COMMIT event*/
} MARIADB_GTID_INFO;
/** MariaDB GTID elements */
typedef struct mariadb_gtid_elems
{
@ -572,6 +554,25 @@ typedef struct mariadb_gtid_elems
uint64_t seq_no; /*< The sequence number */
} MARIADB_GTID_ELEMS;
/** Transaction Details */
typedef struct pending_transaction
{
char gtid[GTID_MAX_LEN + 1]; /** MariaDB 10.x GTID */
master_transaction_t state; /** Transaction state */
uint64_t start_pos; /** The BEGIN pos */
uint64_t end_pos; /** The next_pos in COMMIT event*/
MARIADB_GTID_ELEMS gtid_elms; /* MariaDB 10.x GTID components */
} PENDING_TRANSACTION;
/** MariaDB GTID info */
typedef struct mariadb_gtid_info
{
char *gtid; /** MariaDB 10.x GTID, string value */
char *file; /** The binlog file */
uint64_t start; /** The BEGIN pos: i.e the GTID event */
uint64_t end; /** The next_pos in COMMIT event*/
} MARIADB_GTID_INFO;
/**
* The per instance data for the router.
*/

View File

@ -173,6 +173,7 @@ extern int MaxScaleUptime();
extern void encode_value(unsigned char *data, unsigned int value, int len);
extern void blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr);
bool blr_save_mariadb_gtid(ROUTER_INSTANCE *inst);
bool blr_parse_gtid(const char *gtid, MARIADB_GTID_ELEMS *info);
typedef struct binlog_event_desc
{
@ -219,7 +220,12 @@ static int blr_binlog_event_check(ROUTER_INSTANCE *router,
char *binlogname,
char *errmsg);
static void blr_report_checksum(REP_HEADER hdr, const uint8_t *buffer, char *output);
static void blr_report_checksum(REP_HEADER hdr,
const uint8_t *buffer,
char *output);
bool blr_load_last_mariadb_gtid(ROUTER_INSTANCE *router,
MARIADB_GTID_INFO *result);
/**
* MariaDB 10.1.7 Start Encryption event content
@ -2065,7 +2071,8 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
snprintf(mariadb_gtid,
GTID_MAX_LEN,
"%u-%u-%lu",
domainid, hdr.serverid,
domainid,
hdr.serverid,
n_sequence);
pending_transaction = BLRM_TRANSACTION_START;
@ -2077,6 +2084,11 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
if (router->mariadb10_gtid)
{
strcpy(router->pending_transaction.gtid, mariadb_gtid);
/* Save the pending GTID components */
router->pending_transaction.gtid_elms.domain_id = domainid;
router->pending_transaction.gtid_elms.server_id = hdr.serverid;
router->pending_transaction.gtid_elms.seq_no = n_sequence;
}
transaction_events = 0;
@ -2099,7 +2111,10 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
{
if (hdr.event_type == MARIADB10_GTID_GTID_LIST_EVENT)
{
char mariadb_gtid[GTID_MAX_LEN + 1] = "";
MARIADB_GTID_INFO gtid_info = {};
unsigned long n_gtids;
n_gtids = extract_field(ptr, 32);
/* The lower 28 bits are the number of GTIDs */
n_gtids &= 0x01111111;
@ -2119,8 +2134,6 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
n_sequence = extract_field(ptr, 64);
ptr += 4;
char mariadb_gtid[GTID_MAX_LEN + 1];
snprintf(mariadb_gtid,
GTID_MAX_LEN,
"%u-%u-%lu",
@ -2128,16 +2141,36 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
serverid,
n_sequence);
MXS_DEBUG("GTID List has %lu GTIDs, first is %s",
MXS_DEBUG("GTID List Event has %lu GTIDs, first is %s",
n_gtids,
mariadb_gtid);
}
else
{
MXS_DEBUG("GTID List Event has no GTIDs");
/* Set MariaDB GTID */
if (router->mariadb10_gtid)
/* Try loading last found GTID */
if (router->mariadb10_gtid &&
blr_load_last_mariadb_gtid(router, &gtid_info) &&
gtid_info.gtid != NULL)
{
strcpy(router->last_mariadb_gtid, mariadb_gtid);
snprintf(mariadb_gtid,
GTID_MAX_LEN,
"%s",
gtid_info.gtid);
MXS_FREE(gtid_info.gtid);
MXS_FREE(gtid_info.file);
}
}
/* Set MariaDB GTID */
if (router->mariadb10_gtid)
{
strcpy(router->last_mariadb_gtid, mariadb_gtid);
MXS_INFO("Last MariaDB 10 GTID was (%s).",
router->last_mariadb_gtid);
}
}
}
@ -2257,8 +2290,8 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
router->mariadb10_gtid)
{
/* Update Last Seen MariaDB GTID */
strcpy(router->last_mariadb_gtid, router->pending_transaction.gtid);
strcpy(router->last_mariadb_gtid,
router->pending_transaction.gtid);
/* Save MariaDB 10 GTID */
blr_save_mariadb_gtid(router);
}
@ -3349,24 +3382,39 @@ static void blr_report_checksum(REP_HEADER hdr, const uint8_t *buffer, char *out
bool blr_save_mariadb_gtid(ROUTER_INSTANCE *inst)
{
static const char insert_tpl[] = "INSERT OR IGNORE INTO gtid_maps("
"gtid, "
"binlog_file, "
"start_pos, end_pos) "
"VALUES (\"%s\", \"%s\", %lu, %lu);";
MARIADB_GTID_INFO gtid_info;
"rep_domain, "
"server_id, "
"sequence, "
"binlog_file, "
"start_pos, "
"end_pos) "
"VALUES ( "
"%" PRIu32 ", "
"%" PRIu32 ", "
"%" PRIu64 ", "
"\"%s\", "
"%" PRIu64 ", "
"%" PRIu64 ");";
char *errmsg;
char insert_sql[GTID_SQL_BUFFER_SIZE];
MARIADB_GTID_INFO gtid_info;
MARIADB_GTID_ELEMS gtid_elms;
gtid_info.gtid = inst->pending_transaction.gtid;
gtid_info.file = inst->binlog_name;
gtid_info.start = inst->pending_transaction.start_pos;
gtid_info.end = inst->pending_transaction.end_pos;
memcpy(&gtid_elms,
&inst->pending_transaction.gtid_elms,
sizeof(MARIADB_GTID_ELEMS));
/* Save GTID into repo */
snprintf(insert_sql,
GTID_SQL_BUFFER_SIZE,
insert_tpl,
gtid_info.gtid,
gtid_elms.domain_id,
gtid_elms.server_id,
gtid_elms.seq_no,
gtid_info.file,
gtid_info.start,
gtid_info.end);
@ -3383,9 +3431,9 @@ bool blr_save_mariadb_gtid(ROUTER_INSTANCE *inst)
gtid_info.start,
gtid_info.end,
errmsg);
sqlite3_free(errmsg);
return false;
}
sqlite3_free(errmsg);
MXS_DEBUG("Saved MariaDB GTID '%s', %s:%lu,%lu, insert SQL [%s]",
gtid_info.gtid,
@ -3398,7 +3446,7 @@ bool blr_save_mariadb_gtid(ROUTER_INSTANCE *inst)
}
/**
* GTID select callbck for sqlite3 database
* GTID select callback for sqlite3 database
*
* @param data Data pointer from caller
* @param cols Number of columns
@ -3422,9 +3470,9 @@ static int gtid_select_cb(void *data, int cols, char** values, char** names)
result->file = MXS_STRDUP_A(values[1]);
result->start = atoll(values[2]);
result->end = atoll(values[3]);
}
ss_dassert(result->start > 0 && result->end > result->start);
ss_dassert(result->start > 0 && result->end > result->start);
}
return 0;
}
@ -3444,15 +3492,33 @@ bool blr_fetch_mariadb_gtid(ROUTER_SLAVE *slave,
{
char *errmsg = NULL;
char select_query[GTID_SQL_BUFFER_SIZE];
static const char select_tpl[] = "SELECT gtid, binlog_file, start_pos, end_pos "
MARIADB_GTID_ELEMS gtid_elms = {};
static const char select_tpl[] = "SELECT "
"(rep_domain ||"
" '-' || server_id ||"
" '-' || sequence) AS gtid, "
"binlog_file, "
"start_pos, "
"end_pos "
"FROM gtid_maps "
"WHERE gtid = '%s' LIMIT 1;";
"WHERE (rep_domain = %" PRIu32 " AND "
"server_id = %" PRIu32 " AND "
"sequence = %" PRIu64 ") "
"LIMIT 1;";
ss_dassert(gtid != NULL);
/* Parse GTID value into its components */
if (!blr_parse_gtid(gtid, &gtid_elms))
{
return false;
}
snprintf(select_query,
GTID_SQL_BUFFER_SIZE,
select_tpl,
gtid);
gtid_elms.domain_id,
gtid_elms.server_id,
gtid_elms.seq_no);
/* Find the GTID */
if (sqlite3_exec(slave->gtid_maps,
@ -3462,7 +3528,9 @@ bool blr_fetch_mariadb_gtid(ROUTER_SLAVE *slave,
&errmsg) != SQLITE_OK)
{
MXS_ERROR("Failed to select GTID %s from GTID maps DB: %s, select [%s]",
gtid, errmsg, select_query);
gtid,
errmsg,
select_query);
sqlite3_free(errmsg);
return false;
}
@ -3518,3 +3586,89 @@ uint32_t blr_slave_get_file_size(const char *filename)
return 0;
}
}
/**
* Extract the GTID the client requested
*
* @param gtid Then input GTID
* @param info The GTID structure to fil
* @return True for a parsed GTID string or false
*/
bool blr_parse_gtid(const char *gtid, MARIADB_GTID_ELEMS *info)
{
const char *ptr = gtid;
int read = 0;
int len = strlen(gtid);
while (ptr < gtid + len)
{
if (!isdigit(*ptr))
{
ptr++;
}
else
{
char *end;
switch (read)
{
case 0:
info->domain_id = strtoul(ptr, &end, 10);
break;
case 1:
info->server_id = strtoul(ptr, &end, 10);
break;
case 2:
info->seq_no = strtoul(ptr, &end, 10);
break;
}
read++;
ptr = end;
}
}
return (info->server_id && info->seq_no) ? true : false;
}
/**
* Get MariaDB GTID from repo
*
* @param router The current router instance
* @param gtid The GTID to look for
* @param result The (allocated) ouput data to fill
* @return False on sqlite errors
* True even if the gtid_maps is empty
* The caller must check result->gtid value
*/
bool blr_load_last_mariadb_gtid(ROUTER_INSTANCE *router,
MARIADB_GTID_INFO *result)
{
char *errmsg = NULL;
MARIADB_GTID_ELEMS gtid_elms = {};
static const char last_gtid[] = "SELECT "
"(rep_domain ||"
" '-' || server_id ||"
" '-' || sequence) AS gtid, "
"binlog_file, "
"MAX(start_pos) AS start_pos, "
"MAX(end_pos) AS end_pos "
"FROM gtid_maps "
"WHERE id = "
"(SELECT MAX(id) FROM gtid_maps);";
/* Find the GTID */
if (sqlite3_exec(router->gtid_maps,
last_gtid,
gtid_select_cb,
result,
&errmsg) != SQLITE_OK)
{
MXS_ERROR("Failed to select last GTID from GTID maps DB: %s, select [%s]",
errmsg,
last_gtid);
sqlite3_free(errmsg);
return false;
}
return true;
}

View File

@ -1093,8 +1093,12 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
router->binlog_name,
router->current_pos);
/* Save the pending GTID value */
/* Save the pending GTID string value */
strcpy(router->pending_transaction.gtid, mariadb_gtid);
/* Save the pending GTID components */
router->pending_transaction.gtid_elms.domain_id = domainid;
router->pending_transaction.gtid_elms.server_id = hdr.serverid;
router->pending_transaction.gtid_elms.seq_no = n_sequence;
}
router->pending_transaction.start_pos = router->current_pos;
@ -1304,7 +1308,8 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
router->mariadb10_gtid)
{
/* Update last seen MariaDB GTID */
strcpy(router->last_mariadb_gtid, router->pending_transaction.gtid);
strcpy(router->last_mariadb_gtid,
router->pending_transaction.gtid);
/**
* Save MariaDB GTID into repo
*/
@ -3162,8 +3167,8 @@ static bool blr_handle_fake_rotate(ROUTER_INSTANCE *router,
* @param ptr The packet data
*/
static void blr_handle_fake_gtid_list(ROUTER_INSTANCE *router,
REP_HEADER *hdr,
uint8_t *ptr)
REP_HEADER *hdr,
uint8_t *ptr)
{
ss_dassert(hdr->event_type == MARIADB10_GTID_GTID_LIST_EVENT);
@ -3182,7 +3187,6 @@ static void blr_handle_fake_gtid_list(ROUTER_INSTANCE *router,
* fill any GAP with an ignorable event
* if GTID_LIST next_pos is greter than current EOF
*/
if (hdr->next_pos && (hdr->next_pos > binlog_file_eof))
{
uint64_t hole_size = hdr->next_pos - binlog_file_eof;

View File

@ -274,7 +274,6 @@ static bool blr_slave_gtid_request(ROUTER_INSTANCE *router,
static int blr_send_fake_gtid_list(ROUTER_SLAVE *slave,
const char *gtid,
uint32_t serverid);
static bool blr_parse_gtid(const char *gtid, MARIADB_GTID_ELEMS *info);
static bool blr_handle_maxwell_stmt(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave,
const char *maxwell_stmt);
@ -297,6 +296,8 @@ static inline void blr_get_file_fullpath(const char *binlog_file,
const char *root_dir,
char *full_path);
extern bool blr_parse_gtid(const char *gtid, MARIADB_GTID_ELEMS *info);
/**
* Process a request packet from the slave server.
*
@ -6328,7 +6329,7 @@ static bool blr_slave_gtid_request(ROUTER_INSTANCE *router,
}
else
{
/* A Gtid has been found */
/* GTID has been found */
MXS_INFO("Found GTID '%s' for slave %lu at %s:%lu",
slave->mariadb_gtid,
(unsigned long)slave->serverid,
@ -6500,48 +6501,6 @@ static int blr_send_fake_gtid_list(ROUTER_SLAVE *slave,
return gl_event ? slave->dcb->func.write(slave->dcb, gl_event) : 0;
}
/**
* Extract the GTID the client requested
*
* @param gtid Then input GTID
* @param info The GTID structure to fil
* @return True for a parsed GTID string or false
*/
static bool blr_parse_gtid(const char *gtid, MARIADB_GTID_ELEMS *info)
{
const char *ptr = gtid;
int read = 0;
int len = strlen(gtid);
while (ptr < gtid + len)
{
if (!isdigit(*ptr))
{
ptr++;
}
else
{
char *end;
switch (read)
{
case 0:
info->domain_id = strtoul(ptr, &end, 10);
break;
case 1:
info->server_id = strtoul(ptr, &end, 10);
break;
case 2:
info->seq_no = strtoul(ptr, &end, 10);
break;
}
read++;
ptr = end;
}
}
return (info->server_id && info->seq_no) ? true : false;
}
/**
* Handle received Maxwell statements from clients
*
@ -6967,19 +6926,24 @@ static bool blr_handle_set_stmt(ROUTER_INSTANCE *router,
{
if (slave->serverid != 0)
{
MXS_ERROR("Master GTID registration can be sent only via administration connection");
MXS_ERROR("Master GTID registration can be sent only"
" via administration connection");
blr_slave_send_error_packet(slave,
"Master GTID registration cannot be issued by a regitrating slave.",
(unsigned int)1198, NULL);
"Master GTID registration cannot be"
" issued by a registrating slave.",
1198, NULL);
return false;
}
if (router->master_state != BLRM_SLAVE_STOPPED)
{
MXS_ERROR("Master GTID registration needs stopped slave: issue STOP SLAVE first.");
MXS_ERROR("Master GTID registration needs stopped replication:"
" issue STOP SLAVE first.");
blr_slave_send_error_packet(slave,
"Cannot use Master GTID registration with a running slave; "
"run STOP SLAVE first",
(unsigned int)1198, NULL);
"Cannot use Master GTID registration"
" with running replication;"
" run STOP SLAVE first",
1198,
NULL);
return true;
}
/* If not mariadb GTID an error message will be returned */
@ -6988,34 +6952,50 @@ static bool blr_handle_set_stmt(ROUTER_INSTANCE *router,
if ((word = strtok_r(NULL, sep, &brkb)) != NULL)
{
char heading[GTID_MAX_LEN + 1];
MXS_INFO("Binlog server requests GTID '%s' to master",
word);
MARIADB_GTID_ELEMS gtid_elms = {};
// TODO: gtid_strip_chars routine for this
strcpy(heading, word + 1);
heading[strlen(heading) - 1] = '\0';
if (!heading[0])
MXS_INFO("Requesting GTID (%s) from Master server.",
!heading[0] ? "empty value" : heading);
/* Parse the non empty GTID value */
if (heading[0] && !blr_parse_gtid(heading, &gtid_elms))
{
MXS_ERROR("Cannot request empty GTID right now");
static const char *err_fmt = "Invalid format for GTID ('%s')"
" set request; use 'X-Y-Z'";
char err_msg[sizeof(err_fmt) + GTID_MAX_LEN + 1];
sprintf(err_msg, err_fmt, heading);
MXS_ERROR(err_msg);
/* Stop Master registration */
blr_slave_send_error_packet(slave,
"Empty GTID not implemented righ now",
(unsigned int)1198, NULL);
return false;
err_msg,
1198,
NULL);
}
else
{
strcpy(router->last_mariadb_gtid, heading);
blr_slave_send_ok(router, slave);
return true;
}
return true;
}
}
else
{
MXS_ERROR("Master GTID registration needs 'mariadb10_master_gtid' option to be set.");
MXS_ERROR("Master GTID registration needs 'mariadb10_master_gtid'"
" option to be set.");
blr_slave_send_error_packet(slave,
"Master GTID registration needs 'mariadb10_master_gtid' option to be set.",
(unsigned int)1198, NULL);
"Master GTID registration needs"
" 'mariadb10_master_gtid' option"
" to be set first.",
1198,
NULL);
return true;
}
}
@ -7078,7 +7058,8 @@ static bool blr_handle_set_stmt(ROUTER_INSTANCE *router,
{
if ((word = strtok_r(NULL, sep, &brkb)) == NULL)
{
MXS_ERROR("%s: Truncated SET NAMES command.", router->service->name);
MXS_ERROR("%s: Truncated SET NAMES command.",
router->service->name);
return false;
}
else if (strcasecmp(word, "latin1") == 0)