diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 93b7adea1..aa09cbe3a 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -33,6 +33,7 @@ * 12/06/15 Massimiliano Pinto Added mariadb10 new events * 23/06/15 Massimiliano Pinto Addition of MASTER_SERVER_CFG struct * 24/06/15 Massimiliano Pinto Added BLRM_UNCONFIGURED state + * 05/08/15 Massimiliano Pinto Initial implementation of transaction safety * * @endverbatim */ @@ -357,9 +358,13 @@ typedef struct router_instance { MASTER_RESPONSES saved_master; /*< Saved master responses */ char *binlogdir; /*< The directory with the binlog files */ SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */ + int trx_safe; /*< Detect and handle partial transactions */ + int pending_transaction; /*< Pending transaction */ char binlog_name[BINLOG_FNAMELEN+1]; /*< Name of the current binlog file */ uint64_t binlog_position; + /*< last committed transaction position */ + uint64_t current_pos; /*< Current binlog position */ int binlog_fd; /*< File descriptor of the binlog * file being written diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 1a965d638..15c6dd106 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -268,6 +268,9 @@ int rc = 0; inst->m_errno = 0; inst->m_errmsg = NULL; + inst->trx_safe = 0; + inst->pending_transaction = 0; + my_uuid_init((ulong)rand()*12345,12345); if ((defuuid = (unsigned char *)malloc(20)) != NULL) { @@ -348,6 +351,10 @@ int rc = 0; { inst->initbinlog = atoi(value); } + else if (strcmp(options[i], "transaction_safety") == 0) + { + inst->trx_safe = atoi(value); + } else if (strcmp(options[i], "lowwater") == 0) { inst->low_water = atoi(value); @@ -414,6 +421,8 @@ int rc = 0; service->name))); } + fprintf(stderr, "Transaction safety is [%i]\n", inst->trx_safe); + if (inst->fileroot == NULL) inst->fileroot = strdup(BINLOG_NAME_ROOT); inst->active_logs = 0; @@ -426,6 +435,8 @@ int rc = 0; inst->lastEventTimestamp = 0; inst->binlog_position = 0; + inst->current_pos = 0; + strcpy(inst->binlog_name, ""); strcpy(inst->prevbinlog, ""); @@ -602,6 +613,18 @@ int rc = 0; * Now start the replication from the master to MaxScale */ if (inst->master_state == BLRM_UNCONNECTED) { + + /* NOTE: This setting will be replaced by calling + * blr_read_events_all() routine soon + * The routine may truncate binlog file or put + * master_state into BLR_SLAVE_STOPPED state. + * If an open transaction is detected @ pos xxx + * inst->binlog_position will be set to xxx + */ + if (inst->binlog_position == 0) + inst->binlog_position = inst->current_pos; + + /* Start replication from master server */ blr_start_master(inst); } diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index a627d788e..5e9a5d19b 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -31,6 +31,7 @@ * 23/06/2015 Massimiliano Pinto Addition of blr_file_use_binlog, blr_file_create_binlog * 29/06/2015 Massimiliano Pinto Addition of blr_file_write_master_config() * Cache directory is now 'cache' under router->binlogdir + * 05/08/2015 Massimiliano Pinto Initial implementation of transaction safety * * @endverbatim */ @@ -184,7 +185,8 @@ blr_file_add_magic(ROUTER_INSTANCE *router, int fd) unsigned char magic[] = BINLOG_MAGIC; write(fd, magic, 4); - router->binlog_position = 4; /* Initial position after the magic number */ + router->current_pos = 4; /* Initial position after the magic number */ + //router->binlog_position = 4; /* Initial position after the magic number */ } @@ -252,16 +254,20 @@ int fd; close(router->binlog_fd); spinlock_acquire(&router->binlog_lock); strncpy(router->binlog_name, file,BINLOG_FNAMELEN); - router->binlog_position = lseek(fd, 0L, SEEK_END); - if (router->binlog_position < 4) { - if (router->binlog_position == 0) { + //router->binlog_position = lseek(fd, 0L, SEEK_END); + router->current_pos = lseek(fd, 0L, SEEK_END); + //if (router->binlog_position < 4) { + if (router->current_pos < 4) { + //if (router->binlog_position == 0) { + if (router->current_pos == 0) { blr_file_add_magic(router, fd); } else { /* If for any reason the file's length is between 1 and 3 bytes * then report an error. */ LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "%s: binlog file %s has an invalid length %d.", - router->service->name, path, router->binlog_position))); + //router->service->name, path, router->binlog_position))); + router->service->name, path, router->current_pos))); close(fd); spinlock_release(&router->binlog_lock); return; @@ -298,7 +304,8 @@ int n; return 0; } spinlock_acquire(&router->binlog_lock); - router->binlog_position = hdr->next_pos; + //router->binlog_position = hdr->next_pos; + router->current_pos = hdr->next_pos; router->last_written = hdr->next_pos - hdr->event_size; spinlock_release(&router->binlog_lock); return n; @@ -893,3 +900,421 @@ char tmp_file[PATH_MAX + 1] = ""; return 0; } + +/** + * Read all replication events from a binlog file in order to detect pending transactions + * + * @param router The router instance + * @param fix Whether to fix or not errors + * @param debug Whether to enable or not the debug for events + * @return 0 on success, >0 on failure + */ +int +blr_read_events_all_events(ROUTER_INSTANCE *router, int fix, int debug) { +unsigned long filelen = 0; +struct stat statb; +uint8_t hdbuf[19]; +uint8_t *data; +GWBUF *result; +unsigned long long pos = 4; +unsigned long long last_known_commit = 4; + +REP_HEADER hdr; +int pending_transaction = 0; +int n; +int db_name_len; +char *statement_sql; +uint8_t *ptr; +int len; +int var_block_len; +int statement_len; +int checksum_len=0; +int found_chksum = 0; + + if (router->binlog_fd == -1) { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "*** ERROR: Current binlog file %s is not open", + router->binlog_name))); + return 1; + } + + if (fstat(router->binlog_fd, &statb) == 0) + filelen = statb.st_size; + + while (1){ + //fprintf(stderr, "Pos %llu pending trx = %i\n", pos, pending_transaction); + + /* Read the header information from the file */ + if ((n = pread(router->binlog_fd, hdbuf, 19, pos)) != 19) { + switch (n) + { + case 0: + LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, + "End of binlog file [%s] at %llu.", + router->binlog_name, + pos))); + + if (pending_transaction) { + LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE, + "WARNING: Binlog file %s contains a previous Opened Transaction, " + "Not Truncate file @ %llu but pos for slave is safe", + router->binlog_name, + last_known_commit))); + + //ftruncate(router->binlog_fd, last_known_commit); + } + + break; + case -1: + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "*** ERROR: Failed to read binlog file %s at position %llu" + " (%s).", router->binlog_name, + pos, strerror(errno)))); + if (errno == EBADF) + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "*** ERROR: Bad file descriptor in read binlog for file %s" + ", descriptor %d.", + router->binlog_name, router->binlog_fd))); + break; + default: + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "*** ERROR: Short read when reading the header. " + "Expected 19 bytes but got %d bytes. " + "Binlog file is %s, position %llu", + n, router->binlog_name, pos))); + break; + } + + /* force last_known_commit position */ + if (pending_transaction) { + router->binlog_position = last_known_commit; + router->pending_transaction = 1; + pending_transaction = 0; + } else { + router->binlog_position = pos; + } + + /* Truncate file in case of any error */ + if (n != 0) { + ftruncate(router->binlog_fd, pos); + + router->binlog_position = pos; + return 1; + } else { + return 0; + } + } + + /* fill replication header struct */ + hdr.timestamp = EXTRACT32(hdbuf); + hdr.event_type = hdbuf[4]; + hdr.serverid = EXTRACT32(&hdbuf[5]); + hdr.event_size = extract_field(&hdbuf[9], 32); + hdr.next_pos = EXTRACT32(&hdbuf[13]); + hdr.flags = EXTRACT16(&hdbuf[17]); + + //fprintf(stderr, ">>>> pos [%llu] event type %i\n", pos, hdr.event_type); + //fprintf(stderr, ">>>> pos [%llu] event size %lu\n", pos, hdr.event_size); + //fprintf(stderr, ">>>> pos [%llu] event next_pos %lu\n", pos, hdr.next_pos); + + /* TO DO */ + + /* Add MariaDB 10 check */ + + /* Check event type against MAX_EVENT_TYPE */ + if (hdr.event_type > MAX_EVENT_TYPE) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "*** ERROR: Found an Invalid event type 0x%x. " + "Binlog file is %s, position %llu", + hdr.event_type, + router->binlog_name, pos))); + + ftruncate(router->binlog_fd, pos); + + router->binlog_position = pos; + + return 1; + } + + /* Allocate a GWBUF for the event */ + if ((result = gwbuf_alloc(hdr.event_size)) == NULL) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "*** ERROR: Failed to allocate memory for binlog entry, " + "size %d at %llu.", + hdr.event_size, pos))); + + ftruncate(router->binlog_fd, pos); + + router->binlog_position = pos; + + return 1; + } + + /* Copy the header in the buffer */ + data = GWBUF_DATA(result); + memcpy(data, hdbuf, 19);// Copy the header in + + /* Read event data */ + if ((n = pread(router->binlog_fd, &data[19], hdr.event_size - 19, pos + 19)) != hdr.event_size - 19) + { + if (n == -1) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "*** ERROR: Error reading the event at %llu in %s. " + "%s, expected %d bytes.", + pos, router->binlog_name, + strerror(errno), hdr.event_size - 19))); + } + else + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "*** ERROR: Short read when reading the event at %llu in %s. " + "Expected %d bytes got %d bytes.", + pos, router->binlog_name, hdr.event_size - 19, n))); + + if (filelen > 0 && filelen - pos < hdr.event_size) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "*** ERROR: Binlog event is close to the end of the binlog file %s, " + " size is %lu.", + router->binlog_name, filelen))); + } + } + + gwbuf_free(result); + + ftruncate(router->binlog_fd, pos); + + router->binlog_position = pos; + + return 1; + } + + /* check for pending transaction */ + if (pending_transaction == 0) { + last_known_commit = pos; + } + + /* get event content */ + ptr = data+19; + + /* check for FORMAT DESCRIPTION EVENT */ + if(hdr.event_type == FORMAT_DESCRIPTION_EVENT) { + int event_header_length; + int event_header_ntypes; + int n_events; + int check_alg; + uint8_t *checksum; + + if(debug) + LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, + "- Format Description event FDE @ %llu, size %lu", + pos, hdr.event_size))); + + event_header_length = ptr[2 + 50 + 4]; + event_header_ntypes = hdr.event_size - event_header_length - (2 + 50 + 4 + 1); + + if (event_header_ntypes == 168) { + /* mariadb 10 LOG_EVENT_TYPES*/ + event_header_ntypes -= 163; + } else { + if (event_header_ntypes == 165) { + /* mariadb 5 LOG_EVENT_TYPES*/ + event_header_ntypes -= 160; + } else { + /* mysql 5.6 LOG_EVENT_TYPES = 35 */ + event_header_ntypes -= 35; + } + } + + n_events = hdr.event_size - event_header_length - (2 + 50 + 4 + 1); + + if(debug) { + LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, + " FDE ServerVersion [%50s]", ptr + 2))); + + LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, + " FDE Header EventLength %i" + ", N. of supported MySQL/MAriaDB events %i", + event_header_length, + (n_events - event_header_ntypes)))); + } + + if (event_header_ntypes < n_events) { + checksum = ptr + hdr.event_size - event_header_length - event_header_ntypes; + check_alg = checksum[0]; + + if(debug) + LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, + " FDE Checksum alg desc %i, alg type %s", + check_alg, + check_alg == 1 ? "BINLOG_CHECKSUM_ALG_CRC32" : "NONE or UNDEF"))); + if (check_alg == 1) { + checksum_len = 4; + found_chksum = 1; + } else { + found_chksum = 0; + } + } + } + /* Decode ROTATE EVENT */ + if(hdr.event_type == ROTATE_EVENT) { + int len, slen; + uint64_t new_pos; + char file[BINLOG_FNAMELEN+1]; + + len = hdr.event_size - 19; + new_pos = extract_field(ptr+4, 32); + new_pos <<= 32; + new_pos |= extract_field(ptr, 32); + slen = len - (8 + 4); // Allow for position and CRC + if (found_chksum == 0) + slen += 4; + if (slen > BINLOG_FNAMELEN) + slen = BINLOG_FNAMELEN; + memcpy(file, ptr + 8, slen); + file[slen] = 0; + + if(debug) + LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, + "- Rotate event @ %llu, next file is [%s] @ %llu", + pos, file, new_pos))); + } + + + /* Check QUERY_EVENT */ + if(hdr.event_type == QUERY_EVENT) { + char *statement_sql; + db_name_len = ptr[4 + 4]; + var_block_len = ptr[4 + 4 + 1 + 2]; + + statement_len = hdr.event_size - 19 - (4+4+1+2+2+var_block_len+1+db_name_len); + //if (checksum_len) + // statement_len -= checksum_len; + + statement_sql = calloc(1, statement_len+1); + strncpy(statement_sql, (char *)ptr+4+4+1+2+2+var_block_len+1+db_name_len, statement_len); + + //fprintf(stderr, "QUERY_EVENT = [%s] %i / %i\n", statement_sql, (4+4+1+2+2+var_block_len+1+db_name_len), statement_len); + + /* A transaction starts with this event */ + if (strncmp(statement_sql, "BEGIN", 5) == 0) { + if (pending_transaction > 0) { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "*** ERROR: Transaction cannot be @ pos %llu: " + "Another transaction was opened at %ll", + pos, last_known_commit))); + + free(statement_sql); + gwbuf_free(result); + + break; + } else { + pending_transaction = 1; + + if (debug) + LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, + "> Transaction starts @ pos %llu", pos))); + } + } + + /* Commit received for non transactional tables, i.e. MyISAM */ + if (strncmp(statement_sql, "COMMIT", 6) == 0) { + if (pending_transaction > 0) { + pending_transaction = 3; + + if (debug) + LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, + " Transaction @ pos %llu, closing @ %llu", last_known_commit, pos))); + } + } + free(statement_sql); + + } + + if(hdr.event_type == XID_EVENT) { + /* Commit received for a transactional tables, i.e. InnoDB */ + uint64_t xid; + xid = extract_field(ptr, 64); + + if (pending_transaction > 0) { + pending_transaction = 2; + if (debug) + LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, + " Transaction XID @ pos %llu, closing @ %llu", last_known_commit, pos))); + } + } + + if (pending_transaction > 1) { + unsigned long long prev_pos = last_known_commit; + if (debug) + LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, + "< Transaction @ pos %llu, is now closed @ %llu", last_known_commit, pos))); + pending_transaction = 0; + last_known_commit = pos; + } + + gwbuf_free(result); + + /* pos and next_pos sanity checks */ + if (hdr.next_pos > 0 && hdr.next_pos < pos) { + LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE, + "Binlog %s: next pos %llu < pos %llu, truncating to %llu", + router->binlog_name, + hdr.next_pos, + pos, + pos))); + + ftruncate(router->binlog_fd, pos); + + router->binlog_position = pos; + + return 3; + } + + if (hdr.next_pos > 0 && hdr.next_pos != (pos + hdr.event_size)) { + LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE, + "Binlog %s: next pos %llu != (pos %llu + event_size %llu), truncating to %llu", + router->binlog_name, + hdr.next_pos, + pos, + hdr.event_size, + pos))); + + ftruncate(router->binlog_fd, pos); + + router->binlog_position = pos; + + return 3; + } + + /* set pos to new value */ + if (hdr.next_pos > 0) { + pos = hdr.next_pos; + } else { + + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "*** ERROR: Current event type %lu @ %llu has nex pos = %llu : exiting", hdr.event_type, pos, hdr.next_pos))); + break; + } + } + + if (pending_transaction) { + LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE, + "Binlog %s contains an Open Transaction, truncating to %llu", + router->binlog_name, + last_known_commit))); + + ftruncate(router->binlog_fd, last_known_commit); + + router->binlog_position = last_known_commit; + + return 2; + } else { + router->binlog_position = pos; + + return 0; + } +} diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 39386027e..0f8ddffb5 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -39,6 +39,7 @@ * 23/06/2015 Massimiliano Pinto Master communication goes into BLRM_SLAVE_STOPPED state * when an error is encountered in BLRM_BINLOGDUMP state. * Server error code and msg are reported via SHOW SLAVE STATUS + * 03/08/2015 Massimiliano Pinto Initial implementation of transaction safety * * @endverbatim */ @@ -87,6 +88,7 @@ void blr_master_close(ROUTER_INSTANCE *); static char *blr_extract_column(GWBUF *buf, int col); void blr_cache_response(ROUTER_INSTANCE *router, char *response, GWBUF *buf); void poll_fake_write_event(DCB *dcb); +GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr); static int keepalive = 1; /** @@ -167,7 +169,7 @@ DCB *client; LOGIF(LM,(skygw_log_write( LOGFILE_MESSAGE, "%s: attempting to connect to master server %s:%d, binlog %s, pos %lu", - router->service->name, router->service->dbref->server->name, router->service->dbref->server->port, router->binlog_name, router->binlog_position))); + router->service->name, router->service->dbref->server->name, router->service->dbref->server->port, router->binlog_name, router->current_pos))); router->connect_time = time(0); @@ -631,7 +633,7 @@ char query[128]; "%s: Request binlog records from %s at " "position %lu from master server %s:%d", router->service->name, router->binlog_name, - router->binlog_position, + router->current_pos, router->service->dbref->server->name, router->service->dbref->server->port))); break; @@ -729,7 +731,7 @@ int len = 0x1b; data[3] = 0; // Sequence ID data[4] = COM_BINLOG_DUMP; // Command encode_value(&data[5], - router->binlog_position, 32); // binlog position + router->current_pos, 32); // binlog position encode_value(&data[9], 0, 16); // Flags encode_value(&data[11], router->serverid, 32); // Server-id of MaxScale @@ -842,7 +844,7 @@ int n_bufs = -1, pn_bufs = -1; "Insufficient memory to buffer event " "of %d bytes. Binlog %s @ %d.", len, router->binlog_name, - router->binlog_position))); + router->current_pos))); break; } @@ -867,7 +869,7 @@ int n_bufs = -1, pn_bufs = -1; "chain, but failed to create complete " "message as expected. %s @ %d", router->binlog_name, - router->binlog_position))); + router->current_pos))); free(msg); msg = NULL; break; @@ -888,7 +890,7 @@ int n_bufs = -1, pn_bufs = -1; LOGFILE_DEBUG, "Residual data left after %d records. %s @ %d", router->stats.n_binlogs, - router->binlog_name, router->binlog_position))); + router->binlog_name, router->current_pos))); break; } else @@ -943,7 +945,7 @@ int n_bufs = -1, pn_bufs = -1; "length of previous event %d. %s", len, hdr.event_size, router->binlog_name, - router->binlog_position, + router->current_pos, reslen, preslen, prev_length, (prev_length == -1 ? (no_residual ? "No residual data from previous call" : "Residual data from previous call") : "") @@ -1000,7 +1002,7 @@ int n_bufs = -1, pn_bufs = -1; "Closing master connection.", router->service->name, router->binlog_name, - router->binlog_position))); + router->current_pos))); blr_master_close(router); blr_master_delayed_connect(router); return; @@ -1010,10 +1012,109 @@ int n_bufs = -1, pn_bufs = -1; router->lastEventReceived = hdr.event_type; router->lastEventTimestamp = hdr.timestamp; + /** + * Check for an open transaction, if the option is set + * Only complete transactions should be sent to sleves + * + * If a trasaction is pending router->last_commit_pos + * won't be updated. + */ + + if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) { + /* set last_commit_pos to binlog_position */ + + spinlock_acquire(&router->binlog_lock); + + router->binlog_position = router->current_pos; + + spinlock_release(&router->binlog_lock); + } else { + /** + * A transaction is pending. + * Last_commit_pos is on hold. + * + * Log a message if the transaction was opened + * at binlog router start + */ + + if (router->last_written == 0) { + fprintf(stderr, "*** Router started with an Open transaction at %lu / %lu\n", router->binlog_position, router->current_pos); + } + } + // #define SHOW_EVENTS #ifdef SHOW_EVENTS printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size); #endif + + //fprintf(stderr, "*** binlog last_commit_pos = %lu, binlog_pos = %lu\n", router->binlog_position, router->current_pos); + + /** + * Detect transactions in events + * Only complete transactions should be sent to sleves + */ + + if (router->trx_safe) { + /** + * look for QUERY_EVENT [BEGIN / COMMIT] and XID_EVENT + */ + + if(hdr.event_type == QUERY_EVENT) { + char *statement_sql; + int db_name_len, var_block_len, statement_len; + db_name_len = ptr[4+20+ 4 + 4]; + var_block_len = ptr[4+20+ 4 + 4 + 1 + 2]; + + statement_len = len - (4+20+4+4+1+2+2+var_block_len+1+db_name_len); + statement_sql = calloc(1, statement_len+1); + strncpy(statement_sql, (char *)ptr+4+20+4+4+1+2+2+var_block_len+1+db_name_len, statement_len); + + /* Check for BEGIN (it comes for START TRANSACTION too) */ + if (strncmp(statement_sql, "BEGIN", 5) == 0) { + spinlock_acquire(&router->binlog_lock); + + if (router->pending_transaction > 0) { + //fprintf(stderr, "*** A transaction is already open!!!!\n"); + // stop replication ???? + } + router->pending_transaction = 1; + + spinlock_release(&router->binlog_lock); + + //fprintf(stderr, "Transaction is starting @ %llu / %llu\n", router->binlog_position, router->current_pos); + } + + /* Check for COMMIT in non transactional store engines */ + if (strncmp(statement_sql, "COMMIT", 6) == 0) { + spinlock_acquire(&router->binlog_lock); + + router->pending_transaction = 2; + + spinlock_release(&router->binlog_lock); + + //fprintf(stderr, "Transaction closed @ %llu / %llu\n", router->binlog_position, router->current_pos); + + } + + free(statement_sql); + } + + /* Check for COMMIT in Transactional engines, i.e InnoDB */ + if(hdr.event_type == XID_EVENT) { + uint64_t xid; + xid = extract_field(ptr+4+20, 64); + + if (router->pending_transaction) { + spinlock_acquire(&router->binlog_lock); + + router->pending_transaction = 3; + + //fprintf(stderr, "Transaction XID closed @ %llu / %llu", router->binlog_position, router->current_pos); + spinlock_release(&router->binlog_lock); + } + } + } + event_limit = router->mariadb10_compat ? MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE; if (hdr.event_type >= 0 && hdr.event_type <= event_limit) @@ -1026,7 +1127,7 @@ int n_bufs = -1, pn_bufs = -1; "Replication fake event. " "Binlog %s @ %d.", router->binlog_name, - router->binlog_position))); + router->current_pos))); router->stats.n_fakeevents++; if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) { @@ -1071,7 +1172,7 @@ int n_bufs = -1, pn_bufs = -1; "Replication heartbeat. " "Binlog %s @ %d.", router->binlog_name, - router->binlog_position))); + router->current_pos))); router->stats.n_heartbeats++; } else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) @@ -1080,6 +1181,8 @@ int n_bufs = -1, pn_bufs = -1; // into the binlog file if (hdr.event_type == ROTATE_EVENT) router->rotating = 1; + + /* current event is being written to disk file */ if (blr_write_binlog_record(router, &hdr, ptr) == 0) { /* @@ -1094,6 +1197,8 @@ int n_bufs = -1, pn_bufs = -1; blr_master_delayed_connect(router); return; } + + /* Check for rotete event */ if (hdr.event_type == ROTATE_EVENT) { if (!blr_rotate_event(router, ptr, &hdr)) @@ -1111,7 +1216,76 @@ int n_bufs = -1, pn_bufs = -1; return; } } - blr_distribute_binlog_record(router, &hdr, ptr); + + /** + * Distributing binlog events to slaves + * may depend on pending transaction + */ + + if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) { + spinlock_acquire(&router->binlog_lock); + + router->binlog_position = router->current_pos; + + spinlock_release(&router->binlog_lock); + + /* Now distribute events */ + blr_distribute_binlog_record(router, &hdr, ptr); + } else { + /** + * If transaction is closed: + * + * 1) read current binlog starting + * from router->last_commit_pos + * + * 2) distribute read event + * + * 3) if current pos = router->binlog_position + * update router->last_commit_pos + * + */ + + if (router->pending_transaction > 1) { + unsigned long pos; + GWBUF *record; + uint8_t *raw_data; + REP_HEADER new_hdr; + int i=0; + + //fprintf(stderr, "Time to feed slaves with complete transaction from %llu / %llu\n", router->binlog_position, router->current_pos); + + spinlock_acquire(&router->binlog_lock); + + pos = router->binlog_position; + + spinlock_release(&router->binlog_lock); + + + while ((record = blr_read_events_from_pos(router, pos, &new_hdr)) != NULL) { + i++; + raw_data = GWBUF_DATA(record); + //fprintf(stderr, "Read event %i, last commit @ %lu / %lu. *** Distributing event: Type [%i], size %u: pos @ %llu (next is %llu)\n", i, pos, router->current_pos, new_hdr.event_type, new_hdr.event_size, router->binlog_position, new_hdr.next_pos); + /* distribute event */ + blr_distribute_binlog_record(router, &new_hdr, raw_data); + spinlock_acquire(&router->binlog_lock); + + pos = new_hdr.next_pos; + + spinlock_release(&router->binlog_lock); + gwbuf_free(record); + } + + spinlock_acquire(&router->binlog_lock); + router->binlog_position = router->current_pos; + router->pending_transaction = 0; + spinlock_release(&router->binlog_lock); + } else { + /* A transaction is still pending */ + //fprintf(stderr, "A Transaction is still pending @ %llu, master is @ %llu\n", router->binlog_position, router->current_pos); + } + + } + } else { @@ -1125,7 +1299,7 @@ int n_bufs = -1, pn_bufs = -1; hdr.event_type, hdr.event_size, router->binlog_name, - router->binlog_position))); + router->current_pos))); ptr += 5; if (hdr.event_type == ROTATE_EVENT) { @@ -1178,7 +1352,7 @@ int n_bufs = -1, pn_bufs = -1; LOGIF(LE,(skygw_log_write(LOGFILE_ERROR, "Error packet in binlog stream.%s @ %d.", router->binlog_name, - router->binlog_position))); + router->current_pos))); router->stats.n_binlog_errors++; } @@ -1574,3 +1748,146 @@ char *rval; return rval; } + +/** + * Read a replication event form current opened binlog into a GWBUF structure. + * + * @param router The router instance + * @param pos Position of binlog record to read + * @param hdr Binlog header to populate + * @return The binlog record wrapped in a GWBUF structure + */ +GWBUF +*blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr) { +unsigned long long end_pos = 0; +struct stat statb; +uint8_t hdbuf[19]; +uint8_t *data; +GWBUF *result; +int n; + + /* Get current binnlog position */ + end_pos = router->current_pos; + + /* end of file reached, we're done */ + if (pos == end_pos) { + return NULL; + } + + /* error */ + if (pos > end_pos) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Error: Reading saved events, the specified pos %lu " + "is ahead of current pos %lu for file %s", + pos, router->current_pos, router->binlog_name))); + return NULL; + } + + /* Read the event header information from the file */ + if ((n = pread(router->binlog_fd, hdbuf, 19, pos)) != 19) + { + switch (n) + { + case 0: + LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG, + "Reading saved events: reached end of binlog file at %d.", pos))); + break; + case -1: + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Error: Reading saved events: failed to read binlog " + "file %s at position %d" + " (%s).", router->binlog_name, + pos, strerror(errno)))); + + if (errno == EBADF) + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Error: Reading saved events: bad file descriptor for file %s" + ", descriptor %d.", + router->binlog_name, router->binlog_fd))); + break; + default: + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Error: Reading saved events: short read when reading the header. " + "Expected 19 bytes but got %d bytes. " + "Binlog file is %s, position %d", + n, router->binlog_name, pos))); + break; + } + + return NULL; + } + + //fprintf(stderr, "blr_read_events_from_pos read %i bytes. ptr[0]=[%i]\n", n, hdbuf[0]); + + hdr->timestamp = EXTRACT32(hdbuf); + hdr->event_type = hdbuf[4]; + hdr->serverid = EXTRACT32(&hdbuf[5]); + hdr->event_size = extract_field(&hdbuf[9], 32); + hdr->next_pos = EXTRACT32(&hdbuf[13]); + hdr->flags = EXTRACT16(&hdbuf[17]); + + //fprintf(stderr, ">>>> event type %i\n", hdr->event_type); + //fprintf(stderr, ">>>> event size %lu\n", hdr->event_size); + //fprintf(stderr, ">>>> event next_pos %lu\n", hdr->next_pos); + + /* Add MariaDB 10 checks */ + if (hdr->event_type > MAX_EVENT_TYPE) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Error: Reading saved events: invalid event type 0x%x. " + "Binlog file is %s, position %d", + hdr->event_type, + router->binlog_name, pos))); + return NULL; + } + + if ((result = gwbuf_alloc(hdr->event_size)) == NULL) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Error: Reading saved events: failed to allocate memory for binlog entry, " + "size %d at %d.", + hdr->event_size, pos))); + return NULL; + } + + /* Copy event header*/ + data = GWBUF_DATA(result); + memcpy(data, hdbuf, 19); + + /* Read event data and put int into buffer after header */ + if ((n = pread(router->binlog_fd, &data[19], hdr->event_size - 19, pos + 19)) != hdr->event_size - 19) + { + if (n == -1) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Error: Reading saved events: the event at %ld in %s. " + "%s, expected %d bytes.", + pos, router->binlog_name, + strerror(errno), hdr->event_size - 19))); + } else { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Error: Reading saved events: short read when reading " + "the event at %ld in %s. " + "Expected %d bytes got %d bytes.", + pos, router->binlog_name, hdr->event_size - 19, n))); + + if (end_pos - pos < hdr->event_size) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Error: Reading saved events: binlog event " + "is close to the end of the binlog file, " + "current file size is %u.", end_pos))); + } + } + + /* free buffer */ + gwbuf_free(result); + + return NULL; + } + +// fprintf(stderr, "blr_read_events_from_pos read [%i] next bytes, raw data size is [%i]\n", n, hdr->event_size); + + return result; +} diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index b12efff3a..6e900c422 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -545,7 +545,7 @@ extern char *strcasestr(); free(query_text); if (router->master_state == BLRM_SLAVE_STOPPED) { - char path[4097] = ""; + char path[PATH_MAX + 1] = ""; char error_string[BINLOG_ERROR_MSG_LEN + 1] = ""; MASTER_SERVER_CFG *current_master = NULL; int removed_cfg = 0; @@ -1013,7 +1013,9 @@ int len, file_len; sprintf(file, "%s", router->binlog_name); file_len = strlen(file); - sprintf(position, "%ld", router->binlog_position); + + sprintf(position, "%lu", router->binlog_position); + len = 5 + file_len + strlen(position) + 1 + 3; if ((pkt = gwbuf_alloc(len)) == NULL) return 0; @@ -1125,7 +1127,12 @@ char *dyn_column=NULL; strncpy((char *)ptr, column, col_len); // Result string ptr += col_len; - sprintf(column, "%ld", router->binlog_position); + /* if router->trx_safe report current_pos*/ + if (router->trx_safe) + sprintf(column, "%lu", router->current_pos); + else + sprintf(column, "%lu", router->binlog_position); + col_len = strlen(column); *ptr++ = col_len; // Length of result string strncpy((char *)ptr, column, col_len); // Result string @@ -2517,7 +2524,7 @@ blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) slave->dcb->remote, router->service->dbref->server->name, router->service->dbref->server->port, - router->binlog_name, router->binlog_position))); + router->binlog_name, router->current_pos))); return blr_slave_send_ok(router, slave); } @@ -2534,7 +2541,7 @@ blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) static int blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) { - char path[4097]=""; + char path[PATH_MAX+1]=""; int loaded; /* if unconfigured return an error */ @@ -2556,10 +2563,27 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) spinlock_release(&router->lock); /* create a new binlog or just use current one */ - if (strcmp(router->prevbinlog, router->binlog_name)) + if (strcmp(router->prevbinlog, router->binlog_name)) { + if (router->trx_safe && router->pending_transaction) { + char msg[1024+1] = ""; + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Warning: a transaction is still opened at pos %lu. " + "Current pos is %lu in file %s", + router->binlog_position, + router->current_pos, router->prevbinlog))); + + snprintf(msg, 1024, "A transaction is still opened at pos %lu. Current pos is %lu in file %s", router->binlog_position, router->current_pos, router->prevbinlog); + + blr_slave_send_error_packet(slave, msg, (unsigned int)1254, NULL); + + return 1; + } + blr_file_new_binlog(router, router->binlog_name); - else + } else { blr_file_use_binlog(router, router->binlog_name); + } blr_start_master(router); @@ -2572,13 +2596,13 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) router->service->dbref->server->name, router->service->dbref->server->port, router->binlog_name, - router->binlog_position))); + router->current_pos))); /* File path for router cached authentication data */ strcpy(path, router->binlogdir); - strncat(path, "/cache", 4096); + strncat(path, "/cache", PATH_MAX); - strncat(path, "/dbusers", 4096); + strncat(path, "/dbusers", PATH_MAX); /* Try loading dbusers from configured backends */ loaded = load_mysql_users(router->service); @@ -2792,7 +2816,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error master_logfile, 4, router->binlog_name, - router->binlog_position); + router->current_pos); return_error = 1; } else { @@ -2803,7 +2827,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error master_logfile, 4, router->binlog_name, - router->binlog_position); + router->current_pos); return_error = 1; } @@ -2831,6 +2855,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error memset(router->binlog_name, '\0', sizeof(router->binlog_name)); strncpy(router->binlog_name, master_logfile, BINLOG_FNAMELEN); + router->current_pos = 4; router->binlog_position = 4; LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "%s: New MASTER_LOG_FILE is [%s]", @@ -2860,13 +2885,13 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error } } else { - if (pos > 0 && pos != router->binlog_position) { + if (pos > 0 && pos != router->current_pos) { snprintf(error, BINLOG_ERROR_MSG_LEN, "Can not set MASTER_LOG_POS to %s: " "Permitted binlog pos is %lu. Current master_log_file=%s, master_log_pos=%lu", passed_pos, - router->binlog_position, + router->current_pos, router->binlog_name, - router->binlog_position); + router->current_pos); return_error = 1; } @@ -2893,6 +2918,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error * Also set binlog name if UNCOFIGURED */ if (router->master_state == BLRM_UNCONFIGURED) { + router->current_pos = 4; router->binlog_position = 4; memset(router->binlog_name, '\0', sizeof(router->binlog_name)); strncpy(router->binlog_name, master_logfile, BINLOG_FNAMELEN); @@ -2904,7 +2930,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "%s: New MASTER_LOG_POS is [%u]", router->service->name, - router->binlog_position))); + router->current_pos))); if (master_log_pos) free(master_log_pos); @@ -2967,7 +2993,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error router->service->dbref->server->name, router->service->dbref->server->port, router->binlog_name, - router->binlog_position, + router->current_pos, router->user))); blr_master_free_config(current_master); @@ -3150,7 +3176,7 @@ char *blr_set_master_logfile(ROUTER_INSTANCE *router, char *command, char *error router->fileroot, next_binlog_seqname, router->binlog_name, - router->binlog_position); + router->current_pos); free(logfile); @@ -3180,7 +3206,7 @@ static void blr_master_get_config(ROUTER_INSTANCE *router, MASTER_SERVER_CFG *curr_master) { curr_master->port = router->service->dbref->server->port; curr_master->host = strdup(router->service->dbref->server->name); - curr_master->pos = router->binlog_position; + curr_master->pos = router->current_pos; strncpy(curr_master->logfile, router->binlog_name, BINLOG_FNAMELEN); curr_master->user = strdup(router->user); curr_master->password = strdup(router->password); @@ -3230,6 +3256,7 @@ blr_master_set_empty_config(ROUTER_INSTANCE *router) { server_update_address(router->service->dbref->server, "none"); server_update_port(router->service->dbref->server, (unsigned short)3306); + router->current_pos = 4; router->binlog_position = 4; strcpy(router->binlog_name, ""); } @@ -3244,7 +3271,7 @@ static void blr_master_apply_config(ROUTER_INSTANCE *router, MASTER_SERVER_CFG *prev_master) { server_update_address(router->service->dbref->server, prev_master->host); server_update_port(router->service->dbref->server, prev_master->port); - router->binlog_position = prev_master->pos; + router->current_pos = prev_master->pos; strcpy(router->binlog_name, prev_master->logfile); if (router->user) { free(router->user);