diff --git a/server/modules/routing/binlogrouter/blr_file.c b/server/modules/routing/binlogrouter/blr_file.c index e7b326f9c..fae1a3bf0 100644 --- a/server/modules/routing/binlogrouter/blr_file.c +++ b/server/modules/routing/binlogrouter/blr_file.c @@ -2099,35 +2099,44 @@ blr_read_events_all_events(ROUTER_INSTANCE *router, { if (hdr.event_type == MARIADB10_GTID_GTID_LIST_EVENT) { - uint32_t n_gtids; /* The lower 28 bits are the number of GTIDs */ - uint32_t domainid; /* 4 bytes */ - uint32_t serverid; /* 4 bytes */ - uint64_t n_sequence;/* 8 bytes */ - uint8_t flags; /* 1 byte, 4 bits */ - char mariadb_gtid[GTID_MAX_LEN + 1]; - + unsigned long n_gtids; n_gtids = extract_field(ptr, 32); + /* Keep only the lower 28 bits: they are the number of GTIDs */ - n_gtids &= 0x01111111; + n_gtids &= 0x0FFFFFFF; - domainid = extract_field(ptr + 4, 32); - serverid = extract_field(ptr + 4 + 4, 32); - n_sequence = extract_field(ptr + 4 + 4 + 4, 64); - - snprintf(mariadb_gtid, - GTID_MAX_LEN, - "%u-%u-%lu", - domainid, - serverid, - n_sequence); - - MXS_DEBUG("GTID List has %lu GTIDs, first is %s", - (unsigned long)n_gtids, - mariadb_gtid); - - /* Set MariaDB GTID */ - if (router->mariadb10_gtid) + if (n_gtids) { - strcpy(router->last_mariadb_gtid, mariadb_gtid); + ptr += 4; + uint32_t domainid; /* 4 bytes */ + domainid = extract_field(ptr, 32); + ptr += 4; + + uint32_t serverid; /* 4 bytes */ + serverid = extract_field(ptr, 32); + ptr += 4; + + uint64_t n_sequence;/* 8 bytes */ + n_sequence = extract_field(ptr, 64); + ptr += 8; + + char mariadb_gtid[GTID_MAX_LEN + 1]; + + snprintf(mariadb_gtid, + GTID_MAX_LEN, + "%u-%u-%lu", + domainid, + serverid, + n_sequence); + + MXS_DEBUG("GTID List has %lu GTIDs, first is %s", + n_gtids, + mariadb_gtid); + + /* Set MariaDB GTID */ + if (router->mariadb10_gtid) + { + strcpy(router->last_mariadb_gtid, mariadb_gtid); + } } } } @@ -3460,3 +3469,52 @@ bool blr_fetch_mariadb_gtid(ROUTER_SLAVE *slave, return result->gtid ? 
true : false; } + +/** + * Get the next binlog file sequence number + * + * @param filename The current filename + * @return 0 on error, >0 as sequence number + */ +unsigned int +blr_file_get_next_seqno(const char *filename) +{ + const char *sptr; + int filenum; + + if ((sptr = strrchr(filename, '.')) == NULL) + { + return 0; + } + filenum = atoi(sptr + 1); + if (filenum > 0) + { + filenum++; + } + + return filenum > 0 ? (unsigned int)filenum : 0; +} + +/** + * Return the binlog file size. + * + * @param filename The current filename + * @return 0 on error, >0 size (capped at UINT32_MAX) + */ +uint32_t blr_slave_get_file_size(const char *filename) +{ + struct stat statb; + + if (stat(filename, &statb) == 0) + { + return (uint64_t)statb.st_size > UINT32_MAX ? UINT32_MAX : (uint32_t)statb.st_size; + } + else + { + MXS_ERROR("Failed to get %s file size: %d %s", + filename, + errno, + mxs_strerror(errno)); + return 0; + } +} diff --git a/server/modules/routing/binlogrouter/blr_master.c b/server/modules/routing/binlogrouter/blr_master.c index 567076f1c..75ae6a1c7 100644 --- a/server/modules/routing/binlogrouter/blr_master.c +++ b/server/modules/routing/binlogrouter/blr_master.c @@ -76,6 +76,7 @@ #include #include #include +#include static GWBUF *blr_make_query(DCB *dcb, char *query); static GWBUF *blr_make_registration(ROUTER_INSTANCE *router); @@ -128,7 +129,7 @@ static void blr_register_cache_response(ROUTER_INSTANCE *router, GWBUF *in_buf); static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf); static void blr_register_mariadb_gtid_request(ROUTER_INSTANCE *router, - GWBUF *buf); + GWBUF *buf); static bool blr_handle_fake_rotate(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr); @@ -140,6 +141,9 @@ extern int blr_write_special_event(ROUTER_INSTANCE *router, uint32_t hole_size, REP_HEADER *hdr, int type); +extern int blr_file_new_binlog(ROUTER_INSTANCE *router, char *file); +static bool blr_handle_missing_files(ROUTER_INSTANCE *router, + char *new_file); static void worker_cb_start_master(int worker_id, void* data); static void blr_start_master_in_main(void* 
data); @@ -758,13 +762,12 @@ static void reset_errors(ROUTER_INSTANCE *router, REP_HEADER *hdr) spinlock_release(&router->lock); #ifdef SHOW_EVENTS - printf("blr: len %lu, event type 0x%02x, flags 0x%04x, " - "event size %d, event timestamp %lu\n", - (unsigned long)len - MYSQL_HEADER_LEN, + printf("blr: event type 0x%02x, flags 0x%04x, " + "event size %d, event timestamp %" PRIu32 "\n", hdr->event_type, hdr->flags, hdr->event_size, - (unsigned long)hdr->timestamp); + hdr->timestamp); #endif } @@ -3043,8 +3046,9 @@ static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf) * registration command */ static void blr_register_mariadb_gtid_request(ROUTER_INSTANCE *router, - GWBUF *buf) + GWBUF *buf) { + const char format_gtid_val[] = "SET @slave_connect_state='%s'"; // Extract GTID domain char *val = blr_extract_column(buf, 1); // Store the Master GTID domain @@ -3054,9 +3058,9 @@ static void blr_register_mariadb_gtid_request(ROUTER_INSTANCE *router, gwbuf_free(buf); // SET the requested GTID - char set_gtid[GTID_MAX_LEN + 33 + 1]; + char set_gtid[GTID_MAX_LEN + sizeof(format_gtid_val)]; sprintf(set_gtid, - "SET @slave_connect_state='%s'", + format_gtid_val, router->last_mariadb_gtid); MXS_INFO("%s: Requesting GTID (%s) from master server.", @@ -3104,13 +3108,18 @@ static bool blr_handle_fake_rotate(ROUTER_INSTANCE *router, pos <<= 32; pos |= extract_field(ptr + BINLOG_EVENT_HDR_LEN, 32); - MXS_INFO("Fake ROTATE_EVENT received: file %s, pos %lu. Next event at pos %lu\n", - file, - (unsigned long)pos, - (unsigned long)hdr->next_pos); + MXS_DEBUG("Fake ROTATE_EVENT received: file %s, pos %" PRIu64 + ". Next event at pos %" PRIu32, + file, + pos, + hdr->next_pos); /** - * TODO: Detect any missing file in sequence. + * Detect any missing file in sequence. 
*/ + if (!blr_handle_missing_files(router, file)) + { + return false; + } spinlock_acquire(&router->binlog_lock); @@ -3129,6 +3138,7 @@ static bool blr_handle_fake_rotate(ROUTER_INSTANCE *router, router->current_pos = BINLOG_MAGIC_SIZE; router->binlog_position = BINLOG_MAGIC_SIZE; router->last_event_pos = BINLOG_MAGIC_SIZE; + router->current_safe_event = BINLOG_MAGIC_SIZE; } router->rotating = 1; @@ -3161,10 +3171,11 @@ static void blr_handle_fake_gtid_list(ROUTER_INSTANCE *router, { uint64_t binlog_file_eof = lseek(router->binlog_fd, 0L, SEEK_END); - MXS_INFO("Fake GTID_LIST received: file %s, pos %lu. Next event at pos %lu\n", + MXS_INFO("Fake GTID_LIST received: file %s, pos %" PRIu64 + ". Next event at pos %" PRIu32, router->binlog_name, - (unsigned long)router->current_pos, - (unsigned long)hdr->next_pos); + router->current_pos, + hdr->next_pos); /** * We could write in any binlog file position: @@ -3177,8 +3188,9 @@ static void blr_handle_fake_gtid_list(ROUTER_INSTANCE *router, uint64_t hole_size = hdr->next_pos - binlog_file_eof; MXS_INFO("Detected hole while processing" - " a Fake GTID_LIST Event: hole size will be %lu bytes", - (unsigned long)hole_size); + " a Fake GTID_LIST Event: hole size will be %" + PRIu64 " bytes", + hole_size); /* Set the offet for the write routine */ spinlock_acquire(&router->binlog_lock); @@ -3208,3 +3220,71 @@ static void blr_handle_fake_gtid_list(ROUTER_INSTANCE *router, } } } + +/** + * Detect any missing file in sequence between + * current router->binlog_name and new_file + * in fake ROTATE_EVENT. + * + * In case of missing files, new files with 4 bytes + * will be created up to (not including) new_file. 
+ * + * @param router The current router + * @param new_file The filename in Fake ROTATE_EVENT + * @return true on success (also when no file is missing), false on errors + */ +static bool blr_handle_missing_files(ROUTER_INSTANCE *router, + char *new_file) +{ + char *fptr; + uint32_t new_fseqno; + uint32_t curr_fseqno; + char buf[BLRM_BINLOG_NAME_STR_LEN + 1]; + + if ((fptr = strrchr(new_file, '.')) == NULL) + { + return false; + } + new_fseqno = atol(fptr + 1); + if ((fptr = strrchr(router->binlog_name, '.')) == NULL) + { + return false; + } + curr_fseqno = atol(fptr + 1); + int32_t delta_seq = new_fseqno - (curr_fseqno + 1); + + /** + * Try creating delta_seq empty binlog files + */ + if (delta_seq > 0) + { + MXS_INFO("Fake ROTATE_EVENT comes with a %" PRIi32 + " delta sequence in its name." + " Creating %" PRIi32 " empty files", + delta_seq, + delta_seq); + + // Create delta_seq empty (with 4 bytes) binlog files: curr_fseqno + 1 .. new_fseqno - 1 + for (int i = 1; i <= delta_seq; i++) + { + snprintf(buf, sizeof(buf), BINLOG_NAMEFMT, router->fileroot, curr_fseqno + i); + if (!blr_file_new_binlog(router, buf)) + { + return false; + } + else + { + MXS_INFO("Created empty binlog file [%d] '%s'" + " due to Fake ROTATE_EVENT file sequence delta.", + i, + buf); + } + } + + // Some files created, return true + return true; + } + + // Did nothing, just return true + return true; +} diff --git a/server/modules/routing/binlogrouter/blr_slave.c b/server/modules/routing/binlogrouter/blr_slave.c index de6ed03a0..7d76e21aa 100644 --- a/server/modules/routing/binlogrouter/blr_slave.c +++ b/server/modules/routing/binlogrouter/blr_slave.c @@ -89,6 +89,7 @@ #include #include +extern void poll_fake_write_event(DCB *dcb); static char* get_next_token(char *str, const char* delim, char **saveptr); extern int load_mysql_users(SERV_LISTENER *listener); extern void blr_master_close(ROUTER_INSTANCE* router); @@ -287,8 +288,14 @@ static bool blr_handle_admin_stmt(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *admin_stmt, char 
*admin_options); +extern unsigned int blr_file_get_next_seqno(const char *filename); +extern uint32_t blr_slave_get_file_size(const char *filename); +static void blr_slave_skip_empty_files(ROUTER_INSTANCE *router, + ROUTER_SLAVE *slave); -void poll_fake_write_event(DCB *dcb); +static inline void blr_get_file_fullpath(const char *binlog_file, + const char *root_dir, + char *full_path); /** * Process a request packet from the slave server. @@ -309,6 +316,7 @@ void poll_fake_write_event(DCB *dcb); int blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) { + int rv = 0; if (slave->state < 0 || slave->state > BLRS_MAXSTATE) { MXS_ERROR("Invalid slave state machine state (%d) for binlog router.", @@ -322,96 +330,109 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) { case COM_QUERY: slave->stats.n_queries++; - return blr_slave_query(router, slave, queue); + rv = blr_slave_query(router, slave, queue); + break; case COM_REGISTER_SLAVE: if (router->master_state == BLRM_UNCONFIGURED) { + char *err_msg = "Binlog router is not yet configured" + " for replication."; slave->state = BLRS_ERRORED; blr_slave_send_error_packet(slave, - "Binlog router is not yet configured for replication", - (unsigned int) 1597, NULL); + err_msg, + 1597, + NULL); - MXS_ERROR("%s: Slave %s: Binlog router is not yet configured for replication", + MXS_ERROR("%s: Slave %s: %s", router->service->name, - slave->dcb->remote); + slave->dcb->remote, + err_msg); dcb_close(slave->dcb); - return 1; + rv = 1; } - - /* - * If Master is MariaDB10 don't allow registration from - * MariaDB/Mysql 5 Slaves - */ - if (router->mariadb10_compat && !slave->mariadb10_compat) + else if (router->mariadb10_compat && !slave->mariadb10_compat) { + char *err_msg = "MariaDB 10 Slave is required" + " for Slave registration."; + /** + * If Master is MariaDB10 don't allow registration from + * MariaDB/Mysql 5 Slaves + */ slave->state = BLRS_ERRORED; /* Send error that stops 
slave replication */ blr_send_custom_error(slave->dcb, ++slave->seqno, 0, - "MariaDB 10 Slave is required for Slave registration", + err_msg, "42000", 1064); - MXS_ERROR("%s: Slave %s: a MariaDB 10 Slave is required for Slave registration", + MXS_ERROR("%s: Slave %s: %s", router->service->name, - slave->dcb->remote); + slave->dcb->remote, + err_msg); dcb_close(slave->dcb); - return 1; + rv = 1; } else if (router->mariadb10_master_gtid && !slave->mariadb_gtid) { + /** + * If GTID master replication is set + * only GTID slaves can continue the registration. + */ + char *err_msg = "MariaDB 10 Slave GTID is required" + " for Slave registration."; slave->state = BLRS_ERRORED; /* Send error that stops slave replication */ blr_send_custom_error(slave->dcb, ++slave->seqno, 0, - "MariaDB 10 Slave GTID is required for Slave registration.", + err_msg, "HY000", - //BINLOG_FATAL_ERROR_READING); 1597); - MXS_ERROR("%s: Slave %s: a MariaDB 10 Slave GTID request" - " is needed for Slave registration." + + MXS_ERROR("%s: Slave %s: %s" " Please use: CHANGE MASTER TO master_use_gtid=slave_pos.", router->service->name, - slave->dcb->remote); + slave->dcb->remote, + err_msg); dcb_close(slave->dcb); - return 1; + rv = 1; } else { /* Master and Slave version OK: continue with slave registration */ - return blr_slave_register(router, slave, queue); + rv = blr_slave_register(router, slave, queue); } + break; case COM_BINLOG_DUMP: + rv = blr_slave_binlog_dump(router, slave, queue); + + if (rv && router->send_slave_heartbeat && slave->heartbeat > 0) { char task_name[BLRM_TASK_NAME_LEN + 1] = ""; + snprintf(task_name, + BLRM_TASK_NAME_LEN, + "%s slaves heartbeat send", + router->service->name); - int rc = blr_slave_binlog_dump(router, slave, queue); - - if (router->send_slave_heartbeat && rc && slave->heartbeat > 0) - { - snprintf(task_name, - BLRM_TASK_NAME_LEN, - "%s slaves heartbeat send", - router->service->name); - - /* Add slave heartbeat check task with 1 second frequency */ - 
hktask_add(task_name, blr_send_slave_heartbeat, router, 1); - } - - return rc; + /* Add slave heartbeat check task with 1 second frequency */ + hktask_add(task_name, blr_send_slave_heartbeat, router, 1); } + break; case COM_STATISTICS: - return blr_statistics(router, slave, queue); + rv = blr_statistics(router, slave, queue); + break; case COM_PING: - return blr_ping(router, slave, queue); + rv = blr_ping(router, slave, queue); + break; case COM_QUIT: MXS_DEBUG("COM_QUIT received from slave with server_id %d", slave->serverid); - return 1; + rv = 1; + break; default: blr_send_custom_error(slave->dcb, 1, 0, "You have an error in your SQL syntax; Check the " @@ -421,7 +442,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) MYSQL_COMMAND(queue)); break; } - return 0; + return rv; } /* @@ -1723,14 +1744,27 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue } } - MXS_DEBUG("%s: COM_BINLOG_DUMP: binlog name '%s', length %lu, " - "from position %lu.", router->service->name, - slave->binlogfile, strlen(slave->binlogfile), + MXS_DEBUG("%s: Slave %s:%i, COM_BINLOG_DUMP: binlog name '%s', length %lu, " + "from position %lu.", + router->service->name, + slave->dcb->remote, + dcb_get_port(slave->dcb), + slave->binlogfile, + strlen(slave->binlogfile), (unsigned long)slave->binlog_pos); /* First reply starts from seq = 1 */ slave->seqno = 1; + /** + * Check whether the request file is empty + * and try using next file in sequence. + * If one or more files have been skipped then + * the slave->binlog_pos is set to 4 and + * slave->binlogname set to new filename. 
+ */ + blr_slave_skip_empty_files(router, slave); + /* Build and send Fake Rotate Event */ if (!blr_send_connect_fake_rotate(router, slave)) { @@ -1792,8 +1826,12 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue blr_slave_read_ste(router, slave, fde_end_pos); } - /* Add GTID_LIST Fake Event before sending any new event */ - if (slave->mariadb10_compat && + /** + * Add GTID_LIST Fake Event before sending any new event + * Note: slave->binlog_pos must not be 4 + */ + if (slave->binlog_pos != 4 && + slave->mariadb10_compat && slave->mariadb_gtid) { if (!blr_send_fake_gtid_list(slave, @@ -6193,7 +6231,7 @@ static GWBUF *blr_build_fake_rotate_event(ROUTER_SLAVE *slave, * * Default position is 4, default file is router->binlog_file. * - * If req_file is false then the file to read data from + * If req_file is false then the file to read data from * could be either router->binlog_file or the file the GTID * belongs to. * @@ -6958,7 +6996,7 @@ static bool blr_handle_set_stmt(ROUTER_INSTANCE *router, heading[strlen(heading) - 1] = '\0'; if (!heading[0]) { - MXS_ERROR("Cannot request empty GTID righ now"); + MXS_ERROR("Cannot request empty GTID right now"); blr_slave_send_error_packet(slave, "Empty GTID not implemented righ now", (unsigned int)1198, NULL); @@ -7417,3 +7455,81 @@ static bool blr_handle_admin_stmt(ROUTER_INSTANCE *router, return false; } + +/** + * Skip reading empty binlog files (4 bytes only) + * + * @param router Current router instance + * @param slave Current connected slave + */ +static void blr_slave_skip_empty_files(ROUTER_INSTANCE *router, + ROUTER_SLAVE *slave) +{ + char binlog_file[BLRM_BINLOG_NAME_STR_LEN + 1]; + char router_curr_file[BLRM_BINLOG_NAME_STR_LEN + 1]; + char file_path[PATH_MAX + 1] = ""; + unsigned int seqno; + bool skipped_files = false; + + // Save the current router binlog filename + spinlock_acquire(&router->binlog_lock); + snprintf(router_curr_file, sizeof(router_curr_file), "%s", router->binlog_name); + 
spinlock_release(&router->binlog_lock); + + // Set the starting filename + snprintf(binlog_file, sizeof(binlog_file), "%s", slave->binlogfile); + + // Get binlog filename full-path + blr_get_file_fullpath(binlog_file, + router->binlogdir, + file_path); + + /** + * Set the next file in sequence if current file has 4 bytes size. + * Stop if the new file is the current binlog file. + */ + while (strcmp(binlog_file, router_curr_file) != 0 && + blr_slave_get_file_size(file_path) == 4 && + (seqno = blr_file_get_next_seqno(binlog_file)) > 0) + { + // Log skipped file + MXS_INFO("Slave %s:%i, skip reading empty file '%s' (4 bytes size).", + slave->dcb->remote, + dcb_get_port(slave->dcb), + binlog_file); + + // Set next in sequence binlog file name + snprintf(binlog_file, sizeof(binlog_file), BINLOG_NAMEFMT, router->fileroot, seqno); + + // Get binlog file full-path + blr_get_file_fullpath(binlog_file, + router->binlogdir, + file_path); + + skipped_files = true; + } + + // One or more files skipped: set last found filename and pos = 4 + if (skipped_files) + { + strcpy(slave->binlogfile, binlog_file); + slave->binlog_pos = 4; + } +} + +/** + * Get the full path of a binlog filename. + * + * @param binlog_file The binlog filename + * @param root_dir The binlog storage directory + * @param full_path The output full path name: + * the memory area must be preallocated (at least PATH_MAX + 1 bytes). + */ +static inline void blr_get_file_fullpath(const char *binlog_file, + const char *root_dir, + char *full_path) +{ + // Bounded join of "root_dir/binlog_file": output is truncated, never overrun + snprintf(full_path, PATH_MAX + 1, "%s/%s", + root_dir, binlog_file); +}