Support non-default binlog filenames

Add support for bin log file names that are shorter than the default.
Handle events per more than 2 packets
This commit is contained in:
Mark Riddoch
2014-06-04 18:37:41 +01:00
parent 4b5f801ff9
commit bb0e6c3858
5 changed files with 99 additions and 20 deletions

View File

@ -262,6 +262,7 @@ GWBUF *
gwbuf_consume(GWBUF *head, unsigned int length) gwbuf_consume(GWBUF *head, unsigned int length)
{ {
GWBUF *rval = head; GWBUF *rval = head;
CHK_GWBUF(head); CHK_GWBUF(head);
GWBUF_CONSUME(head, length); GWBUF_CONSUME(head, length);
CHK_GWBUF(head); CHK_GWBUF(head);
@ -271,6 +272,9 @@ GWBUF *rval = head;
rval = head->next; rval = head->next;
gwbuf_free(head); gwbuf_free(head);
} }
ss_dassert(rval->end > rval->start);
return rval; return rval;
} }

View File

@ -35,7 +35,8 @@
#define BINLOG_FNAMELEN 16 #define BINLOG_FNAMELEN 16
#define BLR_PROTOCOL "MySQLBackend" #define BLR_PROTOCOL "MySQLBackend"
#define BINLOG_MAGIC { 0xfe, 0x62, 0x69, 0x6e } #define BINLOG_MAGIC { 0xfe, 0x62, 0x69, 0x6e }
#define BINLOG_NAMEFMT "mysql-bin.%06d" #define BINLOG_NAMEFMT "%s.%06d"
#define BINLOG_NAME_ROOT "mysql-bin"
/** /**
* High and Low water marks for the slave dcb. These values can be overriden * High and Low water marks for the slave dcb. These values can be overriden
@ -172,6 +173,7 @@ typedef struct router_instance {
int serverid; /*< Server ID to use with master */ int serverid; /*< Server ID to use with master */
char *user; /*< User name to use with master */ char *user; /*< User name to use with master */
char *password; /*< Password to use with master */ char *password; /*< Password to use with master */
char *fileroot; /*< Root of binlog filename */
DCB *master; /*< DCB for master connection */ DCB *master; /*< DCB for master connection */
SESSION *session; /*< Fake session for master connection */ SESSION *session; /*< Fake session for master connection */
unsigned int master_state; /*< State of the master FSM */ unsigned int master_state; /*< State of the master FSM */

View File

@ -56,7 +56,7 @@
extern int lm_enabled_logfiles_bitmask; extern int lm_enabled_logfiles_bitmask;
static char *version_str = "V1.0.0"; static char *version_str = "V1.0.1";
/* The router entry points */ /* The router entry points */
static ROUTER *createInstance(SERVICE *service, char **options); static ROUTER *createInstance(SERVICE *service, char **options);
@ -234,6 +234,10 @@ int i;
{ {
inst->masterid = atoi(value); inst->masterid = atoi(value);
} }
else if (strcmp(options[i], "filestem") == 0)
{
inst->fileroot = strdup(value);
}
else if (strcmp(options[i], "lowwater") == 0) else if (strcmp(options[i], "lowwater") == 0)
{ {
inst->low_water = atoi(value); inst->low_water = atoi(value);
@ -252,6 +256,8 @@ int i;
} }
} }
} }
if (inst->fileroot == NULL)
inst->fileroot = strdup(BINLOG_NAME_ROOT);
} }
/* /*

View File

@ -33,6 +33,7 @@
#include <string.h> #include <string.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <dirent.h>
#include <fcntl.h> #include <fcntl.h>
#include <unistd.h> #include <unistd.h>
#include <service.h> #include <service.h>
@ -68,6 +69,9 @@ blr_file_init(ROUTER_INSTANCE *router)
{ {
char *ptr, path[1024], filename[1050]; char *ptr, path[1024], filename[1050];
int file_found, n = 1; int file_found, n = 1;
int root_len, i;
DIR *dirp;
struct dirent *dp;
strcpy(path, "/usr/local/skysql/MaxScale"); strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL) if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
@ -79,9 +83,25 @@ int file_found, n = 1;
if (access(path, R_OK) == -1) if (access(path, R_OK) == -1)
mkdir(path, 0777); mkdir(path, 0777);
/* First try to find a binlog file number by reading the directory */
root_len = strlen(router->fileroot);
dirp = opendir(path);
while ((dp = readdir(dirp)) != NULL)
{
if (strncmp(dp->d_name, router->fileroot, root_len) == 0)
{
i = atoi(dp->d_name + root_len + 1);
if (i > n)
n = i;
}
}
closedir(dirp);
file_found = 0; file_found = 0;
do { do {
sprintf(filename, "%s/" BINLOG_NAMEFMT, path, n); sprintf(filename, "%s/" BINLOG_NAMEFMT, path, router->fileroot, n);
if (access(filename, R_OK) != -1) if (access(filename, R_OK) != -1)
{ {
file_found = 1; file_found = 1;
@ -94,12 +114,12 @@ int file_found, n = 1;
if (n == 0) // No binlog files found if (n == 0) // No binlog files found
{ {
sprintf(filename, BINLOG_NAMEFMT, 1); sprintf(filename, BINLOG_NAMEFMT, router->fileroot, 1);
blr_file_create(router, filename); blr_file_create(router, filename);
} }
else else
{ {
sprintf(filename, BINLOG_NAMEFMT, n); sprintf(filename, BINLOG_NAMEFMT, router->fileroot, n);
blr_file_append(router, filename); blr_file_append(router, filename);
} }

View File

@ -153,7 +153,7 @@ char query[128];
return; return;
} }
if (MYSQL_RESPONSE_ERR(buf)) if (router->master_state != BLRM_BINLOGDUMP && MYSQL_RESPONSE_ERR(buf))
{ {
LOGIF(LM, (skygw_log_write( LOGIF(LM, (skygw_log_write(
LOGFILE_ERROR, LOGFILE_ERROR,
@ -422,19 +422,46 @@ int no_residual = 1;
if (reslen < len && gwbuf_length(pkt) >= len) if (reslen < len && gwbuf_length(pkt) >= len)
{ {
/* /*
* The message is contianed in more than the current * The message is contained in more than the current
* buffer, however we have the complete messasge in * buffer, however we have the complete messasge in
* this buffer and the chain of remaining buffers. * this buffer and the chain of remaining buffers.
* *
* Allocate a contiguous buffer for the binlog message * Allocate a contiguous buffer for the binlog message
* and copy the complete message into this buffer. * and copy the complete message into this buffer.
*/ */
msg = malloc(len); int remainder = len;
GWBUF *p = pkt;
if ((msg = malloc(len)) == NULL)
{
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"Insufficient memory to buffer event "
"of %d bytes\n.", len)));
break;
}
ptr = msg;
while (p && remainder > 0)
{
int plen = GWBUF_LENGTH(p);
int n = (remainder > plen ? plen : remainder);
memcpy(ptr, GWBUF_DATA(p), n);
remainder -= n;
ptr += n;
if (remainder > 0)
p = p->next;
}
if (remainder)
{
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"Expected entire message in buffer "
"chain, but failed to create complete "
"message as expected.\n")));
break;
}
if (GWBUF_LENGTH(pkt->next) < len - reslen)
printf("Packet (length %d) spans more than 2 buffers\n", len);
memcpy(msg, pdata, reslen);
memcpy(&msg[reslen], GWBUF_DATA(pkt->next), len - reslen);
ptr = msg; ptr = msg;
} }
else if (reslen < len) else if (reslen < len)
@ -456,6 +483,13 @@ int no_residual = 1;
} }
blr_extract_header(ptr, &hdr); blr_extract_header(ptr, &hdr);
if (hdr.event_size != len - 5)
{
printf("Packet length is %d, but event size is %d\n",
len, hdr.event_size);
abort();
}
if (hdr.ok == 0) if (hdr.ok == 0)
{ {
router->stats.n_binlogs++; router->stats.n_binlogs++;
@ -525,12 +559,14 @@ int no_residual = 1;
{ {
free(msg); free(msg);
msg = NULL; msg = NULL;
pkt = gwbuf_consume(pkt, reslen);
pkt = gwbuf_consume(pkt, len - reslen);
} }
else while (len > 0)
{ {
pkt = gwbuf_consume(pkt, len); int n, plen;
plen = GWBUF_LENGTH(pkt);
n = (plen < len ? plen : len);
pkt = gwbuf_consume(pkt, n);
len -= n;
} }
} }
@ -596,15 +632,18 @@ uint32_t rval = 0, shift = 0;
static void static void
blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr) blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr)
{ {
int len; int len, slen;
uint64_t pos; uint64_t pos;
char file[BINLOG_FNAMELEN+1]; char file[BINLOG_FNAMELEN+1];
ptr += 19; // Skip event header ptr += 19; // Skip event header
len = hdr->event_size - 19; // Event size minus header len = hdr->event_size - 19; // Event size minus header
pos = extract_field(ptr, 32) + (extract_field(ptr+4, 32) << 32); pos = extract_field(ptr, 32) + (extract_field(ptr+4, 32) << 32);
memcpy(file, ptr + 8, BINLOG_FNAMELEN); slen = len - 8;
file[BINLOG_FNAMELEN] = 0; if (slen > BINLOG_FNAMELEN)
slen = BINLOG_FNAMELEN;
memcpy(file, ptr + 8, slen);
file[slen] = 0;
#ifdef VEBOSE_ROTATE #ifdef VEBOSE_ROTATE
printf("binlog rotate: "); printf("binlog rotate: ");
@ -614,7 +653,7 @@ char file[BINLOG_FNAMELEN+1];
printf("New file: %s @ %ld\n", file, pos); printf("New file: %s @ %ld\n", file, pos);
#endif #endif
if (strncmp(router->binlog_name, file, BINLOG_FNAMELEN) != 0) if (strncmp(router->binlog_name, file, slen) != 0)
{ {
router->stats.n_rotates++; router->stats.n_rotates++;
blr_file_rotate(router, file, pos); blr_file_rotate(router, file, pos);
@ -631,6 +670,14 @@ CreateMySQLAuthData(char *username, char *password, char *database)
{ {
MYSQL_session *auth_info; MYSQL_session *auth_info;
if (username == NULL || password == NULL)
{
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"You must specify both username and password for the binlog router.\n")));
return NULL;
}
if ((auth_info = calloc(1, sizeof(MYSQL_session))) == NULL) if ((auth_info = calloc(1, sizeof(MYSQL_session))) == NULL)
return NULL; return NULL;
strcpy(auth_info->user, username); strcpy(auth_info->user, username);