MXS-770 Addition of PURGE BINARY LOGS feature

PURGE BINARY LOGS; deletes all files in binlogdir and GTID maps repo
but keeps current binlog file.

PURGE BINARY LOGS TO ‘file’; deletes all files in binlogdir and GTID
maps repo up to specified file.

mariadb10_slave_gtid=On option is needed in order to keep the list of
binlog files.
This commit is contained in:
MassimilianoPinto 2017-08-04 11:33:31 +02:00
parent a438ff7c86
commit 113d2ad87a

View File

@ -100,11 +100,13 @@
*/
typedef struct
{
int seq_no; /* Output sequence in result test */
int seq_no; /* Output sequence in result set */
char *last_file; /* Last binlog file found in GTID repo */
const char *binlogdir; /* Binlog files cache dir */
DCB *client; /* Connected client DCB */
bool use_tree; /* Binlog structure type */
size_t n_files; /* How many files */
uint64_t rowid; /* ROWID of router current file*/
} BINARY_LOG_DATA_RESULT;
extern void poll_fake_write_event(DCB *dcb);
@ -342,6 +344,13 @@ extern bool blr_compare_binlogs(ROUTER_INSTANCE *router,
MARIADB_GTID_INFO *slave,
const char *r_file,
const char *s_file);
static bool blr_purge_binary_logs(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave,
char *purge_stmt);
static int binary_logs_find_file_cb(void *data,
int cols,
char** values,
char** names);
/**
* Process a request packet from the slave server.
@ -7673,6 +7682,40 @@ static bool blr_handle_admin_stmt(ROUTER_INSTANCE *router,
MXS_ERROR("%s: Incomplete admin command.", router->service->name);
return false;
}
/* Handle PURGE command */
else if (strcasecmp(admin_stmt, "PURGE") == 0)
{
if (router->master_state != BLRM_SLAVE_STOPPED)
{
blr_slave_send_error_packet(slave,
"Cannot execute PURGE BINARY LOGS "
"with a running slave; "
"run STOP SLAVE first.",
1198,
NULL);
return true;
}
/* Check for GTID support */
if (router->mariadb10_gtid)
{
blr_purge_binary_logs(router, slave, admin_opts);
}
else
{
char *errmsg = "PURGE BINARY LOGS needs the "
"'mariadb10_slave_gtid' option to be set.";
MXS_ERROR("%s: %s",
errmsg,
router->service->name);
blr_slave_send_error_packet(slave,
errmsg,
1198,
NULL);
}
return true;
}
/* Handle RESET command */
else if (strcasecmp(admin_stmt, "RESET") == 0)
{
@ -8149,6 +8192,7 @@ blr_show_binary_logs(ROUTER_INSTANCE *router,
"server_id, "
"binlog_file "
"ORDER BY id ASC;";
static char sql_stmt[GTID_SQL_BUFFER_SIZE];
int seqno;
char *errmsg = NULL;
BINARY_LOG_DATA_RESULT result = {};
@ -8205,6 +8249,7 @@ blr_show_binary_logs(ROUTER_INSTANCE *router,
* - result.last_file is freed and updated by binary_logs_select_cb()
* - result.seq_no is increased
*/
if (sqlite3_exec(router->gtid_maps,
!result.use_tree ?
select_query :
@ -8225,7 +8270,7 @@ blr_show_binary_logs(ROUTER_INSTANCE *router,
return blr_slave_send_eof(router, slave, result.seq_no);
}
/* Use seqno ofof last sent packet */
/* Use seqno of last sent packet */
seqno = result.seq_no;
/**
@ -8514,3 +8559,322 @@ static bool blr_handle_complex_select(ROUTER_INSTANCE *router,
return false;
}
}
/**
* Purge binary logs find binlog callback for sqlite3 database
*
* @param data Data pointer from caller
* @param cols Number of columns
* @param values The values
* @param names The column names
*
* @return 0 on success, 1 otherwise
*/
static int binary_logs_find_file_cb(void *data,
int cols,
char** values,
char** names)
{
ss_dassert(cols == 2);
BINARY_LOG_DATA_RESULT *data_set = (BINARY_LOG_DATA_RESULT *)data;
if (values[0]) // Server ID
{
data_set->rowid = atoll(values[0]);
}
return 0;
}
/**
* Purge binary logs delete files callback for sqlite3 database
*
* @param data Data pointer from caller
* @param cols Number of columns
* @param values The values
* @param names The column names
*
* @return 0 on success, 1 otherwise
*/
static int binary_logs_purge_cb(void *data,
int cols,
char** values,
char** names)
{
ss_dassert(cols == 2);
BINARY_LOG_DATA_RESULT *result_data = (BINARY_LOG_DATA_RESULT *)data;
if (values[0] && values[1])
{
char *filename;
char full_path[PATH_MAX + 1];
/* values[0] is filename, values[1] is prefix + file */
filename = !result_data->use_tree ?
values[0] :
values[1];
sprintf(full_path, "%s/%s", result_data->binlogdir, filename);
MXS_DEBUG("Deleting binlog file %s", full_path);
if (unlink(full_path) == -1 && errno != ENOENT)
{
MXS_ERROR("Failed to remove binlog file '%s': %d, %s",
full_path, errno, mxs_strerror(errno));
}
result_data->n_files++;
}
return 0;
}
/**
* Parse the PURGE BINARY LOGS [TO 'file'] SQL statement
* and return last_file or the one found in the command.
*
* @param purge_command The SQL command to parse
* @param last_file Current binlog file name
* @return A pointer to the file
* or NULL in case of parse errors.
*/
static const char *blr_purge_getfile(char *purge_command,
const char *last_file)
{
char *word;
char *brkb;
char *sep = " \t";
word = strtok_r(purge_command, sep, &brkb);
// Check BINARY
if (strcasecmp(word, "BINARY") != 0)
{
MXS_ERROR("Invalid PURGE command: PURGE %s", word);
return NULL;
}
word = strtok_r(NULL, sep, &brkb);
// Check LOGS
if (!word || strcasecmp(word, "LOGS") != 0)
{
MXS_ERROR("Invalid PURGE command: PURGE BINARY %s",
word ? word : "");
return NULL;
}
word = strtok_r(NULL, sep, &brkb);
// Nothing else, return last file
if (!word)
{
return last_file;
}
else
// Check for TO 'file'
{
if (strcasecmp(word, "TO") != 0)
{
MXS_ERROR("Invalid PURGE command: PURGE BINARY LOGS %s", word);
return NULL;
}
// Get filename
if ((word = strtok_r(NULL, sep, &brkb)) != NULL)
{
// Remove heading and trailing "'"
char *p = word;
if (*p == '\'')
{
word++;
}
if (p[strlen(p) - 1] == '\'')
{
p[strlen(p) - 1] = '\0';
}
return word;
}
else
{
MXS_ERROR("Invalid PURGE command: PURGE BINARY LOGS TO %s", word);
return NULL;
}
}
return NULL;
}
/**
* Purge MaxScale binlog files
*
* The routine it's called olny if mariadb10_slave_gtid option is set
* as the up to date list of binlog files is in the GTID maps repo.
*
* Note: the current binlog file is not deleted frm disk/db.
*
* @param router The router instance
* @param slave The connected client
* @param purge_opts The PURGE BINARY LOGS options
* @retun Sent bytes
*/
static bool
blr_purge_binary_logs(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave,
char *purge_opts)
{
char *errmsg = NULL;
size_t n_delete = 0;
// Select first ROWID of current binlog file
static const char last_file_tpl[] = "SELECT MIN(ROWID) AS min_id, "
"(rep_domain || '/' || "
"server_id || '/' || "
"binlog_file) AS file "
"FROM gtid_maps "
"WHERE file = '%s' "
"GROUP BY file "
"ORDER BY id ASC;";
// Select first ROWID of user specifed file
static const char find_file_tpl[] = "SELECT MIN(ROWID) AS min_id, "
"(rep_domain || '/' || "
"server_id || '/' || "
"binlog_file) AS file "
"FROM gtid_maps "
"WHERE binlog_file = '%s' "
"GROUP BY binlog_file "
"ORDER BY binlog_file ASC;";
// SELECT files with ROWID < given one and DELETE
static const char delete_list_tpl[] = "SELECT binlog_file, "
"(rep_domain || '/' || "
"server_id || '/' || "
"binlog_file) AS file "
"FROM gtid_maps "
"WHERE ROWID < %" PRIu64 " "
"GROUP BY file "
"ORDER BY id ASC; "
"DELETE FROM gtid_maps "
"WHERE ROWID < %" PRIu64 ";";
static char sql_stmt[GTID_SQL_BUFFER_SIZE];
BINARY_LOG_DATA_RESULT result;
static const char *last_file;
static char current_file[BINLOG_FILE_EXTRA_INFO + BINLOG_FNAMELEN + 1];
/**
* Parse PURGE BINARY LOGS [TO 'file'] statement:
* If file is not set then router->binlog_name is returned.
*/
if ((last_file = blr_purge_getfile(purge_opts,
router->binlog_name)) == NULL)
{
// Abort on parsing failure
blr_slave_send_error_packet(slave,
"Malformed PURGE BINARY LOGS [TO 'file'] detected.",
1064,
"42000");
return false;
}
/* Initialise result data fields */
result.rowid = 0;
result.n_files = 0;
result.binlogdir = router->binlogdir;
result.use_tree = router->storage_type == BLR_BINLOG_STORAGE_TREE;
/* Acquire lock accessing router structure fields */
spinlock_acquire(&router->binlog_lock);
/* Compare last file with router current file */
bool use_last = strcmp(last_file, router->binlog_name) == 0;
/* Set current file */
sprintf(current_file,
"%" PRIu32 "/%" PRIu32 "/%s",
router->mariadb10_gtid_domain,
router->orig_masterid,
router->binlog_name);
/* Release lock */
spinlock_release(&router->binlog_lock);
/**
* Prepare SQL statement for last file ROWID o find_file ROWID
*/
if (use_last)
{
/* Use current file, with prefix */
sprintf(sql_stmt,
last_file_tpl,
current_file);
}
else
{
/* Use the provided name, no prefix: find the first row */
sprintf(sql_stmt,
find_file_tpl,
last_file);
}
/* Get file rowid */
if (sqlite3_exec(router->gtid_maps,
sql_stmt,
binary_logs_find_file_cb,
&result,
&errmsg) != SQLITE_OK)
{
MXS_ERROR("PURGE BINARY LOGS: failed to select ROWID of current file "
"from GTID maps DB, %s, select [%s]",
errmsg,
sql_stmt);
sqlite3_free(errmsg);
blr_slave_send_error_packet(slave,
"Cannot find current file in binlog GTID DB.",
1373,
NULL);
return false;
}
if (result.rowid)
{
/* Prepare SQL statement for ROWID < result.rowid */
sprintf(sql_stmt,
delete_list_tpl,
result.rowid,
result.rowid);
/* Purge all files with ROWID < result.rowid */
if (sqlite3_exec(router->gtid_maps,
sql_stmt,
binary_logs_purge_cb,
&result,
&errmsg) != SQLITE_OK)
{
MXS_ERROR("Failed to select list of files to purge"
"from GTID maps DB: %s, select [%s]",
errmsg,
sql_stmt);
sqlite3_free(errmsg);
blr_slave_send_error_packet(slave,
"Cannot build the purge list of files.",
1373,
NULL);
return false;
}
}
else
{
blr_slave_send_error_packet(slave,
"Target log not found in binlog index",
1373,
NULL);
return false;
}
MXS_INFO("Deleted %lu binlog files in %s",
result.n_files,
result.binlogdir);
// Send OK and nithing else
blr_slave_send_ok(router, slave);
return true;
}