diff --git a/server/modules/routing/avrorouter/avro_file.cc b/server/modules/routing/avrorouter/avro_file.cc index d5f5c10ee..5d31c6bf6 100644 --- a/server/modules/routing/avrorouter/avro_file.cc +++ b/server/modules/routing/avrorouter/avro_file.cc @@ -39,8 +39,7 @@ static const char *statefile_section = "avro-conversion"; static const char *ddl_list_name = "table-ddl.list"; -void handle_query_event(Avro *router, REP_HEADER *hdr, - int *pending_transaction, uint8_t *ptr); +void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); bool is_create_table_statement(Avro *router, char* ptr, size_t len); void avro_notify_client(AvroSession *client); void avro_update_index(Avro* router); @@ -456,6 +455,67 @@ void do_checkpoint(Avro *router, uint64_t *total_rows, uint64_t *total_commits) router->row_count = router->trx_count = 0; } +bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_binlog_end_t* rc) +{ + uint8_t hdbuf[BINLOG_EVENT_HDR_LEN]; + int n = pread(router->binlog_fd, hdbuf, BINLOG_EVENT_HDR_LEN, pos); + + /* Read the header information from the file */ + if (n != BINLOG_EVENT_HDR_LEN) + { + switch (n) + { + case 0: + break; + + case -1: + MXS_ERROR("Failed to read binlog file %s at position %llu (%s).", + router->binlog_name.c_str(), pos, + mxs_strerror(errno)); + break; + + default: + MXS_ERROR("Short read when reading the header. " + "Expected 19 bytes but got %d bytes. " + "Binlog file is %s, position %llu", + n, router->binlog_name.c_str(), pos); + break; + } + + router->current_pos = pos; + *rc = n == 0 ? AVRO_OK : AVRO_BINLOG_ERROR; + return false; + } + + /* fill replication header struct */ + hdr->timestamp = EXTRACT32(hdbuf); + hdr->event_type = hdbuf[4]; + hdr->serverid = EXTRACT32(&hdbuf[5]); + hdr->event_size = extract_field(&hdbuf[9], 32); + hdr->next_pos = EXTRACT32(&hdbuf[13]); + hdr->flags = EXTRACT16(&hdbuf[17]); + + bool rval = true; + + if (hdr->event_type > MAX_EVENT_TYPE_MARIADB10) + { + MXS_ERROR("Invalid MariaDB 10 event type 0x%x. Binlog file is %s, position %llu", + hdr->event_type, router->binlog_name.c_str(), pos); + router->current_pos = pos; + *rc = AVRO_BINLOG_ERROR; + rval = false; + } + else if (hdr->event_size <= 0) + { + MXS_ERROR("Event size error: size %d at %llu.", hdr->event_size, pos); + router->current_pos = pos; + *rc = AVRO_BINLOG_ERROR; + rval = false; + } + + return rval; +} + /** * @brief Read all replication events from a binlog file. * @@ -469,105 +529,36 @@ void do_checkpoint(Avro *router, uint64_t *total_rows, uint64_t *total_commits) */ avro_binlog_end_t avro_read_all_events(Avro *router) { - uint8_t hdbuf[BINLOG_EVENT_HDR_LEN]; unsigned long long pos = router->current_pos; char next_binlog[BINLOG_FNAMELEN + 1]; - REP_HEADER hdr; - int pending_transaction = 0; - uint8_t *ptr; uint64_t total_commits = 0, total_rows = 0; bool found_chksum = false; bool rotate_seen = false; bool stop_seen = false; - if (router->binlog_fd == -1) - { - MXS_ERROR("Current binlog file %s is not open", router->binlog_name.c_str()); - return AVRO_BINLOG_ERROR; - } + ss_dassert(router->binlog_fd != -1); while (!router->service->svc_do_shutdown) { - int n; - /* Read the header information from the file */ - if ((n = pread(router->binlog_fd, hdbuf, BINLOG_EVENT_HDR_LEN, pos)) != BINLOG_EVENT_HDR_LEN) + avro_binlog_end_t rc; + REP_HEADER hdr; + + if (!read_header(router, pos, &hdr, &rc)) { - switch (n) - { - case 0: - break; - case -1: - { - MXS_ERROR("Failed to read binlog file %s at position %llu (%s).", - router->binlog_name.c_str(), pos, - mxs_strerror(errno)); - - if (errno == EBADF) - MXS_ERROR("Bad file descriptor in read binlog for file %s" - ", descriptor %d.", - router->binlog_name.c_str(), router->binlog_fd); - break; - } - default: - MXS_ERROR("Short read when reading the header. " - "Expected 19 bytes but got %d bytes. " - "Binlog file is %s, position %llu", - n, router->binlog_name.c_str(), pos); - break; - } - - router->current_pos = pos; - - /* any error */ - if (n != 0) - { - return AVRO_BINLOG_ERROR; - } - else + if (rc == AVRO_OK) { do_checkpoint(router, &total_rows, &total_commits); - MXS_INFO("Processed %lu transactions and %lu row events.", - total_commits, total_rows); if (rotate_seen) { rotate_to_file(router, pos, next_binlog); - return AVRO_OK; } else { - return rotate_to_next_file_if_exists(router, pos, stop_seen); + rc = rotate_to_next_file_if_exists(router, pos, stop_seen); } } - } - - /* fill replication header struct */ - hdr.timestamp = EXTRACT32(hdbuf); - hdr.event_type = hdbuf[4]; - hdr.serverid = EXTRACT32(&hdbuf[5]); - hdr.event_size = extract_field(&hdbuf[9], 32); - hdr.next_pos = EXTRACT32(&hdbuf[13]); - hdr.flags = EXTRACT16(&hdbuf[17]); - - /* Check event type against MAX_EVENT_TYPE */ - - if (hdr.event_type > MAX_EVENT_TYPE_MARIADB10) - { - MXS_ERROR("Invalid MariaDB 10 event type 0x%x. " - "Binlog file is %s, position %llu", - hdr.event_type, router->binlog_name.c_str(), pos); - router->current_pos = pos; - return AVRO_BINLOG_ERROR; - } - - if (hdr.event_size <= 0) - { - MXS_ERROR("Event size error: " - "size %d at %llu.", - hdr.event_size, pos); - - router->current_pos = pos; - return AVRO_BINLOG_ERROR; + return rc; } GWBUF *result = read_event_data(router, &hdr, pos); @@ -579,9 +570,7 @@ avro_binlog_end_t avro_read_all_events(Avro *router) } /* get event content */ - ptr = GWBUF_DATA(result); - - MXS_DEBUG("%s(%x) - %llu", binlog_event_name(hdr.event_type), hdr.event_type, pos); + uint8_t* ptr = GWBUF_DATA(result); uint32_t original_size = hdr.event_size; @@ -653,21 +642,13 @@ avro_binlog_end_t avro_read_all_events(Avro *router) { uint64_t n_sequence; /* 8 bytes */ uint32_t domainid; /* 4 bytes */ - unsigned int flags; /* 1 byte */ n_sequence = extract_field(ptr, 64); domainid = extract_field(ptr + 8, 32); - flags = *(ptr + 8 + 4); router->gtid.domain = domainid; router->gtid.server_id = hdr.serverid; router->gtid.seq = n_sequence; router->gtid.event_num = 0; router->gtid.timestamp = hdr.timestamp; - - /* GTID event flags check, for 10.0 and 10.1 */ - if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0) - { - pending_transaction = 1; - } } /** * Check QUERY_EVENT @@ -677,19 +658,11 @@ avro_binlog_end_t avro_read_all_events(Avro *router) */ else if (hdr.event_type == QUERY_EVENT) { - int trx_before = pending_transaction; - handle_query_event(router, &hdr, &pending_transaction, ptr); - - if (trx_before != pending_transaction) - { - /** A non-transactional engine finished a transaction */ - router->trx_count++; - } + handle_query_event(router, &hdr, ptr); } else if (hdr.event_type == XID_EVENT) { router->trx_count++; - pending_transaction = 0; if (router->row_count >= router->row_target || router->trx_count >= router->trx_target) @@ -978,7 +951,7 @@ static void strip_executable_comments(char *sql, int* len) * @param pending_transaction Pointer where status of pending transaction is stored * @param ptr Pointer to the start of the event payload */ -void handle_query_event(Avro *router, REP_HEADER *hdr, int *pending_transaction, uint8_t *ptr) +void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) { int dblen = ptr[DBNM_OFF]; int vblklen = ptr[VBLK_OFF]; @@ -1060,16 +1033,10 @@ void handle_query_event(Avro *router, REP_HEADER *hdr, int *pending_transaction, MXS_ERROR("Alter statement to table '%s' has no preceding create statement.", ident); } } - /* A transaction starts with this event */ - else if (strncmp(sql, "BEGIN", 5) == 0) - { - *pending_transaction = 1; - } /* Commit received for non transactional tables, i.e. MyISAM */ else if (strncmp(sql, "COMMIT", 6) == 0) { - // TODO: Handle COMMIT - *pending_transaction = 0; + router->trx_count++; } MXS_FREE(tmp);