Changes for file system full detection
This commit is contained in:
@ -429,9 +429,9 @@ extern void blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr);
|
|||||||
extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
|
extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
|
||||||
extern void blr_init_cache(ROUTER_INSTANCE *);
|
extern void blr_init_cache(ROUTER_INSTANCE *);
|
||||||
|
|
||||||
extern void blr_file_init(ROUTER_INSTANCE *);
|
extern int blr_file_init(ROUTER_INSTANCE *);
|
||||||
extern void blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *);
|
extern int blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *);
|
||||||
extern void blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t);
|
extern int blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t);
|
||||||
extern void blr_file_flush(ROUTER_INSTANCE *);
|
extern void blr_file_flush(ROUTER_INSTANCE *);
|
||||||
extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *);
|
extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *);
|
||||||
extern GWBUF *blr_read_binlog(ROUTER_INSTANCE *, BLFILE *, unsigned int, REP_HEADER *);
|
extern GWBUF *blr_read_binlog(ROUTER_INSTANCE *, BLFILE *, unsigned int, REP_HEADER *);
|
||||||
|
@ -242,15 +242,30 @@ int fd;
|
|||||||
* @param router The router instance
|
* @param router The router instance
|
||||||
* @param buf The binlog record
|
* @param buf The binlog record
|
||||||
* @param len The length of the binlog record
|
* @param len The length of the binlog record
|
||||||
|
* @return Return the number of bytes written
|
||||||
*/
|
*/
|
||||||
void
|
int
|
||||||
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf)
|
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf)
|
||||||
{
|
{
|
||||||
pwrite(router->binlog_fd, buf, hdr->event_size, hdr->next_pos - hdr->event_size);
|
int n;
|
||||||
|
|
||||||
|
if ((n = pwrite(router->binlog_fd, buf, hdr->event_size,
|
||||||
|
hdr->next_pos - hdr->event_size)) != hdr->event_size)
|
||||||
|
{
|
||||||
|
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||||
|
"%s: Failed to write binlog record at %d of %s. "
|
||||||
|
"Truncating to previous record.",
|
||||||
|
router->service->name, hdr->next_pos - hdr->event_size,
|
||||||
|
router->binlog_name)));
|
||||||
|
/* Remove any partual event that was written */
|
||||||
|
ftruncate(router->binlog_fd, hdr->next_pos - hdr->event_size);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
spinlock_acquire(&router->binlog_lock);
|
spinlock_acquire(&router->binlog_lock);
|
||||||
router->binlog_position = hdr->next_pos;
|
router->binlog_position = hdr->next_pos;
|
||||||
router->last_written = hdr->next_pos - hdr->event_size;
|
router->last_written = hdr->next_pos - hdr->event_size;
|
||||||
spinlock_release(&router->binlog_lock);
|
spinlock_release(&router->binlog_lock);
|
||||||
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -71,13 +71,13 @@ static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
|
|||||||
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
|
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
|
||||||
void encode_value(unsigned char *data, unsigned int value, int len);
|
void encode_value(unsigned char *data, unsigned int value, int len);
|
||||||
void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
|
void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
|
||||||
static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
|
static int blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
|
||||||
void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||||
static void *CreateMySQLAuthData(char *username, char *password, char *database);
|
static void *CreateMySQLAuthData(char *username, char *password, char *database);
|
||||||
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
|
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
|
||||||
inline uint32_t extract_field(uint8_t *src, int bits);
|
inline uint32_t extract_field(uint8_t *src, int bits);
|
||||||
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
|
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
|
||||||
|
static void blr_master_close(ROUTER_INSTANCE *);
|
||||||
static int keepalive = 1;
|
static int keepalive = 1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -247,6 +247,37 @@ int do_reconnect = 0;
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown a connection to the master
|
||||||
|
*
|
||||||
|
* @param router The router instance
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
blr_master_close(ROUTER_INSTANCE *router)
|
||||||
|
{
|
||||||
|
dcb_close(router->master);
|
||||||
|
router->master_state = BLRM_UNCONNECTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mark this master connection for a delayed reconnect, used during
|
||||||
|
* error recovery to cause a reconnect after 60 seconds.
|
||||||
|
*
|
||||||
|
* @param router The router instance
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
blr_master_delayed_connect(ROUTER_INSTANCE *router)
|
||||||
|
{
|
||||||
|
char *name;
|
||||||
|
|
||||||
|
if ((name = malloc(strlen(router->service->name)
|
||||||
|
+ strlen(" Master Recovery")+1)) != NULL);
|
||||||
|
{
|
||||||
|
sprintf(name, "%s Master Recovery", router->service->name);
|
||||||
|
hktask_oneshot(name, blr_start_master, router, 60);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Binlog router master side state machine event handler.
|
* Binlog router master side state machine event handler.
|
||||||
*
|
*
|
||||||
@ -809,10 +840,36 @@ static REP_HEADER phdr;
|
|||||||
// into the binlog file
|
// into the binlog file
|
||||||
if (hdr.event_type == ROTATE_EVENT)
|
if (hdr.event_type == ROTATE_EVENT)
|
||||||
router->rotating = 1;
|
router->rotating = 1;
|
||||||
blr_write_binlog_record(router, &hdr, ptr);
|
if (blr_write_binlog_record(router, &hdr, ptr) == 0)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Failed to write to the
|
||||||
|
* binlog file, destroy the
|
||||||
|
* buffer chain and close the
|
||||||
|
* connection with the master
|
||||||
|
*/
|
||||||
|
while ((pkt = gwbuf_consume(pkt,
|
||||||
|
GWBUF_LENGTH(pkt))) != NULL);
|
||||||
|
blr_master_close(router);
|
||||||
|
blr_master_delayed_connect(router);
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (hdr.event_type == ROTATE_EVENT)
|
if (hdr.event_type == ROTATE_EVENT)
|
||||||
{
|
{
|
||||||
blr_rotate_event(router, ptr, &hdr);
|
if (!blr_rotate_event(router, ptr, &hdr))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Failed to write to the
|
||||||
|
* binlog file, destroy the
|
||||||
|
* buffer chain and close the
|
||||||
|
* connection with the master
|
||||||
|
*/
|
||||||
|
while ((pkt = gwbuf_consume(pkt,
|
||||||
|
GWBUF_LENGTH(pkt))) != NULL);
|
||||||
|
blr_master_close(router);
|
||||||
|
blr_master_delayed_connect(router);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
blr_distribute_binlog_record(router, &hdr, ptr);
|
blr_distribute_binlog_record(router, &hdr, ptr);
|
||||||
}
|
}
|
||||||
@ -833,7 +890,20 @@ static REP_HEADER phdr;
|
|||||||
if (hdr.event_type == ROTATE_EVENT)
|
if (hdr.event_type == ROTATE_EVENT)
|
||||||
{
|
{
|
||||||
router->rotating = 1;
|
router->rotating = 1;
|
||||||
blr_rotate_event(router, ptr, &hdr);
|
if (!blr_rotate_event(router, ptr, &hdr))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Failed to write to the
|
||||||
|
* binlog file, destroy the
|
||||||
|
* buffer chain and close the
|
||||||
|
* connection with the master
|
||||||
|
*/
|
||||||
|
while ((pkt = gwbuf_consume(pkt,
|
||||||
|
GWBUF_LENGTH(pkt))) != NULL);
|
||||||
|
blr_master_close(router);
|
||||||
|
blr_master_delayed_connect(router);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -933,7 +1003,7 @@ register uint32_t rval = 0, shift = 0;
|
|||||||
* @param ptr The packet containing the rotate event
|
* @param ptr The packet containing the rotate event
|
||||||
* @param hdr The replication message header
|
* @param hdr The replication message header
|
||||||
*/
|
*/
|
||||||
static void
|
static int
|
||||||
blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr)
|
blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr)
|
||||||
{
|
{
|
||||||
int len, slen;
|
int len, slen;
|
||||||
@ -963,9 +1033,14 @@ char file[BINLOG_FNAMELEN+1];
|
|||||||
if (strncmp(router->binlog_name, file, slen) != 0)
|
if (strncmp(router->binlog_name, file, slen) != 0)
|
||||||
{
|
{
|
||||||
router->stats.n_rotates++;
|
router->stats.n_rotates++;
|
||||||
blr_file_rotate(router, file, pos);
|
if (blr_file_rotate(router, file, pos) == 0)
|
||||||
|
{
|
||||||
|
router->rotating = 0;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
router->rotating = 0;
|
router->rotating = 0;
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Reference in New Issue
Block a user