MXS-1266: added SHOW BINARY LOGS

SHOW BINARY LOGS new query shows binary logs which have GTID saved in
gtid_maps

These feature requires the ‘mariadb10_slave_gtid’ option
This commit is contained in:
MassimilianoPinto 2017-05-22 16:26:05 +02:00
parent 362824579d
commit f0bb2425aa

View File

@ -88,6 +88,22 @@
#include <maxscale/version.h>
#include <zlib.h>
#include <maxscale/alloc.h>
#include <inttypes.h>
/**
* This struct is used by sqlite3_exec callback routine
* for SHOW BINARY LOGS.
*
* It stores the next row sequence number,
* the last binlog file name read from gtid_maps storage
* and the connected client DCB.
*/
typedef struct
{
int seq_no; /* Output sequence in result test */
char *last_file; /* Last binlog file found in GTID repository */
DCB *client; /* Connected client DCB */
} BINARY_LOG_DATA_RESULT;
extern void poll_fake_write_event(DCB *dcb);
static char* get_next_token(char *str, const char* delim, char **saveptr);
@ -295,8 +311,17 @@ static void blr_slave_skip_empty_files(ROUTER_INSTANCE *router,
static inline void blr_get_file_fullpath(const char *binlog_file,
const char *root_dir,
char *full_path);
static int blr_show_binary_logs(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave);
extern bool blr_parse_gtid(const char *gtid, MARIADB_GTID_ELEMS *info);
static int binary_logs_select_cb(void *data,
int cols,
char** values,
char** names);
static GWBUF *blr_create_result_row(const char *name,
const char *value,
int seq_no);
/**
* Process a request packet from the slave server.
@ -2710,7 +2735,7 @@ blr_slave_send_fieldcount(ROUTER_INSTANCE *router,
encode_value(ptr, 1, 24); // Add length of data packet
ptr += 3;
*ptr++ = 0x01; // Sequence number in response
*ptr++ = count; // Length of result string
*ptr++ = count; // Number of columns
return slave->dcb->func.write(slave->dcb, pkt);
}
@ -2847,8 +2872,18 @@ blr_slave_send_disconnected_server(ROUTER_INSTANCE *router,
}
blr_slave_send_fieldcount(router, slave, 2);
blr_slave_send_columndef(router, slave, "server_id", BLR_TYPE_INT, 40, seqno++);
blr_slave_send_columndef(router, slave, "state", BLR_TYPE_STRING, 40, seqno++);
blr_slave_send_columndef(router,
slave,
"server_id",
BLR_TYPE_INT,
40,
seqno++);
blr_slave_send_columndef(router,
slave,
"state",
BLR_TYPE_STRING,
40,
seqno++);
blr_slave_send_eof(router, slave, seqno++);
ptr = GWBUF_DATA(pkt);
@ -2968,15 +3003,24 @@ blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
char server_id[40];
char state[40];
uint8_t *ptr;
int len, seqno;
int len, seqno = 2;
GWBUF *pkt;
/* preparing output result */
blr_slave_send_fieldcount(router, slave, 2);
blr_slave_send_columndef(router, slave, "server_id", BLR_TYPE_INT, 40, 2);
blr_slave_send_columndef(router, slave, "state", BLR_TYPE_STRING, 40, 3);
blr_slave_send_eof(router, slave, 4);
seqno = 5;
blr_slave_send_columndef(router,
slave,
"server_id",
BLR_TYPE_INT,
40,
seqno++);
blr_slave_send_columndef(router,
slave,
"state",
BLR_TYPE_STRING,
40,
seqno++);
blr_slave_send_eof(router, slave, seqno++);
spinlock_acquire(&router->lock);
sptr = router->slaves;
@ -6634,6 +6678,27 @@ static bool blr_handle_show_stmt(ROUTER_INSTANCE *router,
blr_slave_show_warnings(router, slave);
return true;
}
else if (strcasecmp(word, "BINARY") == 0)
{
if (router->mariadb10_gtid)
{
blr_show_binary_logs(router, slave);
}
else
{
char *errmsg = "SHOW BINARY LOGS needs the"
" 'mariadb10_slave_gtid' option to be set.";
MXS_ERROR("%s: %s",
errmsg,
router->service->name);
blr_slave_send_error_packet(slave,
errmsg,
1198,
NULL);
}
return true;
}
else if (strcasecmp(word, "GLOBAL") == 0)
{
if (router->master_state == BLRM_UNCONFIGURED)
@ -7515,3 +7580,219 @@ static inline void blr_get_file_fullpath(const char *binlog_file,
strcat(full_path, "/");
strcat(full_path, binlog_file);
}
/**
* Returns the list of binlog files
*
* It's called olny if mariadb10_slave_gtid option is set
*
* Limitation: it doesn't return the current binlog file
* unless a GTID is found in it.
*
* @param router The router instance
* @param slave The connected client
* @retun Sent bytes
*/
static int
blr_show_binary_logs(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
char current_file[BINLOG_FNAMELEN];
uint64_t current_pos = 0;
static const char select_query[] = "SELECT binlog_file, "
"MAX(end_pos) AS size, "
"rep_domain, "
"server_id "
"FROM gtid_maps "
"GROUP BY binlog_file "
"ORDER BY binlog_file ASC;";
int seqno;
char *errmsg = NULL;
BINARY_LOG_DATA_RESULT result = {};
/* Get current binlog finename and position */
spinlock_acquire(&router->binlog_lock);
strcpy(current_file, router->binlog_name);
current_pos = router->current_pos;
spinlock_release(&router->binlog_lock);
/**
* First part of result set:
* send 2 columns and their defintions.
*/
/* This call sets seq to 1 in the packet */
blr_slave_send_fieldcount(router, slave, 2);
/* Set 'seqno' counter to next value: 2 */
seqno = 2;
/* Col 1 def */
blr_slave_send_columndef(router,
slave,
"Log_name",
BLR_TYPE_STRING,
40,
seqno++);
/* Col 2 def */
blr_slave_send_columndef(router,
slave,
"File_size",
BLR_TYPE_INT,
40,
seqno++);
/* Cols EOF */
blr_slave_send_eof(router, slave, seqno++);
/* Initialise the result data struct */
result.seq_no = seqno;
result.client = slave->dcb;
result.last_file = NULL;
/**
* Second part of result set:
*
* add rows for select binlog files.
*
* Note:
* - result.last_file is freed and updated by binary_logs_select_cb()
* - result.seq_no is increased
*/
if (sqlite3_exec(router->gtid_maps,
select_query,
binary_logs_select_cb,
&result,
&errmsg) != SQLITE_OK)
{
MXS_ERROR("Failed to exec 'SELECT binlog_file FROM gtid_maps': "
"%s", errmsg ? errmsg : "database is not available");
sqlite3_free(errmsg);
/* Free last_file */
MXS_FREE(result.last_file);
result.last_file = NULL;
/* Add EOF for empty result set */
return blr_slave_send_eof(router, slave, result.seq_no);
}
/**
* Check whether the last file is the current binlog file.
* If not then add the new row.
*/
if (strcmp(current_file, result.last_file) != 0)
{
char pos[40];
GWBUF *pkt;
/* Free last file */
MXS_FREE(result.last_file);
/* Use seqno from last sent row, already incremented */
seqno = result.seq_no;
/* Create the string value for pos */
sprintf(pos, "%" PRIu64, current_pos);
/* Create & write the new row */
if ((pkt = blr_create_result_row(current_file,
pos,
seqno)) != NULL)
{
slave->dcb->func.write(slave->dcb, pkt);
}
}
/* Increment seqno, and add the result set EOF and return */
return blr_slave_send_eof(router, slave, ++seqno);
}
/**
* Creates a Result Set row with two STRING columns
*
* @param val1 First column value
* @param val2 Second column value
* @param seq_no Sequence number for this row
* @return An allocated GWBUF or NULL
*/
GWBUF *blr_create_result_row(const char *val1,
const char *val2,
int seq_no)
{
int val1_len = strlen(val1);
int val2_len = strlen(val2);
GWBUF *pkt;
uint8_t *ptr;
int len = MYSQL_HEADER_LEN + (1 + val1_len + (1 + val2_len));
// Allocate a new GWBUF buffer
if ((pkt = gwbuf_alloc(len)) == NULL)
{
return NULL;
}
ptr = GWBUF_DATA(pkt);
// Add length of data packet
encode_value(ptr, len - MYSQL_HEADER_LEN, 24);
ptr += 3;
// Sequence number in response
*ptr++ = seq_no;
// Length of result string "val1"
*ptr++ = val1_len;
memcpy((char *)ptr, val1, val1_len);
ptr += val1_len;
// Length of result string "val2"
*ptr++ = val2_len;
memcpy((char *)ptr, val2, val2_len);
return pkt;
}
/**
* Binary logs select callback for sqlite3 database
*
* @param data Data pointer from caller
* @param cols Number of columns
* @param values The values
* @param names The column names
*
* @return 0 on success, 1 otherwise
*/
static int binary_logs_select_cb(void *data,
int cols,
char** values,
char** names)
{
BINARY_LOG_DATA_RESULT *data_set = (BINARY_LOG_DATA_RESULT *)data;
DCB *dcb = data_set->client;
int ret = 1; // Failure
ss_dassert(cols >= 4 && dcb);
if (values[0] && // File Name
values[1] && // File Size
values[2] && // Domain ID
values[3]) // Server ID
{
GWBUF *pkt;
/* File size != 0 && server ID != 0 */
ss_dassert(atoll(values[1]) && atoll(values[3]));
/* Create the MySQL Result Set row */
if ((pkt = blr_create_result_row(values[0], // File name
values[1], // File size
data_set->seq_no)) != NULL)
{
/* Increase sequence for next row */
data_set->seq_no++;
/* Free last file name */
MXS_FREE(data_set->last_file);
/* Set last file name */
data_set->last_file = MXS_STRDUP_A(values[0]);
/* Write packet to client */
dcb->func.write(dcb, pkt);
/* Set success */
ret = 0;
}
return ret; /* Return success or fallure */
}
else
{
return 0; /* Success: no data from db or end of result set */
}
}