MXS-1266: Allow save and read binlog files from a binlog cache directory tree

MXS-1266: Allow save and read binlog files from a binlog cache
directory tree
This commit is contained in:
MassimilianoPinto
2017-06-07 08:56:52 +02:00
parent a79a492601
commit 8f94b27fd5
5 changed files with 927 additions and 175 deletions

View File

@ -142,7 +142,8 @@ uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave,
BLFILE** filep);
BLFILE** filep,
const char *new_file);
static uint32_t blr_slave_send_fde(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave,
GWBUF *fde);
@ -313,7 +314,8 @@ static void blr_slave_skip_empty_files(ROUTER_INSTANCE *router,
static inline void blr_get_file_fullpath(const char *binlog_file,
const char *root_dir,
char *full_path);
char *full_path,
const char *f_prefix);
static int blr_show_binary_logs(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave,
const char *extra_data);
@ -332,6 +334,12 @@ static bool blr_handle_complex_select(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave,
const char *col1,
const char *coln);
extern bool blr_is_current_binlog(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave);
extern bool blr_compare_binlogs(ROUTER_INSTANCE *router,
MARIADB_GTID_INFO *slave,
const char *r_file,
const char *s_file);
/**
* Process a request packet from the slave server.
*
@ -1808,7 +1816,7 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
spinlock_acquire(&router->binlog_lock);
if (router->pending_transaction.state > BLRM_NO_TRANSACTION &&
strcmp(router->binlog_name, slave->binlogfile) == 0 &&
blr_is_current_binlog(router, slave) &&
(slave->binlog_pos > router->binlog_position))
{
force_disconnect = true;
@ -1855,7 +1863,8 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
/**
* Check whether the request file is empty
* and try using next file in sequence.
* and try using next file in sequence or next one
* based on GTID mpas.
* If one or more files have been skipped then
* the slave->binlog_pos is set to 4 and
* slave->binlogname set to new filename.
@ -2046,6 +2055,9 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
int rotating = 0;
long burst_size;
char read_errmsg[BINLOG_ERROR_MSG_LEN + 1];
MARIADB_GTID_INFO *f_tree = router->storage_type == BLR_BINLOG_STORAGE_TREE ?
&slave->f_info :
NULL;
read_errmsg[BINLOG_ERROR_MSG_LEN] = '\0';
@ -2068,7 +2080,7 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
/* check for a pending transaction and safe position */
if (router->pending_transaction.state > BLRM_NO_TRANSACTION &&
strcmp(router->binlog_name, slave->binlogfile) == 0 &&
blr_is_current_binlog(router, slave) &&
(slave->binlog_pos > router->binlog_position))
{
do_return = 1;
@ -2098,7 +2110,9 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
if (file == NULL)
{
rotating = router->rotating;
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
if ((file = blr_open_binlog(router,
slave->binlogfile,
f_tree)) == NULL)
{
char err_msg[BINLOG_ERROR_MSG_LEN + 1];
err_msg[BINLOG_ERROR_MSG_LEN] = '\0';
@ -2244,9 +2258,13 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
beat1 = hkheartbeat;
#ifdef BLFILE_IN_SLAVE
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
if ((slave->file = blr_open_binlog(router,
slave->binlogfile,
f_tree)) == NULL)
#else
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
if ((file = blr_open_binlog(router,
slave->binlogfile,
f_tree)) == NULL)
#endif
{
char err_msg[BINLOG_ERROR_MSG_LEN + 1];
@ -2276,14 +2294,19 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
/* Send error that stops slave replication */
blr_send_custom_error(slave->dcb,
(slave->seqno - 1),
slave->seqno,
0,
err_msg,
"HY000",
BINLOG_FATAL_ERROR_READING);
gwbuf_free(record);
record = NULL;
slave->state = BLRS_ERRORED;
dcb_close(slave->dcb);
break;
return 0;
}
#ifdef BLFILE_IN_SLAVE
file = slave->file;
@ -2440,7 +2463,7 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
poll_fake_write_event(slave->dcb);
}
else if (slave->binlog_pos == router->binlog_position &&
strcmp(slave->binlogfile, router->binlog_name) == 0)
blr_is_current_binlog(router, slave))
{
spinlock_acquire(&router->binlog_lock);
spinlock_acquire(&slave->catch_lock);
@ -2450,7 +2473,7 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
* and slave->catch_lock.
*/
if (slave->binlog_pos != router->binlog_position ||
strcmp(slave->binlogfile, router->binlog_name) != 0)
!blr_is_current_binlog(router, slave))
{
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
@ -2473,10 +2496,11 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
}
else
{
if (slave->binlog_pos >= blr_file_size(file)
&& router->rotating == 0
&& strcmp(router->binlog_name, slave->binlogfile) != 0
&& blr_file_next_exists(router, slave))
char next_file[BINLOG_FNAMELEN + 1] = "";
if (slave->binlog_pos >= blr_file_size(file) &&
router->rotating == 0 &&
(!blr_is_current_binlog(router, slave) &&
blr_file_next_exists(router, slave, next_file)))
{
/* We may have reached the end of file of a non-current
* binlog file.
@ -2501,10 +2525,11 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
MXS_FREE(slave->encryption_ctx);
slave->encryption_ctx = NULL;
/* Now pass the next_file to blr_slave_fake_rotate() */
#ifdef BLFILE_IN_SLAVE
if (blr_slave_fake_rotate(router, slave, &slave->file))
if (blr_slave_fake_rotate(router, slave, &slave->file, next_file))
#else
if (blr_slave_fake_rotate(router, slave, &file))
if (blr_slave_fake_rotate(router, slave, &file, next_file))
#endif
{
spinlock_acquire(&slave->catch_lock);
@ -2516,6 +2541,10 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
{
slave->state = BLRS_ERRORED;
dcb_close(slave->dcb);
#ifndef BLFILE_IN_SLAVE
blr_close_binlog(router, file);
#endif
return 0;
}
}
else
@ -2604,6 +2633,7 @@ blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data)
/**
* Rotate the slave to the new binlog file
*
* @param router The router instance
* @param slave The slave instance
* @param ptr The rotate event (minus header and OK byte)
*/
@ -2632,7 +2662,7 @@ blr_slave_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, uint8_t *ptr)
/**
* Generate an internal rotate event that we can use to cause
* the slave to move beyond a binlog file
* that is missisng the rotate eent at the end.
* that is missisng the rotate event at the end.
*
* @param router The router instance
* @param slave The slave to rotate
@ -2641,26 +2671,30 @@ blr_slave_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, uint8_t *ptr)
static int
blr_slave_fake_rotate(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave,
BLFILE** filep)
BLFILE** filep,
const char *new_file)
{
char *sptr;
int filenum;
GWBUF *r_event;
MARIADB_GTID_INFO *f_tree = router->storage_type == BLR_BINLOG_STORAGE_TREE ?
&slave->f_info :
NULL;
if ((sptr = strrchr(slave->binlogfile, '.')) == NULL)
if ((sptr = strrchr(new_file, '.')) == NULL)
{
return 0;
}
blr_close_binlog(router, *filep);
filenum = atoi(sptr + 1);
sprintf(slave->binlogfile,
BINLOG_NAMEFMT,
router->fileroot,
filenum + 1);
/* Set Pos = 4 */
slave->binlog_pos = 4;
if ((*filep = blr_open_binlog(router, slave->binlogfile)) == NULL)
/* Set Filename */
strcpy(slave->binlogfile, new_file);
if ((*filep = blr_open_binlog(router,
new_file,
f_tree)) == NULL)
{
return 0;
}
@ -2668,7 +2702,7 @@ blr_slave_fake_rotate(ROUTER_INSTANCE *router,
/* Build Fake Rotate Event */
r_event = blr_build_fake_rotate_event(slave,
slave->binlog_pos,
slave->binlogfile,
new_file,
router->masterid);
return r_event ? slave->dcb->func.write(slave->dcb, r_event) : 0;
@ -2690,12 +2724,17 @@ blr_slave_read_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
uint8_t *ptr;
uint32_t chksum;
char err_msg[BINLOG_ERROR_MSG_LEN + 1];
MARIADB_GTID_INFO *f_tree = router->storage_type == BLR_BINLOG_STORAGE_TREE ?
&slave->f_info :
NULL;
err_msg[BINLOG_ERROR_MSG_LEN] = '\0';
memset(&hdr, 0, BINLOG_EVENT_HDR_LEN);
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
if ((file = blr_open_binlog(router,
slave->binlogfile,
f_tree)) == NULL)
{
return NULL;
}
@ -5899,13 +5938,18 @@ blr_slave_read_ste(ROUTER_INSTANCE *router,
uint8_t *ptr;
uint32_t chksum;
char err_msg[BINLOG_ERROR_MSG_LEN + 1];
MARIADB_GTID_INFO *f_tree = router->storage_type == BLR_BINLOG_STORAGE_TREE ?
&slave->f_info :
NULL;
err_msg[BINLOG_ERROR_MSG_LEN] = '\0';
memset(&hdr, 0, BINLOG_EVENT_HDR_LEN);
BLFILE *file;
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
if ((file = blr_open_binlog(router,
slave->binlogfile,
f_tree)) == NULL)
{
return 0;
}
@ -6374,6 +6418,26 @@ static bool blr_slave_gtid_request(ROUTER_INSTANCE *router,
bool req_file,
unsigned long req_pos)
{
MARIADB_GTID_INFO f_gtid = {};
uint32_t router_pos;
char router_curr_file[BINLOG_FNAMELEN + 1];
char last_gtid[GTID_MAX_LEN + 1];
spinlock_acquire(&router->binlog_lock);
// Set gtid as current router gtid
strcpy(last_gtid, router->last_mariadb_gtid);
// Set file as router current file
strcpy(router_curr_file, router->binlog_name);
// Set safe postion of current ruter file
router_pos = router->binlog_position;
// Set domain_id, server_id in case of empty/not found GTID
if (router->storage_type == BLR_BINLOG_STORAGE_TREE)
{
f_gtid.gtid_elms.domain_id = router->mariadb10_gtid_domain;
f_gtid.gtid_elms.server_id = router->orig_masterid;
}
spinlock_release(&router->binlog_lock);
MXS_INFO("Slave %lu is registering with MariaDB GTID '%s'",
(unsigned long)slave->serverid,
slave->mariadb_gtid);
@ -6384,15 +6448,22 @@ static bool blr_slave_gtid_request(ROUTER_INSTANCE *router,
* Empty GTID:
* Sending data from the router current file and pos 4
*/
strcpy(slave->binlogfile, router_curr_file);
slave->binlog_pos = 4;
MXS_INFO("Slave %lu is registering with empty GTID:"
" sending events from current binlog file %s, pos %lu",
" sending events from current binlog file %s,"
" pos %" PRIu32 "",
(unsigned long)slave->serverid,
slave->binlogfile,
(unsigned long)slave->binlog_pos);
slave->binlog_pos);
/* Add GTID details to slave struct */
memcpy(&slave->f_info, &f_gtid, sizeof(MARIADB_GTID_INFO));
return true;
}
else
{
MARIADB_GTID_INFO f_gtid = {};
char dbpath[PATH_MAX + 1];
snprintf(dbpath, sizeof(dbpath), "/%s/%s",
router->binlogdir, GTID_MAPS_DB);
@ -6412,6 +6483,7 @@ static bool blr_slave_gtid_request(ROUTER_INSTANCE *router,
sqlite3_errmsg(slave->gtid_maps));
slave->gtid_maps = NULL;
return false;
}
else
{
@ -6447,16 +6519,34 @@ static bool blr_slave_gtid_request(ROUTER_INSTANCE *router,
}
else
{
/* Right now: just use current router binlog file pos 4 */
/* No strict mode: */
// - 1 -Set request GTID as current master one
MXS_FREE(slave->mariadb_gtid);
slave->mariadb_gtid = MXS_STRDUP_A(last_gtid);
// - 2 - Use current router file and position
strcpy(slave->binlogfile, router_curr_file);
slave->binlog_pos = router_pos;
// - 3 Set GTID details for filename
if (router->storage_type == BLR_BINLOG_STORAGE_TREE)
{
memcpy(&slave->f_info, &f_gtid, sizeof(MARIADB_GTID_INFO));
}
}
}
else
{
/* GTID has been found */
MXS_INFO("Found GTID '%s' for slave %lu at %s:%lu",
MXS_INFO("Found GTID '%s' for slave %" PRIu32 ""
" at %" PRIu32 "/%" PRIu32 "/%s:%" PRIu64 ""
". Next event at %" PRIu64 "",
slave->mariadb_gtid,
(unsigned long)slave->serverid,
slave->serverid,
f_gtid.gtid_elms.domain_id,
f_gtid.gtid_elms.server_id,
f_gtid.file,
f_gtid.start,
f_gtid.end);
/**
@ -6487,6 +6577,9 @@ static bool blr_slave_gtid_request(ROUTER_INSTANCE *router,
slave->binlog_pos = req_pos;
}
/* Set GTID details in f_info*/
memcpy(&slave->f_info, &f_gtid, sizeof(MARIADB_GTID_INFO));
/* Free gtid and file from result */
MXS_FREE(f_gtid.gtid);
MXS_FREE(f_gtid.file);
@ -7074,7 +7167,7 @@ static bool blr_handle_set_stmt(ROUTER_INSTANCE *router,
}
return true;
}
else if (strstr(word, "@@global.gtid_slave_pos") != NULL)
else if (strcasecmp(word, "@@global.gtid_slave_pos") == 0)
{
if (slave->serverid != 0)
{
@ -7172,7 +7265,7 @@ static bool blr_handle_set_stmt(ROUTER_INSTANCE *router,
* in case of a fresh new setup.
*/
MXS_FREE(slave->mariadb_gtid);
slave->mariadb_gtid = MXS_STRDUP(heading);
slave->mariadb_gtid = MXS_STRDUP_A(heading);
blr_slave_send_ok(router, slave);
return true;
@ -7598,11 +7691,16 @@ static bool blr_handle_admin_stmt(ROUTER_INSTANCE *router,
static void blr_slave_skip_empty_files(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave)
{
char binlog_file[BLRM_BINLOG_NAME_STR_LEN + 1];
char router_curr_file[BLRM_BINLOG_NAME_STR_LEN + 1];
char binlog_file[BINLOG_FNAMELEN + 1];
char router_curr_file[BINLOG_FNAMELEN + 1];
char file_path[PATH_MAX + 1] = "";
unsigned int seqno;
bool skipped_files = false;
char t_prefix[BINLOG_FILE_EXTRA_INFO] = "";
MARIADB_GTID_INFO *f_tree = router->storage_type == BLR_BINLOG_STORAGE_TREE ?
&slave->f_info :
NULL;
char next_file[BINLOG_FNAMELEN + 1] = "";
// Save the current router binlog filename
spinlock_acquire(&router->binlog_lock);
@ -7612,18 +7710,32 @@ static void blr_slave_skip_empty_files(ROUTER_INSTANCE *router,
// Set the starting filename
strcpy(binlog_file, slave->binlogfile);
//Add tree prefix
if (f_tree)
{
sprintf(t_prefix,
"%" PRIu32 "/%" PRIu32 "/",
f_tree->gtid_elms.domain_id,
f_tree->gtid_elms.server_id);
}
// Get binlog filename full-path
blr_get_file_fullpath(binlog_file,
router->binlogdir,
file_path);
file_path,
t_prefix[0] ? t_prefix: NULL);
/**
* Set the next file in sequence if current file has 4 bytes size.
* Stop if the new file is the urrent binlog file.
* Get the next file in sequence or next by GTID maps
* if current file has 4 bytes size.
* Stop if the new file is the current binlog file.
*/
while (strcmp(binlog_file, router_curr_file) != 0 &&
while (!blr_compare_binlogs(router,
f_tree,
router_curr_file,
binlog_file) &&
blr_slave_get_file_size(file_path) == 4 &&
(seqno = blr_file_get_next_seqno(binlog_file)) > 0)
blr_file_next_exists(router, slave, next_file))
{
// Log skipped file
MXS_INFO("Slave %s:%i, skip reading empty file '%s' (4 bytes size).",
@ -7631,13 +7743,14 @@ static void blr_slave_skip_empty_files(ROUTER_INSTANCE *router,
dcb_get_port(slave->dcb),
binlog_file);
// Set next in sequence binlog file name
sprintf(binlog_file, BINLOG_NAMEFMT, router->fileroot, seqno);
// Update binlog_file name
sprintf(binlog_file, next_file);
// Get binlog file full-path
blr_get_file_fullpath(binlog_file,
router->binlogdir,
file_path);
file_path,
t_prefix[0] ? t_prefix: NULL);
skipped_files = true;
}
@ -7657,13 +7770,20 @@ static void blr_slave_skip_empty_files(ROUTER_INSTANCE *router,
* @param root_dir The binlog storage directory
* @param full_path The output fullpahth name:
* the memory area must be preallocated.
* @param t_prefix The file_tree prefix with rep_domain
* and server_id.
*/
static inline void blr_get_file_fullpath(const char *binlog_file,
const char *root_dir,
char *full_path)
char *full_path,
const char *t_prefix)
{
strcpy(full_path, root_dir);
strcat(full_path, "/");
if (t_prefix)
{
strcat(full_path, t_prefix);
}
strcat(full_path, binlog_file);
}
@ -7775,6 +7895,7 @@ blr_show_binary_logs(ROUTER_INSTANCE *router,
* Check whether the last file is the current binlog file.
* If not then add the new row.
*/
// TODO: check whether to FIX with prefix
if (strcmp(current_file, result.last_file) != 0)
{
char pos[40];
@ -7871,6 +7992,12 @@ static int binary_logs_select_cb(void *data,
char filename[1 +
strlen(values[0]) +
BINLOG_FILE_EXTRA_INFO];
char t_prefix[BINLOG_FILE_EXTRA_INFO] = "";
sprintf(t_prefix,
"%s/%s/",
values[2], // domain ID
values[3]); // server ID
fsize = atoll(values[1]);
@ -7890,7 +8017,8 @@ static int binary_logs_select_cb(void *data,
//Get binlog filename full-path
blr_get_file_fullpath(values[0],
data_set->binlogdir,
file_path);
file_path,
t_prefix);
//Get the file size
fsize = blr_slave_get_file_size(file_path);
@ -7900,9 +8028,8 @@ static int binary_logs_select_cb(void *data,
if (data_set->extra_info)
{
sprintf(filename,
"%s/%s/%s",
values[2], // domain ID
values[3], // server ID
"%s%s",
t_prefix,
values[0]); // filename
}
else