diff --git a/server/core/gateway.c b/server/core/gateway.c index 3e5a63a27..5d5f766c6 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -617,7 +617,8 @@ check_home_dir: free(logstr); goto return_succp; } - + +#if WRITABLE_HOME if (!file_is_writable(*p_home_dir)) { char* tailstr = "MaxScale doesn't have write permission " @@ -637,7 +638,7 @@ check_home_dir: free(logstr); goto return_succp; } - +#endif if (!daemon_mode) { fprintf(stderr, diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 028e45d26..c3c809ac8 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -41,6 +41,8 @@ #define BINLOG_NAMEFMT "%s.%06d" #define BINLOG_NAME_ROOT "mysql-bin" +#define BINLOG_EVENT_HDR_LEN 19 + /* How often to call the binlog status function (seconds) */ #define BLR_STATS_FREQ 60 #define BLR_NSTATS_MINUTES 30 @@ -64,9 +66,9 @@ * BLR_MASTER_BACKOFF_TIME The increments of the back off time (seconds) * BLR_MAX_BACKOFF Maximum number of increments to backoff to */ - -#define BLR_MASTER_BACKOFF_TIME 5 +#define BLR_MASTER_BACKOFF_TIME 10 #define BLR_MAX_BACKOFF 60 + /** * Some useful macros for examining the MySQL Response packets */ @@ -128,6 +130,7 @@ typedef struct blfile { */ typedef struct { int n_events; /*< Number of events sent */ + unsigned long n_bytes; /*< Number of bytes sent */ int n_bursts; /*< Number of bursts sent */ int n_requests; /*< Number of requests received */ int n_flows; /*< Number of flow control restarts */ @@ -138,6 +141,7 @@ typedef struct { int n_above; int n_failed_read; int n_overrun; + int n_caughtup; int n_actions[3]; uint64_t lastsample; int minno; @@ -175,6 +179,7 @@ typedef struct router_slave { *router; /*< Pointer to the owning router */ struct router_slave *next; SLAVE_STATS stats; /*< Slave statistics */ + time_t connect_time; /*< Connect time of slave */ #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; #endif @@ -269,6 +274,7 @@ typedef struct router_instance { int active_logs; int reconnect_pending; int retry_backoff; + time_t connect_time; int handling_threads; struct router_instance *next; @@ -278,25 +284,26 @@ typedef struct router_instance { * State machine for the master to MaxScale replication */ #define BLRM_UNCONNECTED 0x0000 -#define BLRM_AUTHENTICATED 0x0001 -#define BLRM_TIMESTAMP 0x0002 -#define BLRM_SERVERID 0x0003 -#define BLRM_HBPERIOD 0x0004 -#define BLRM_CHKSUM1 0x0005 -#define BLRM_CHKSUM2 0x0006 -#define BLRM_GTIDMODE 0x0007 -#define BLRM_MUUID 0x0008 -#define BLRM_SUUID 0x0009 -#define BLRM_LATIN1 0x000A -#define BLRM_UTF8 0x000B -#define BLRM_SELECT1 0x000C -#define BLRM_SELECTVER 0x000D -#define BLRM_REGISTER 0x000E -#define BLRM_BINLOGDUMP 0x000F +#define BLRM_CONNECTING 0x0001 +#define BLRM_AUTHENTICATED 0x0002 +#define BLRM_TIMESTAMP 0x0003 +#define BLRM_SERVERID 0x0004 +#define BLRM_HBPERIOD 0x0005 +#define BLRM_CHKSUM1 0x0006 +#define BLRM_CHKSUM2 0x0007 +#define BLRM_GTIDMODE 0x0008 +#define BLRM_MUUID 0x0009 +#define BLRM_SUUID 0x000A +#define BLRM_LATIN1 0x000B +#define BLRM_UTF8 0x000C +#define BLRM_SELECT1 0x000D +#define BLRM_SELECTVER 0x000E +#define BLRM_REGISTER 0x000F +#define BLRM_BINLOGDUMP 0x0010 -#define BLRM_MAXSTATE 0x000F +#define BLRM_MAXSTATE 0x0010 -static char *blrm_states[] = { "Unconnected", "Authenticated", "Timestamp retrieval", +static char *blrm_states[] = { "Unconnected", "Connecting", "Authenticated", "Timestamp retrieval", "Server ID retrieval", "HeartBeat Period setup", "binlog checksum config", "binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval", "Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1", @@ -371,6 +378,8 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered", #define ANONYMOUS_GTID_EVENT 0x22 #define PREVIOUS_GTIDS_EVENT 0x23 +#define MAX_EVENT_TYPE 0x23 + /** * Binlog event flags */ diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 2766052e9..affdece82 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -416,6 +416,7 @@ ROUTER_SLAVE *slave; slave->router = inst; slave->file = NULL; strcpy(slave->binlogfile, "unassigned"); + slave->connect_time = time(0); /** * Add this session to the list of active sessions. @@ -509,9 +510,13 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session; { /* * We must be closing the master session. - * - * TODO: Handle closure of master session */ + LOGIF(LM, (skygw_log_write_flush( + LOGFILE_MESSAGE, + "%s: Master %s disconnected after %ld seconds. " + "%d events read.", + router->service->name, router->master->remote, + time(0) - router->connect_time, router->stats.n_binlogs))); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Binlog router close session with master server %s", @@ -529,6 +534,14 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session; /* decrease server registered slaves counter */ atomic_add(&router->stats.n_registered, -1); + LOGIF(LM, (skygw_log_write_flush( + LOGFILE_MESSAGE, + "%s: Slave %s disconnected after %ld seconds. " + "%d events sent, %lu bytes.", + router->service->name, slave->dcb->remote, + time(0) - slave->connect_time, slave->stats.n_events, + slave->stats.n_bytes))); + /* * Mark the slave as unregistered to prevent the forwarding * of any more binlog records to this slave. @@ -739,19 +752,44 @@ struct tm tm; min15 /= 15.0; min10 /= 10.0; min5 /= 5.0; - dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid); + dcb_printf(dcb, + "\t\tServer-id: %d\n", + session->serverid); if (session->hostname) - dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname); - dcb_printf(dcb, "\t\tSlave DCB: %p\n", session->dcb); - dcb_printf(dcb, "\t\tNext Sequence No: %d\n", session->seqno); - dcb_printf(dcb, "\t\tState: %s\n", blrs_states[session->state]); - dcb_printf(dcb, "\t\tBinlog file: %s\n", session->binlogfile); - dcb_printf(dcb, "\t\tBinlog position: %u\n", session->binlog_pos); + dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname); + dcb_printf(dcb, + "\t\tSlave: %d\n", + session->dcb->remote); + dcb_printf(dcb, + "\t\tSlave DCB: %p\n", + session->dcb); + dcb_printf(dcb, + "\t\tNext Sequence No: %d\n", + session->seqno); + dcb_printf(dcb, + "\t\tState: %s\n", + blrs_states[session->state]); + dcb_printf(dcb, + "\t\tBinlog file: %s\n", + session->binlogfile); + dcb_printf(dcb, + "\t\tBinlog position: %u\n", + session->binlog_pos); if (session->nocrc) - dcb_printf(dcb, "\t\tMaster Binlog CRC: None\n"); - dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests); - dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events); - dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts); + dcb_printf(dcb, + "\t\tMaster Binlog CRC: None\n"); + dcb_printf(dcb, + "\t\tNo. requests: %u\n", + session->stats.n_requests); + dcb_printf(dcb, + "\t\tNo. events sent: %u\n", + session->stats.n_events); + dcb_printf(dcb, + "\t\tNo. bursts sent: %u\n", + session->stats.n_bursts); + dcb_printf(dcb, + "\t\tNo. transitions to follow mode: %u\n", + session->stats.n_bursts); minno = session->stats.minno - 1; if (minno == -1) minno = 30; @@ -760,15 +798,18 @@ struct tm tm; dcb_printf(dcb, "\t\t %6d %8.1f %8.1f %8.1f %8.1f\n", session->stats.minavgs[minno], min5, min10, min15, min30); - dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows); - dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd); - dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb); - dcb_printf(dcb, "\t\tNo. of low water cbs N/A %u\n", session->stats.n_cbna); - dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read); - dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun); - dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]); - dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]); - dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]); + dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows); + dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd); + dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb); + dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read); + +#if DETAILED_DIAG + dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun); + dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]); + dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]); + dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]); +#endif + if ((session->cstate & CS_UPTODATE) == 0) { dcb_printf(dcb, "\t\tSlave is in catchup mode. %s%s\n", @@ -793,7 +834,7 @@ struct tm tm; dcb_printf(dcb, "\tSpinlock statistics (rses_lock):\n"); spinlock_stats(&session->rses_lock, spin_reporter, dcb); #endif - dcb_printf(dcb, "\n"); + dcb_printf(dcb, "\t\t--------------------\n\n"); session = session->next; } spinlock_release(&router_inst->lock); @@ -822,6 +863,24 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; router->stats.lastReply = time(0); } +static char * +extract_message(GWBUF *errpkt) +{ +char *rval; +int len; + + len = EXTRACT24(errpkt->start); + if ((rval = (char *)malloc(len)) == NULL) + return NULL; + memcpy(rval, (char *)(errpkt->start) + 7, 6); + rval[6] = ' '; + memcpy(&rval[7], (char *)(errpkt->start) + 13, len - 8); + rval[len-2] = 0; + return rval; +} + + + /** * Error Reply routine * @@ -841,10 +900,10 @@ errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_ { ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; int error, len; -char msg[85]; +char msg[85], *errmsg; len = sizeof(error); - if (getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0) + if (router->master && getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0 && error != 0) { strerror_r(error, msg, 80); strcat(msg, " "); @@ -852,10 +911,21 @@ char msg[85]; else strcpy(msg, ""); + errmsg = extract_message(message); LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, "Master connection '%s', %sattempting reconnect to master", - message, msg))); + LOGFILE_ERROR, "%s: Master connection error '%s' in state '%s', " + "%sattempting reconnect to master", + router->service->name, errmsg, + blrm_states[router->master_state], msg))); + if (errmsg) + free(errmsg); *succp = true; + LOGIF(LM, (skygw_log_write_flush( + LOGFILE_MESSAGE, + "%s: Master %s disconnected after %ld seconds. " + "%d events read.", + router->service->name, router->master->remote, + time(0) - router->connect_time, router->stats.n_binlogs))); blr_master_reconnect(router); } diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index f4b2b82c4..77ebb2d27 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -353,7 +353,7 @@ struct stat statb; "Short read when reading the header. " "Expected 19 bytes but got %d bytes. " "Binlog file is %s, position %d", - file->binlogname, pos, n))); + n, file->binlogname, pos))); break; } return NULL; @@ -364,6 +364,17 @@ struct stat statb; hdr->event_size = extract_field(&hdbuf[9], 32); hdr->next_pos = EXTRACT32(&hdbuf[13]); hdr->flags = EXTRACT16(&hdbuf[17]); + + if (hdr->event_type > MAX_EVENT_TYPE) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Invalid event type 0x%x. " + "Binlog file is %s, position %d", + hdr->event_type, + file->binlogname, pos))); + return NULL; + } + if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT) { LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 2e0ad008a..7f7c888b1 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -91,6 +91,17 @@ blr_start_master(ROUTER_INSTANCE *router) DCB *client; GWBUF *buf; + spinlock_acquire(&router->lock); + if (router->master_state != BLRM_UNCONNECTED) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "%s: Master Connect: Unexpected master state %s\n", + router->service->name, blrm_states[router->master_state]))); + spinlock_release(&router->lock); + return; + } + router->master_state = BLRM_CONNECTING; + spinlock_release(&router->lock); if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL) { LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, @@ -108,17 +119,27 @@ GWBUF *buf; client->session = router->session; if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL) { - char *name = malloc(strlen(router->service->name) + strlen(" Master") + 1); - sprintf(name, "%s Master", router->service->name); - hktask_oneshot(name, blr_start_master, router, - BLR_MASTER_BACKOFF_TIME * router->retry_backoff++); + char *name; + if ((name = malloc(strlen(router->service->name) + + strlen(" Master") + 1)) != NULL) + { + sprintf(name, "%s Master", router->service->name); + hktask_oneshot(name, blr_start_master, router, + BLR_MASTER_BACKOFF_TIME * router->retry_backoff++); + } if (router->retry_backoff > BLR_MAX_BACKOFF) - router->retry_backoff = 1; + router->retry_backoff = BLR_MAX_BACKOFF; LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, "Binlog router: failed to connect to master server '%s'", router->service->databases->unique_name))); return; } + router->master->remote = strdup(router->service->databases->name); + LOGIF(LM,(skygw_log_write( + LOGFILE_MESSAGE, + "%s: atempting to connect to master server %s.", + router->service->name, router->master->remote))); + router->connect_time = time(0); if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive ))) perror("setsockopt"); @@ -129,7 +150,6 @@ perror("setsockopt"); router->master_state = BLRM_TIMESTAMP; router->stats.n_masterstarts++; - router->retry_backoff = 1; } /** @@ -160,7 +180,27 @@ GWBUF *ptr; router->reconnect_pending = 0; router->active_logs = 0; spinlock_release(&router->lock); - blr_start_master(router); + if (router->master_state < BLRM_BINLOGDUMP) + { + char *name; + + router->master_state = BLRM_UNCONNECTED; + + if ((name = malloc(strlen(router->service->name) + + strlen(" Master")+1)) != NULL); + { + sprintf(name, "%s Master", router->service->name); + hktask_oneshot(name, blr_start_master, router, + BLR_MASTER_BACKOFF_TIME * router->retry_backoff++); + } + if (router->retry_backoff > BLR_MAX_BACKOFF) + router->retry_backoff = BLR_MAX_BACKOFF; + } + else + { + router->master_state = BLRM_UNCONNECTED; + blr_start_master(router); + } } /** @@ -225,8 +265,9 @@ char query[128]; if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE) { LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.", - router->master_state))); + LOGFILE_ERROR, + "Invalid master state machine state (%d) for binlog router.", + router->master_state))); gwbuf_consume(buf, gwbuf_length(buf)); spinlock_acquire(&router->lock); if (router->reconnect_pending) @@ -234,6 +275,12 @@ char query[128]; router->active_logs = 0; spinlock_release(&router->lock); atomic_add(&router->handling_threads, -1); + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "%s: Pending reconnect in state %s.", + router->service->name, + blrm_states[router->master_state] + ))); blr_restart_master(router); return; } @@ -247,8 +294,11 @@ char query[128]; { LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, - "Received error: %d, %s from master during %s phase of the master state machine.", - MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state] + "%s: Received error: %d, %s from master during %s phase " + "of the master state machine.", + router->service->name, + MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), + blrm_states[router->master_state] ))); gwbuf_consume(buf, gwbuf_length(buf)); spinlock_acquire(&router->lock); @@ -272,6 +322,7 @@ char query[128]; buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'"); router->master_state = BLRM_SERVERID; router->master->func.write(router->master, buf); + router->retry_backoff = 1; break; case BLRM_SERVERID: // Response to fetch of master's server-id @@ -357,6 +408,12 @@ char query[128]; buf = blr_make_binlog_dump(router); router->master_state = BLRM_BINLOGDUMP; router->master->func.write(router->master, buf); + LOGIF(LM,(skygw_log_write( + LOGFILE_MESSAGE, + "%s: Request binlog records from %s at " + "position %d from master server %s.", + router->service->name, router->binlog_name, + router->binlog_position, router->master->remote))); break; case BLRM_BINLOGDUMP: // Main body, we have received a binlog record from the master @@ -618,133 +675,169 @@ static REP_HEADER phdr; n_bufs = 1; } - blr_extract_header(ptr, &hdr); + if (len < BINLOG_EVENT_HDR_LEN) + { + char *msg = ""; - if (hdr.event_size != len - 5) - { - LOGIF(LE,(skygw_log_write( - LOGFILE_ERROR, - "Packet length is %d, but event size is %d, " - "binlog file %s position %d" - "reslen is %d and preslen is %d, " - "length of previous event %d. %s", - len, hdr.event_size, - router->binlog_name, - router->binlog_position, - reslen, preslen, prev_length, - (prev_length == -1 ? - (no_residual ? "No residual data from previous call" : "Residual data from previous call") : "") - ))); - blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len); - LOGIF(LE,(skygw_log_write( - LOGFILE_ERROR, - "This event (0x%x) was contained in %d GWBUFs, " - "the previous events was contained in %d GWBUFs", - router->lastEventReceived, n_bufs, pn_bufs))); - if (msg) + if (ptr[4] == 0xfe) /* EOF Packet */ { - free(msg); - msg = NULL; + msg = "end of file"; } - break; + else if (ptr[4] == 0xff) /* EOF Packet */ + { + msg = "error"; + } + LOGIF(LM,(skygw_log_write( + LOGFILE_MESSAGE, + "Non-event message (%s) from master.", + msg))); } - phdr = hdr; - if (hdr.ok == 0) + else { - router->stats.n_binlogs++; - router->lastEventReceived = hdr.event_type; + + blr_extract_header(ptr, &hdr); + + if (hdr.event_size != len - 5) + { + LOGIF(LE,(skygw_log_write( + LOGFILE_ERROR, + "Packet length is %d, but event size is %d, " + "binlog file %s position %d " + "reslen is %d and preslen is %d, " + "length of previous event %d. %s", + len, hdr.event_size, + router->binlog_name, + router->binlog_position, + reslen, preslen, prev_length, + (prev_length == -1 ? + (no_residual ? "No residual data from previous call" : "Residual data from previous call") : "") + ))); + blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len); + LOGIF(LE,(skygw_log_write( + LOGFILE_ERROR, + "This event (0x%x) was contained in %d GWBUFs, " + "the previous events was contained in %d GWBUFs", + router->lastEventReceived, n_bufs, pn_bufs))); + if (msg) + { + free(msg); + msg = NULL; + } + break; + } + phdr = hdr; + if (hdr.ok == 0) + { + router->stats.n_binlogs++; + router->lastEventReceived = hdr.event_type; // #define SHOW_EVENTS #ifdef SHOW_EVENTS - printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size); + printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size); #endif - if (hdr.event_type >= 0 && hdr.event_type < 0x24) - router->stats.events[hdr.event_type]++; - if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0) - { - // Fake format description message - LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG, - "Replication fake event. " - "Binlog %s @ %d.", - router->binlog_name, - router->binlog_position))); - router->stats.n_fakeevents++; - if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) + if (hdr.event_type >= 0 && hdr.event_type < 0x24) + router->stats.events[hdr.event_type]++; + if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0) { - /* - * We need to save this to replay to new - * slaves that attach later. - */ - if (router->saved_master.fde_event) - free(router->saved_master.fde_event); - router->saved_master.fde_len = hdr.event_size; - router->saved_master.fde_event = malloc(hdr.event_size); - if (router->saved_master.fde_event) - memcpy(router->saved_master.fde_event, - ptr + 5, hdr.event_size); + // Fake format description message + LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG, + "Replication fake event. " + "Binlog %s @ %d.", + router->binlog_name, + router->binlog_position))); + router->stats.n_fakeevents++; + if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) + { + uint8_t *new_fde; + unsigned int new_fde_len; + /* + * We need to save this to replay to new + * slaves that attach later. + */ + new_fde_len = hdr.event_size; + new_fde = malloc(hdr.event_size); + if (new_fde) + { + memcpy(new_fde, ptr + 5, hdr.event_size); + if (router->saved_master.fde_event) + free(router->saved_master.fde_event); + router->saved_master.fde_event = new_fde; + router->saved_master.fde_len = new_fde_len; + } + else + { + LOGIF(LE,(skygw_log_write(LOGFILE_ERROR, + "%s: Received a format description " + "event that MaxScale was unable to " + "record. Event length is %d.", + router->service->name, + hdr.event_size))); + blr_log_packet(LOGFILE_ERROR, + "Format Description Event:", ptr, len); + } + } + } + else + { + if (hdr.event_type == HEARTBEAT_EVENT) + { +#ifdef SHOW_EVENTS + printf("Replication heartbeat\n"); +#endif + LOGIF(LD,(skygw_log_write( + LOGFILE_DEBUG, + "Replication heartbeat. " + "Binlog %s @ %d.", + router->binlog_name, + router->binlog_position))); + router->stats.n_heartbeats++; + } + else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) + { + ptr = ptr + 5; // We don't put the first byte of the payload + // into the binlog file + if (hdr.event_type == ROTATE_EVENT) + router->rotating = 1; + blr_write_binlog_record(router, &hdr, ptr); + if (hdr.event_type == ROTATE_EVENT) + { + blr_rotate_event(router, ptr, &hdr); + } + blr_distribute_binlog_record(router, &hdr, ptr); + } + else + { + router->stats.n_artificial++; + LOGIF(LD,(skygw_log_write( + LOGFILE_DEBUG, + "Artificial event not written " + "to disk or distributed. " + "Type 0x%x, Length %d, Binlog " + "%s @ %d.", + hdr.event_type, + hdr.event_size, + router->binlog_name, + router->binlog_position))); + ptr += 5; + if (hdr.event_type == ROTATE_EVENT) + { + router->rotating = 1; + blr_rotate_event(router, ptr, &hdr); + } + } } } else { - if (hdr.event_type == HEARTBEAT_EVENT) - { -#ifdef SHOW_EVENTS - printf("Replication heartbeat\n"); -#endif - LOGIF(LD,(skygw_log_write( - LOGFILE_DEBUG, - "Replication heartbeat. " - "Binlog %s @ %d.", - router->binlog_name, - router->binlog_position))); - router->stats.n_heartbeats++; - } - else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) - { - ptr = ptr + 5; // We don't put the first byte of the payload - // into the binlog file - if (hdr.event_type == ROTATE_EVENT) - router->rotating = 1; - blr_write_binlog_record(router, &hdr, ptr); - if (hdr.event_type == ROTATE_EVENT) - { - blr_rotate_event(router, ptr, &hdr); - } - blr_distribute_binlog_record(router, &hdr, ptr); - } - else - { - router->stats.n_artificial++; - LOGIF(LD,(skygw_log_write( - LOGFILE_DEBUG, - "Artificial event not written " - "to disk or distributed. " - "Type 0x%x, Length %d, Binlog " - "%s @ %d.", - hdr.event_type, - hdr.event_size, - router->binlog_name, - router->binlog_position))); - ptr += 5; - if (hdr.event_type == ROTATE_EVENT) - { - router->rotating = 1; - blr_rotate_event(router, ptr, &hdr); - } - } + LOGIF(LE,(skygw_log_write(LOGFILE_ERROR, + "Error packet in binlog stream.%s @ %d.", + router->binlog_name, + router->binlog_position))); + blr_log_packet(LOGFILE_ERROR, "Error Packet:", + ptr, len); + router->stats.n_binlog_errors++; } } - else - { - printf("Binlog router error: %s\n", &ptr[7]); - LOGIF(LE,(skygw_log_write(LOGFILE_ERROR, - "Error packet in binlog stream.%s @ %d.", - router->binlog_name, - router->binlog_position))); - blr_log_packet(LOGFILE_ERROR, "Error Packet:", - ptr, len); - router->stats.n_binlog_errors++; - } if (msg) { @@ -968,6 +1061,7 @@ int action; { blr_slave_rotate(slave, ptr); } + slave->stats.n_bytes += gwbuf_length(pkt); slave->dcb->func.write(slave->dcb, pkt); if (hdr->event_type != ROTATE_EVENT) { diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 89e18bd8e..c0dcc0be5 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -65,6 +65,7 @@ int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large); uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr); int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data); static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +static void blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); extern int lm_enabled_logfiles_bitmask; @@ -544,29 +545,8 @@ uint32_t chksum; rval = slave->dcb->func.write(slave->dcb, resp); /* Send the FORMAT_DESCRIPTION_EVENT */ - if (router->saved_master.fde_event) - { - resp = gwbuf_alloc(router->saved_master.fde_len + 5); - ptr = GWBUF_DATA(resp); - encode_value(ptr, router->saved_master.fde_len + 1, 24); // Payload length - ptr += 3; - *ptr++ = slave->seqno++; - *ptr++ = 0; // OK - memcpy(ptr, router->saved_master.fde_event, router->saved_master.fde_len); - encode_value(ptr, time(0), 32); // Overwrite timestamp - /* - * Since we have changed the timestamp we must recalculate the CRC - * - * Position ptr to the start of the event header, - * calculate a new checksum - * and write it into the header - */ - ptr = GWBUF_DATA(resp) + 5 + router->saved_master.fde_len - 4; - chksum = crc32(0L, NULL, 0); - chksum = crc32(chksum, GWBUF_DATA(resp) + 5, router->saved_master.fde_len - 4); - encode_value(ptr, chksum, 32); - rval = slave->dcb->func.write(slave->dcb, resp); - } + if (slave->binlog_pos != 4) + blr_slave_send_fde(router, slave); slave->dcb->low_water = router->low_water; slave->dcb->high_water = router->high_water; @@ -782,6 +762,7 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, "blr_open_binlog took %d beats", hkheartbeat - beat1))); } + slave->stats.n_bytes += gwbuf_length(head); written = slave->dcb->func.write(slave->dcb, head); if (written && hdr.event_type != ROTATE_EVENT) { @@ -839,11 +820,23 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write( if (state_change) { - LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, - "%s: Slave %s is up to date %s, %u.", + slave->stats.n_caughtup++; + if (slave->stats.n_caughtup == 1) + { + LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, + "%s: Slave %s is up to date %s, %u.", router->service->name, slave->dcb->remote, slave->binlogfile, slave->binlog_pos))); + } + else if ((slave->stats.n_caughtup % 50) == 0) + { + LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, + "%s: Slave %s is up to date %s, %u.", + router->service->name, + slave->dcb->remote, + slave->binlogfile, slave->binlog_pos))); + } } } else @@ -1031,3 +1024,51 @@ uint32_t chksum; slave->dcb->func.write(slave->dcb, resp); return 1; } + +/** + * Send a "fake" format description event to the newly connected slave + * + * @param router The router instance + * @param slave The slave to send the event to + */ +static void +blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) +{ +BLFILE *file; +REP_HEADER hdr; +GWBUF *record, *head; +uint8_t *ptr; +uint32_t chksum; + + if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL) + return; + if ((record = blr_read_binlog(router, file, 4, &hdr)) == NULL) + { + blr_close_binlog(router, file); + return; + } + blr_close_binlog(router, file); + head = gwbuf_alloc(5); + ptr = GWBUF_DATA(head); + encode_value(ptr, hdr.event_size + 1, 24); // Payload length + ptr += 3; + *ptr++ = slave->seqno++; + *ptr++ = 0; // OK + head = gwbuf_append(head, record); + ptr = GWBUF_DATA(record); + encode_value(ptr, time(0), 32); // Overwrite timestamp + ptr += 13; + encode_value(ptr, 0, 32); // Set next position to 0 + /* + * Since we have changed the timestamp we must recalculate the CRC + * + * Position ptr to the start of the event header, + * calculate a new checksum + * and write it into the header + */ + ptr = GWBUF_DATA(record) + hdr.event_size - 4; + chksum = crc32(0L, NULL, 0); + chksum = crc32(chksum, GWBUF_DATA(record), hdr.event_size - 4); + encode_value(ptr, chksum, 32); + slave->dcb->func.write(slave->dcb, head); +}