From e5a2e5cd1f41e3f3670fefd84d0d16341df8acf6 Mon Sep 17 00:00:00 2001 From: MassimilianoPinto Date: Thu, 6 Aug 2015 17:47:37 +0200 Subject: [PATCH] Improvements added in stop/start slave and change master Improvements added in stop/start slave and change master --- server/modules/include/blr.h | 2 + server/modules/routing/binlog/blr.c | 29 ++-- server/modules/routing/binlog/blr_file.c | 148 ++++++++++++++++----- server/modules/routing/binlog/blr_master.c | 8 +- server/modules/routing/binlog/blr_slave.c | 113 ++++++++++++++-- 5 files changed, 243 insertions(+), 57 deletions(-) diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index aa09cbe3a..888abaa67 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -167,6 +167,7 @@ typedef struct master_server_config { unsigned short port; char logfile[BINLOG_FNAMELEN+1]; uint64_t pos; + uint64_t safe_pos; char *user; char *password; char *filestem; @@ -360,6 +361,7 @@ typedef struct router_instance { SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */ int trx_safe; /*< Detect and handle partial transactions */ int pending_transaction; /*< Pending transaction */ + uint64_t last_safe_pos; /* last committed transaction */ char binlog_name[BINLOG_FNAMELEN+1]; /*< Name of the current binlog file */ uint64_t binlog_position; diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 15c6dd106..c8c119da5 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -270,6 +270,7 @@ int rc = 0; inst->trx_safe = 0; inst->pending_transaction = 0; + inst->last_safe_pos = 0; my_uuid_init((ulong)rand()*12345,12345); if ((defuuid = (unsigned char *)malloc(20)) != NULL) @@ -580,11 +581,6 @@ int rc = 0; free(inst); return NULL; } - - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "Binlog router: current binlog file is: %s, current position %u\n", - inst->binlog_name, inst->binlog_position))); } /* 
@@ -614,15 +610,28 @@ int rc = 0; */ if (inst->master_state == BLRM_UNCONNECTED) { - /* NOTE: This setting will be replaced by calling - * blr_read_events_all() routine soon - * The routine may truncate binlog file or put + /** blr_read_events_all() may truncate binlog file or put * master_state into BLR_SLAVE_STOPPED state. * If an open transaction is detected @ pos xxx * inst->binlog_position will be set to xxx + * + * Files are now truncated and router is not in BLR_SLAVE_STOPPED state */ - if (inst->binlog_position == 0) - inst->binlog_position = inst->current_pos; + + int n; + n = blr_read_events_all_events(inst, 1, 0); + + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "blr_read_events_all_events() ret = %i\n", n))); + + fprintf(stderr, "blr_read_events_all_events() ret = %i\n", n); + fprintf(stderr, "current_pos / binlog_pos are [%llu] / [%llu]\n", inst->current_pos, inst->binlog_position); + + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "Binlog router: current binlog file is: %s, current position %u\n", + inst->binlog_name, inst->binlog_position))); /* Start replication from master server */ blr_start_master(inst); diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index 5e9a5d19b..426b8958d 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -186,7 +186,7 @@ unsigned char magic[] = BINLOG_MAGIC; write(fd, magic, 4); router->current_pos = 4; /* Initial position after the magic number */ - //router->binlog_position = 4; /* Initial position after the magic number */ + router->binlog_position = 4; /* Initial position after the magic number */ } @@ -254,11 +254,8 @@ int fd; close(router->binlog_fd); spinlock_acquire(&router->binlog_lock); strncpy(router->binlog_name, file,BINLOG_FNAMELEN); - //router->binlog_position = lseek(fd, 0L, SEEK_END); router->current_pos = lseek(fd, 0L, SEEK_END); - //if (router->binlog_position < 4) { if 
(router->current_pos < 4) { - //if (router->binlog_position == 0) { if (router->current_pos == 0) { blr_file_add_magic(router, fd); } else { @@ -266,7 +263,6 @@ int fd; * then report an error. */ LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "%s: binlog file %s has an invalid length %d.", - //router->service->name, path, router->binlog_position))); router->service->name, path, router->current_pos))); close(fd); spinlock_release(&router->binlog_lock); @@ -304,7 +300,6 @@ int n; return 0; } spinlock_acquire(&router->binlog_lock); - //router->binlog_position = hdr->next_pos; router->current_pos = hdr->next_pos; router->last_written = hdr->next_pos - hdr->event_size; spinlock_release(&router->binlog_lock); @@ -931,8 +926,8 @@ int statement_len; int checksum_len=0; int found_chksum = 0; - if (router->binlog_fd == -1) { - LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + if (router->binlog_fd == -1) { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, "*** ERROR: Current binlog file %s is not open", router->binlog_name))); return 1; @@ -941,8 +936,10 @@ int found_chksum = 0; if (fstat(router->binlog_fd, &statb) == 0) filelen = statb.st_size; + router->current_pos = 4; + router->binlog_position = 4; + while (1){ - //fprintf(stderr, "Pos %llu pending trx = %i\n", pos, pending_transaction); /* Read the header information from the file */ if ((n = pread(router->binlog_fd, hdbuf, 19, pos)) != 19) { @@ -961,7 +958,6 @@ int found_chksum = 0; router->binlog_name, last_known_commit))); - //ftruncate(router->binlog_fd, last_known_commit); } break; @@ -985,23 +981,43 @@ int found_chksum = 0; break; } - /* force last_known_commit position */ + /** + * Check for errors and force last_known_commit position + * and current pos + */ + if (pending_transaction) { router->binlog_position = last_known_commit; + router->current_pos = pos; router->pending_transaction = 1; pending_transaction = 0; - } else { - router->binlog_position = pos; - } - /* Truncate file in case of any error */ - if (n != 
0) { - ftruncate(router->binlog_fd, pos); + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "warning : pending transaction has been found. " + "Setting safe pos to %lu, current pos %lu", + router->binlog_position, router->current_pos))); - router->binlog_position = pos; - return 1; + return 0; } else { - return 0; + /* any error */ + if (n != 0) { + router->binlog_position = last_known_commit; + router->current_pos = pos; + + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "warning : an error has been found. " + "Setting safe pos to %lu, current pos %lu", + router->binlog_position, router->current_pos))); + + ftruncate(router->binlog_fd, router->binlog_position); + fsync(router->binlog_fd); + + return 1; + } else { + router->binlog_position = pos; + router->current_pos = pos; + return 0; + } } } @@ -1030,13 +1046,41 @@ int found_chksum = 0; hdr.event_type, router->binlog_name, pos))); - ftruncate(router->binlog_fd, pos); + router->binlog_position = last_known_commit; + router->current_pos = pos; - router->binlog_position = pos; + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "warning : an error has been found. " + "Setting safe pos to %lu, current pos %lu", + router->binlog_position, router->current_pos))); + + ftruncate(router->binlog_fd, router->binlog_position); + fsync(router->binlog_fd); return 1; } + if (hdr.event_size <= 0) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "*** ERROR: event size error: " + "size %d at %llu.", + hdr.event_size, pos))); + + router->binlog_position = last_known_commit; + router->current_pos = pos; + + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "warning : an error has been found. 
" + "Setting safe pos to %lu, current pos %lu", + router->binlog_position, router->current_pos))); + + ftruncate(router->binlog_fd, router->binlog_position); + fsync(router->binlog_fd); + + return 1; + } + /* Allocate a GWBUF for the event */ if ((result = gwbuf_alloc(hdr.event_size)) == NULL) { @@ -1045,9 +1089,16 @@ int found_chksum = 0; "size %d at %llu.", hdr.event_size, pos))); - ftruncate(router->binlog_fd, pos); + router->binlog_position = last_known_commit; + router->current_pos = pos; - router->binlog_position = pos; + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "warning : an error has been found. " + "Setting safe pos to %lu, current pos %lu", + router->binlog_position, router->current_pos))); + + ftruncate(router->binlog_fd, router->binlog_position); + fsync(router->binlog_fd); return 1; } @@ -1085,9 +1136,16 @@ int found_chksum = 0; gwbuf_free(result); - ftruncate(router->binlog_fd, pos); + router->binlog_position = last_known_commit; + router->current_pos = pos; - router->binlog_position = pos; + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "warning : an error has been found. " + "Setting safe pos to %lu, current pos %lu", + router->binlog_position, router->current_pos))); + + ftruncate(router->binlog_fd, router->binlog_position); + fsync(router->binlog_fd); return 1; } @@ -1267,11 +1325,18 @@ int found_chksum = 0; pos, pos))); - ftruncate(router->binlog_fd, pos); + router->binlog_position = last_known_commit; + router->current_pos = pos; - router->binlog_position = pos; + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "warning : an error has been found. 
" + "Setting safe pos to %lu, current pos %lu", + router->binlog_position, router->current_pos))); - return 3; + ftruncate(router->binlog_fd, router->binlog_position); + fsync(router->binlog_fd); + + return 2; } if (hdr.next_pos > 0 && hdr.next_pos != (pos + hdr.event_size)) { @@ -1283,11 +1348,18 @@ int found_chksum = 0; hdr.event_size, pos))); - ftruncate(router->binlog_fd, pos); + router->binlog_position = last_known_commit; + router->current_pos = pos; - router->binlog_position = pos; + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "warning : an error has been found. " + "Setting safe pos to %lu, current pos %lu", + router->binlog_position, router->current_pos))); - return 3; + ftruncate(router->binlog_fd, router->binlog_position); + fsync(router->binlog_fd); + + return 2; } /* set pos to new value */ @@ -1307,13 +1379,19 @@ int found_chksum = 0; router->binlog_name, last_known_commit))); - ftruncate(router->binlog_fd, last_known_commit); + router->binlog_position = last_known_commit; + router->current_pos = pos; + router->pending_transaction = 1; - router->binlog_position = last_known_commit; + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "warning : an error has been found. 
" + "Setting safe pos to %lu, current pos %lu", + router->binlog_position, router->current_pos))); - return 2; + return 0; } else { router->binlog_position = pos; + router->current_pos = pos; return 0; } diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 0f8ddffb5..75d198bdb 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -1275,10 +1275,10 @@ int n_bufs = -1, pn_bufs = -1; gwbuf_free(record); } - spinlock_acquire(&router->binlog_lock); + spinlock_acquire(&router->lock); router->binlog_position = router->current_pos; router->pending_transaction = 0; - spinlock_release(&router->binlog_lock); + spinlock_release(&router->lock); } else { /* A transaction is still pending */ //fprintf(stderr, "A Transaction is still pending @ %llu, master is @ %llu\n", router->binlog_position, router->current_pos); @@ -1450,8 +1450,12 @@ char file[BINLOG_FNAMELEN+1]; #endif strcpy(router->prevbinlog, router->binlog_name); + + printf("New file: %s/%s @ %ld, pending transaction [%i]\n", file, router->prevbinlog, pos, router->pending_transaction); + if (strncmp(router->binlog_name, file, slen) != 0) { + fprintf(stderr, "Calling rotate for [%s]/[%s] prev [%s]\n", router->binlog_name, file, router->prevbinlog); router->stats.n_rotates++; if (blr_file_rotate(router, file, pos) == 0) { diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 6e900c422..6e1fb4ff9 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -117,6 +117,7 @@ static void blr_master_free_config(MASTER_SERVER_CFG *current_master); static void blr_master_restore_config(ROUTER_INSTANCE *router, MASTER_SERVER_CFG *current_master); static void blr_master_set_empty_config(ROUTER_INSTANCE *router); static void blr_master_apply_config(ROUTER_INSTANCE *router, MASTER_SERVER_CFG *prev_master); +static int 
blr_slave_send_ok_message(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave, char *message); void poll_fake_write_event(DCB *dcb); @@ -716,9 +717,25 @@ extern char *strcasestr(); spinlock_release(&router->lock); } - blr_master_free_config(current_master); + if (!router->trx_safe) + blr_master_free_config(current_master); - return blr_slave_send_ok(router, slave); + if (router->trx_safe && router->pending_transaction) { + if (strcmp(router->binlog_name, router->prevbinlog) != 0) + { + char message[1024+1] = ""; + snprintf(message, 1024, "A transaction is open in current binlog file %s, It will be truncated at pos %lu by next START SLAVE command", current_master->logfile, current_master->safe_pos); + blr_master_free_config(current_master); + + return blr_slave_send_ok_message(router, slave, message); + } else { + blr_master_free_config(current_master); + return blr_slave_send_ok(router, slave); + } + + } else { + return blr_slave_send_ok(router, slave); + } } } } @@ -2454,6 +2471,39 @@ uint8_t *ptr; return slave->dcb->func.write(slave->dcb, pkt); } +/** Send an OK packet with an info message; text beyond 250 bytes is dropped */ +static int +blr_slave_send_ok_message(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave, char *message) +{ +GWBUF *pkt; +uint8_t *ptr; +size_t len = strlen(message); + + if (len > 250) /* largest value a one-byte length prefix can carry */ + len = 250; + /* 4 byte header, 8 fixed payload bytes, then the text without a NUL */ + if ((pkt = gwbuf_alloc(len+12)) == NULL) + return 0; + ptr = GWBUF_DATA(pkt); + *ptr++ = (uint8_t)((len+8) & 0xff); // Payload length, low byte + *ptr++ = (uint8_t)((len+8) >> 8); // Payload length, middle byte + *ptr++ = 0; + *ptr++ = 1; // Seqno + *ptr++ = 0; // ok + *ptr++ = 0; + *ptr++ = 0; + + *ptr++ = 2; + *ptr++ = 0; + + *ptr++ = len ? 1 : 0; // 1,0 with a message and 0,0 without, as before + *ptr++ = 0; + *ptr++ = (uint8_t)len; // length prefix, 0 when there is no message + memcpy(ptr, message, len); + + return slave->dcb->func.write(slave->dcb, pkt); +} + /** * Stop current replication from master @@ -2494,6 +2544,16 @@ blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) router->master_state = BLRM_SLAVE_STOPPED; + router->last_safe_pos = router->binlog_position; + + /** + * Set router->prevbinlog to router->binlog_name + * The FDE event with current
filename may arrive after STOP SLAVE is received + */ + + if (strcmp(router->binlog_name, router->prevbinlog) !=0) + strncpy(router->prevbinlog, router->binlog_name, BINLOG_FNAMELEN); + spinlock_release(&router->lock); if (router->client) { @@ -2526,7 +2586,13 @@ blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) router->service->dbref->server->port, router->binlog_name, router->current_pos))); - return blr_slave_send_ok(router, slave); + if (router->trx_safe && router->pending_transaction) { + char message[1024+1] = ""; + snprintf(message, 1024, "A transaction is open at pos %lu, file %s", router->binlog_position, router->binlog_name); + return blr_slave_send_ok_message(router, slave, message); + } else { + return blr_slave_send_ok(router, slave); + } } /** @@ -2563,18 +2629,43 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) spinlock_release(&router->lock); /* create a new binlog or just use current one */ - if (strcmp(router->prevbinlog, router->binlog_name)) { + if (strlen(router->prevbinlog) && strcmp(router->prevbinlog, router->binlog_name)) { if (router->trx_safe && router->pending_transaction) { char msg[1024+1] = ""; + char file[PATH_MAX+1] = ""; + + snprintf(msg, 1024, "A transaction is still opened at pos %lu in file %s. Truncating it ... Try START SLAVE again.", router->last_safe_pos, router->prevbinlog); + + + /* Truncate previous binlog file to safe pos */ + + snprintf(file, PATH_MAX, "%s/%s", router->binlogdir, router->prevbinlog); + truncate(file, router->last_safe_pos); + + /* Log it */ LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, - "Warning: a transaction is still opened at pos %lu. " - "Current pos is %lu in file %s", - router->binlog_position, - router->current_pos, router->prevbinlog))); + "Warning: a transaction is still opened at pos %lu" + " File %s will be truncated. 
" + "Next binlog file is %s at pos %lu, " + "START SLAVE is required again.", + router->last_safe_pos, + router->prevbinlog, + router->binlog_name, + 4))); - snprintf(msg, 1024, "A transaction is still opened at pos %lu. Current pos is %lu in file %s", router->binlog_position, router->current_pos, router->prevbinlog); + spinlock_acquire(&router->lock); + router->pending_transaction = 0; + router->last_safe_pos = 0; + router->last_safe_pos = 0; + router->master_state = BLRM_SLAVE_STOPPED; + router->current_pos = 4; + router->binlog_position = 4; + + spinlock_release(&router->lock); + + /* Send error message to mysql command */ blr_slave_send_error_packet(slave, msg, (unsigned int)1254, NULL); return 1; @@ -2586,7 +2677,7 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) } blr_start_master(router); - + LOGIF(LM, (skygw_log_write( LOGFILE_MESSAGE, "%s: START SLAVE executed by %s@%s. Trying connection to master %s:%d, binlog %s, pos %lu", @@ -3207,6 +3298,7 @@ blr_master_get_config(ROUTER_INSTANCE *router, MASTER_SERVER_CFG *curr_master) { curr_master->port = router->service->dbref->server->port; curr_master->host = strdup(router->service->dbref->server->name); curr_master->pos = router->current_pos; + curr_master->safe_pos = router->binlog_position; strncpy(curr_master->logfile, router->binlog_name, BINLOG_FNAMELEN); curr_master->user = strdup(router->user); curr_master->password = strdup(router->password); @@ -3272,6 +3364,7 @@ blr_master_apply_config(ROUTER_INSTANCE *router, MASTER_SERVER_CFG *prev_master) server_update_address(router->service->dbref->server, prev_master->host); server_update_port(router->service->dbref->server, prev_master->port); router->current_pos = prev_master->pos; + router->binlog_position = prev_master->safe_pos; strcpy(router->binlog_name, prev_master->logfile); if (router->user) { free(router->user);