MXS-1881: Read event header in a separate function

Reading the binlog event header in a separate function makes it easier to
comprehend. Cleaned up some unused variables and code that would is not
used.
This commit is contained in:
Markus Mäkelä
2018-05-23 18:36:54 +03:00
parent 5a3bbf0d15
commit be098538e5

View File

@ -39,8 +39,7 @@
static const char *statefile_section = "avro-conversion";
static const char *ddl_list_name = "table-ddl.list";
void handle_query_event(Avro *router, REP_HEADER *hdr,
int *pending_transaction, uint8_t *ptr);
void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
bool is_create_table_statement(Avro *router, char* ptr, size_t len);
void avro_notify_client(AvroSession *client);
void avro_update_index(Avro* router);
@ -456,6 +455,67 @@ void do_checkpoint(Avro *router, uint64_t *total_rows, uint64_t *total_commits)
router->row_count = router->trx_count = 0;
}
bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_binlog_end_t* rc)
{
uint8_t hdbuf[BINLOG_EVENT_HDR_LEN];
int n = pread(router->binlog_fd, hdbuf, BINLOG_EVENT_HDR_LEN, pos);
/* Read the header information from the file */
if (n != BINLOG_EVENT_HDR_LEN)
{
switch (n)
{
case 0:
break;
case -1:
MXS_ERROR("Failed to read binlog file %s at position %llu (%s).",
router->binlog_name.c_str(), pos,
mxs_strerror(errno));
break;
default:
MXS_ERROR("Short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %llu",
n, router->binlog_name.c_str(), pos);
break;
}
router->current_pos = pos;
*rc = n == 0 ? AVRO_OK : AVRO_BINLOG_ERROR;
return false;
}
/* fill replication header struct */
hdr->timestamp = EXTRACT32(hdbuf);
hdr->event_type = hdbuf[4];
hdr->serverid = EXTRACT32(&hdbuf[5]);
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]);
bool rval = true;
if (hdr->event_type > MAX_EVENT_TYPE_MARIADB10)
{
MXS_ERROR("Invalid MariaDB 10 event type 0x%x. Binlog file is %s, position %llu",
hdr->event_type, router->binlog_name.c_str(), pos);
router->current_pos = pos;
*rc = AVRO_BINLOG_ERROR;
rval = false;
}
else if (hdr->event_size <= 0)
{
MXS_ERROR("Event size error: size %d at %llu.", hdr->event_size, pos);
router->current_pos = pos;
*rc = AVRO_BINLOG_ERROR;
rval = false;
}
return rval;
}
/**
* @brief Read all replication events from a binlog file.
*
@ -469,105 +529,36 @@ void do_checkpoint(Avro *router, uint64_t *total_rows, uint64_t *total_commits)
*/
avro_binlog_end_t avro_read_all_events(Avro *router)
{
uint8_t hdbuf[BINLOG_EVENT_HDR_LEN];
unsigned long long pos = router->current_pos;
char next_binlog[BINLOG_FNAMELEN + 1];
REP_HEADER hdr;
int pending_transaction = 0;
uint8_t *ptr;
uint64_t total_commits = 0, total_rows = 0;
bool found_chksum = false;
bool rotate_seen = false;
bool stop_seen = false;
if (router->binlog_fd == -1)
{
MXS_ERROR("Current binlog file %s is not open", router->binlog_name.c_str());
return AVRO_BINLOG_ERROR;
}
ss_dassert(router->binlog_fd != -1);
while (!router->service->svc_do_shutdown)
{
int n;
/* Read the header information from the file */
if ((n = pread(router->binlog_fd, hdbuf, BINLOG_EVENT_HDR_LEN, pos)) != BINLOG_EVENT_HDR_LEN)
avro_binlog_end_t rc;
REP_HEADER hdr;
if (!read_header(router, pos, &hdr, &rc))
{
switch (n)
{
case 0:
break;
case -1:
{
MXS_ERROR("Failed to read binlog file %s at position %llu (%s).",
router->binlog_name.c_str(), pos,
mxs_strerror(errno));
if (errno == EBADF)
MXS_ERROR("Bad file descriptor in read binlog for file %s"
", descriptor %d.",
router->binlog_name.c_str(), router->binlog_fd);
break;
}
default:
MXS_ERROR("Short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %llu",
n, router->binlog_name.c_str(), pos);
break;
}
router->current_pos = pos;
/* any error */
if (n != 0)
{
return AVRO_BINLOG_ERROR;
}
else
if (rc == AVRO_OK)
{
do_checkpoint(router, &total_rows, &total_commits);
MXS_INFO("Processed %lu transactions and %lu row events.",
total_commits, total_rows);
if (rotate_seen)
{
rotate_to_file(router, pos, next_binlog);
return AVRO_OK;
}
else
{
return rotate_to_next_file_if_exists(router, pos, stop_seen);
rc = rotate_to_next_file_if_exists(router, pos, stop_seen);
}
}
}
/* fill replication header struct */
hdr.timestamp = EXTRACT32(hdbuf);
hdr.event_type = hdbuf[4];
hdr.serverid = EXTRACT32(&hdbuf[5]);
hdr.event_size = extract_field(&hdbuf[9], 32);
hdr.next_pos = EXTRACT32(&hdbuf[13]);
hdr.flags = EXTRACT16(&hdbuf[17]);
/* Check event type against MAX_EVENT_TYPE */
if (hdr.event_type > MAX_EVENT_TYPE_MARIADB10)
{
MXS_ERROR("Invalid MariaDB 10 event type 0x%x. "
"Binlog file is %s, position %llu",
hdr.event_type, router->binlog_name.c_str(), pos);
router->current_pos = pos;
return AVRO_BINLOG_ERROR;
}
if (hdr.event_size <= 0)
{
MXS_ERROR("Event size error: "
"size %d at %llu.",
hdr.event_size, pos);
router->current_pos = pos;
return AVRO_BINLOG_ERROR;
return rc;
}
GWBUF *result = read_event_data(router, &hdr, pos);
@ -579,9 +570,7 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
}
/* get event content */
ptr = GWBUF_DATA(result);
MXS_DEBUG("%s(%x) - %llu", binlog_event_name(hdr.event_type), hdr.event_type, pos);
uint8_t* ptr = GWBUF_DATA(result);
uint32_t original_size = hdr.event_size;
@ -653,21 +642,13 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
{
uint64_t n_sequence; /* 8 bytes */
uint32_t domainid; /* 4 bytes */
unsigned int flags; /* 1 byte */
n_sequence = extract_field(ptr, 64);
domainid = extract_field(ptr + 8, 32);
flags = *(ptr + 8 + 4);
router->gtid.domain = domainid;
router->gtid.server_id = hdr.serverid;
router->gtid.seq = n_sequence;
router->gtid.event_num = 0;
router->gtid.timestamp = hdr.timestamp;
/* GTID event flags check, for 10.0 and 10.1 */
if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0)
{
pending_transaction = 1;
}
}
/**
* Check QUERY_EVENT
@ -677,19 +658,11 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
*/
else if (hdr.event_type == QUERY_EVENT)
{
int trx_before = pending_transaction;
handle_query_event(router, &hdr, &pending_transaction, ptr);
if (trx_before != pending_transaction)
{
/** A non-transactional engine finished a transaction */
router->trx_count++;
}
handle_query_event(router, &hdr, ptr);
}
else if (hdr.event_type == XID_EVENT)
{
router->trx_count++;
pending_transaction = 0;
if (router->row_count >= router->row_target ||
router->trx_count >= router->trx_target)
@ -978,7 +951,7 @@ static void strip_executable_comments(char *sql, int* len)
* @param pending_transaction Pointer where status of pending transaction is stored
* @param ptr Pointer to the start of the event payload
*/
void handle_query_event(Avro *router, REP_HEADER *hdr, int *pending_transaction, uint8_t *ptr)
void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
{
int dblen = ptr[DBNM_OFF];
int vblklen = ptr[VBLK_OFF];
@ -1060,16 +1033,10 @@ void handle_query_event(Avro *router, REP_HEADER *hdr, int *pending_transaction,
MXS_ERROR("Alter statement to table '%s' has no preceding create statement.", ident);
}
}
/* A transaction starts with this event */
else if (strncmp(sql, "BEGIN", 5) == 0)
{
*pending_transaction = 1;
}
/* Commit received for non transactional tables, i.e. MyISAM */
else if (strncmp(sql, "COMMIT", 6) == 0)
{
// TODO: Handle COMMIT
*pending_transaction = 0;
router->trx_count++;
}
MXS_FREE(tmp);