Large events are now processed in chuncks

The router->last_written is used to store the position where the last event was
written. The replication header is also stored in a separate structure in
the router which is used later when the last packet of a multi-packet event
arrives.
This commit is contained in:
Markus Makela
2016-02-08 17:19:55 +02:00
parent d3e1d4dd2f
commit ae33df3cbc
5 changed files with 115 additions and 77 deletions

View File

@ -235,7 +235,8 @@ blr_file_create(ROUTER_INSTANCE *router, char *file)
router->current_pos = BINLOG_MAGIC_SIZE; /* Initial position after the magic number */
router->binlog_position = BINLOG_MAGIC_SIZE;
router->current_safe_event = BINLOG_MAGIC_SIZE;
router->last_written = 0;
router->last_written = BINLOG_MAGIC_SIZE;
router->last_event_pos = 0;
spinlock_release(&router->binlog_lock);
created = 1;
@ -296,7 +297,8 @@ int fd;
router->current_pos = BINLOG_MAGIC_SIZE;
router->binlog_position = BINLOG_MAGIC_SIZE;
router->current_safe_event = BINLOG_MAGIC_SIZE;
router->last_written = 0;
router->last_written = BINLOG_MAGIC_SIZE;
router->last_event_pos = 0;
} else {
MXS_ERROR("%s: Could not write magic to binlog file.", router->service->name);
}
@ -323,26 +325,27 @@ int fd;
* @return Return the number of bytes written
*/
int
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf)
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint32_t size, uint8_t *buf)
{
int n;
if ((n = pwrite(router->binlog_fd, buf, hdr->event_size,
hdr->next_pos - hdr->event_size)) != hdr->event_size)
if ((n = pwrite(router->binlog_fd, buf, size,
router->last_written)) != size)
{
char err_msg[STRERROR_BUFLEN];
MXS_ERROR("%s: Failed to write binlog record at %d of %s, %s. "
MXS_ERROR("%s: Failed to write binlog record at %lu of %s, %s. "
"Truncating to previous record.",
router->service->name, hdr->next_pos - hdr->event_size,
router->service->name, router->last_written,
router->binlog_name,
strerror_r(errno, err_msg, sizeof(err_msg)));
/* Remove any partual event that was written */
ftruncate(router->binlog_fd, hdr->next_pos - hdr->event_size);
ftruncate(router->binlog_fd, router->last_written);
return 0;
}
spinlock_acquire(&router->binlog_lock);
router->current_pos = hdr->next_pos;
router->last_written = hdr->next_pos - hdr->event_size;
router->last_written =+ size;
router->last_event_pos = hdr->next_pos - hdr->event_size;
spinlock_release(&router->binlog_lock);
return n;
}