/*lint -e662 */ /*lint -e661 */ /* * Copyright (c) 2016 MariaDB Corporation Ab * * Use of this software is governed by the Business Source License included * in the LICENSE.TXT file and at www.mariadb.com/bsl. * * Change Date: 2019-01-01 * * On the date above, in accordance with the Business Source License, use * of this software will be governed by version 2 or later of the General * Public License. */ /** * @file blr_master.c - contains code for the router to master communication * * The binlog router is designed to be used in replication environments to * increase the replication fanout of a master server. It provides a transparent * mechanism to read the binlog entries for multiple slaves while requiring * only a single connection to the actual master to support the slaves. * * The current prototype implementation is designed to support MySQL 5.6 and has * a number of limitations. This prototype is merely a proof of concept and * should not be considered production ready. * * @verbatim * Revision History * * Date Who Description * 02/04/2014 Mark Riddoch Initial implementation * 07/05/2015 Massimiliano Pinto Added MariaDB 10 Compatibility * 25/05/2015 Massimiliano Pinto Added BLRM_SLAVE_STOPPED state * 08/06/2015 Massimiliano Pinto Added m_errno and m_errmsg * 23/06/2015 Massimiliano Pinto Master communication goes into BLRM_SLAVE_STOPPED state * when an error is encountered in BLRM_BINLOGDUMP state. * Server error code and msg are reported via SHOW SLAVE STATUS * 03/08/2015 Massimiliano Pinto Initial implementation of transaction safety * 13/08/2015 Massimiliano Pinto Addition of heartbeat check * 23/08/2015 Massimiliano Pinto Added strerror_r * 26/08/2015 Massimiliano Pinto Added MariaDB 10 GTID event check with flags = 0 * This is the current supported condition for detecting * MariaDB 10 transaction start point.
* It's no longer using QUERY_EVENT with BEGIN * 25/09/2015 Massimiliano Pinto Addition of lastEventReceived for slaves * 23/10/2015 Markus Makela Added current_safe_event * 26/04/2016 Massimiliano Pinto Added MariaDB 10.0 and 10.1 GTID event flags detection * * @endverbatim */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* Temporary requirement for auth data */ #include static GWBUF *blr_make_query(char *statement); static GWBUF *blr_make_registration(ROUTER_INSTANCE *router); static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router); void encode_value(unsigned char *data, unsigned int value, int len); void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt); static int blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr); void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr, blr_thread_role_t role); static void *CreateMySQLAuthData(char *username, char *password, char *database); void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr); static void blr_log_packet(int priority, char *msg, uint8_t *ptr, int len); void blr_master_close(ROUTER_INSTANCE *); char *blr_extract_column(GWBUF *buf, int col); void poll_fake_write_event(DCB *dcb); GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr, unsigned long long pos_end); static void blr_check_last_master_event(void *inst); extern int blr_check_heartbeat(ROUTER_INSTANCE *router); static void blr_log_identity(ROUTER_INSTANCE *router); static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code); int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf); void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len); static void blr_terminate_master_replication(ROUTER_INSTANCE 
*router, uint8_t* ptr, int len); static int keepalive = 1; /** * blr_start_master - controls the connection of the binlog router to the * master MySQL server and triggers the slave registration process for * the router. * * @param router The router instance */ void blr_start_master(void* data) { ROUTER_INSTANCE *router = (ROUTER_INSTANCE*)data; DCB *client; router->stats.n_binlogs_ses = 0; spinlock_acquire(&router->lock); if (router->master_state != BLRM_UNCONNECTED) { if (router->master_state != BLRM_SLAVE_STOPPED) { MXS_ERROR("%s: Master Connect: Unexpected master state %s\n", router->service->name, blrm_states[router->master_state]); } else { MXS_NOTICE("%s: Master Connect: binlog state is %s\n", router->service->name, blrm_states[router->master_state]); } spinlock_release(&router->lock); return; } router->master_state = BLRM_CONNECTING; /* Discard the queued residual data */ while (router->residual) { router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual)); } router->residual = NULL; spinlock_release(&router->lock); if ((client = dcb_alloc(DCB_ROLE_INTERNAL, NULL)) == NULL) { MXS_ERROR("Binlog router: failed to create DCB for dummy client"); return; } router->client = client; client->state = DCB_STATE_POLLING; /* Fake the client is reading */ client->data = CreateMySQLAuthData(router->user, router->password, ""); if ((router->session = session_alloc(router->service, client)) == NULL) { MXS_ERROR("Binlog router: failed to create session for connection to master"); return; } client->session = router->session; if ((router->master = dcb_connect(router->service->dbref->server, router->session, BLR_PROTOCOL)) == NULL) { char *name; if ((name = malloc(strlen(router->service->name) + strlen(" Master") + 1)) != NULL) { sprintf(name, "%s Master", router->service->name); hktask_oneshot(name, blr_start_master, router, BLR_MASTER_BACKOFF_TIME * router->retry_backoff++); free(name); } if (router->retry_backoff > BLR_MAX_BACKOFF) { 
router->retry_backoff = BLR_MAX_BACKOFF; } MXS_ERROR("Binlog router: failed to connect to master server '%s'", router->service->dbref->server->unique_name); return; } router->master->remote = strdup(router->service->dbref->server->name); MXS_NOTICE("%s: attempting to connect to master server %s:%d, binlog %s, pos %lu", router->service->name, router->service->dbref->server->name, router->service->dbref->server->port, router->binlog_name, router->current_pos); router->connect_time = time(0); if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive ))) { perror("setsockopt"); } router->master_state = BLRM_AUTHENTICATED; router->master->func.write(router->master, blr_make_query("SELECT UNIX_TIMESTAMP()")); router->master_state = BLRM_TIMESTAMP; router->stats.n_masterstarts++; } /** * Reconnect to the master server. * * IMPORTANT - must be called with router->active_logs set by the * thread that set active_logs. * * @param router The router instance */ static void blr_restart_master(ROUTER_INSTANCE *router) { dcb_close(router->client); /* Discard the queued residual data */ while (router->residual) { router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual)); } router->residual = NULL; /* Now it is safe to unleash other threads on this router instance */ spinlock_acquire(&router->lock); router->reconnect_pending = 0; router->active_logs = 0; spinlock_release(&router->lock); if (router->master_state < BLRM_BINLOGDUMP) { char *name; router->master_state = BLRM_UNCONNECTED; if ((name = malloc(strlen(router->service->name) + strlen(" Master") + 1)) != NULL) { sprintf(name, "%s Master", router->service->name); hktask_oneshot(name, blr_start_master, router, BLR_MASTER_BACKOFF_TIME * router->retry_backoff++); free(name); } if (router->retry_backoff > BLR_MAX_BACKOFF) { router->retry_backoff = BLR_MAX_BACKOFF; } } else { router->master_state = BLRM_UNCONNECTED; blr_start_master(router); } } /** * Request a reconnect to 
the master. * * If another thread is active processing messages from the master * then merely set a flag for that thread to do the restart. If no * threads are active then directly call the restart routine to * reconnect to the master. * * @param router The router instance */ void blr_master_reconnect(ROUTER_INSTANCE *router) { int do_reconnect = 0; if (router->master_state == BLRM_SLAVE_STOPPED) { return; } spinlock_acquire(&router->lock); if (router->active_logs) { /* Currently processing a response, set a flag * and get the thread that is process a response * to deal with the reconnect. */ router->reconnect_pending = 1; router->stats.n_delayedreconnects++; } else { router->active_logs = 1; do_reconnect = 1; } spinlock_release(&router->lock); if (do_reconnect) { blr_restart_master(router); spinlock_acquire(&router->lock); router->active_logs = 0; spinlock_release(&router->lock); } } /** * Shutdown a connection to the master * * @param router The router instance */ void blr_master_close(ROUTER_INSTANCE *router) { dcb_close(router->master); router->master_state = BLRM_UNCONNECTED; router->master_event_state = BLR_EVENT_DONE; } /** * Mark this master connection for a delayed reconnect, used during * error recovery to cause a reconnect after 60 seconds. * * @param router The router instance */ void blr_master_delayed_connect(ROUTER_INSTANCE *router) { char *name; if ((name = malloc(strlen(router->service->name) + strlen(" Master Recovery") + 1)) != NULL) { sprintf(name, "%s Master Recovery", router->service->name); hktask_oneshot(name, blr_start_master, router, 60); free(name); } } /** * Binlog router master side state machine event handler. * * Handles an incoming response from the master server to the binlog * router. 
* * @param router The router instance * @param buf The incoming packet */ void blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) { char query[BLRM_MASTER_REGITRATION_QUERY_LEN + 1]; char task_name[BLRM_TASK_NAME_LEN + 1] = ""; atomic_add(&router->handling_threads, 1); ss_dassert(router->handling_threads == 1); spinlock_acquire(&router->lock); router->active_logs = 1; spinlock_release(&router->lock); if (router->master_state > BLRM_MAXSTATE) { MXS_ERROR("Invalid master state machine state (%d) for binlog router.", router->master_state); gwbuf_consume(buf, gwbuf_length(buf)); spinlock_acquire(&router->lock); if (router->reconnect_pending) { router->active_logs = 0; spinlock_release(&router->lock); atomic_add(&router->handling_threads, -1); MXS_ERROR("%s: Pending reconnect in state %s.", router->service->name, blrm_states[router->master_state]); blr_restart_master(router); return; } router->active_logs = 0; spinlock_release(&router->lock); atomic_add(&router->handling_threads, -1); return; } if (router->master_state == BLRM_GTIDMODE && MYSQL_RESPONSE_ERR(buf)) { /* * If we get an error response to the GTID Mode then we * asusme the server does not support GTID modes and * continue. The error is saved and replayed to slaves if * they also request the GTID mode. 
*/ MXS_ERROR("%s: Master server does not support GTID Mode.", router->service->name); } else if (router->master_state != BLRM_BINLOGDUMP && MYSQL_RESPONSE_ERR(buf)) { char *msg_err = NULL; int msg_len = 0; int len = gwbuf_length(buf); unsigned long mysql_errno = extract_field(MYSQL_ERROR_CODE(buf), 16); msg_len = len - 7 - 6; // +7 is where msg starts, 6 is skipped the status message (#42000) msg_err = (char *)malloc(msg_len + 1); if (msg_err) { // skip status message only as MYSQL_RESPONSE_ERR(buf) points to GWBUF_DATA(buf) +7 strncpy(msg_err, (char *)(MYSQL_ERROR_MSG(buf) + 6), msg_len); /* NULL terminated error string */ *(msg_err + msg_len) = '\0'; } MXS_ERROR("%s: Received error: %lu, '%s' from master during '%s' phase " "of the master state machine.", router->service->name, mysql_errno, msg_err ? msg_err : "(memory failure)", blrm_states[router->master_state]); gwbuf_consume(buf, gwbuf_length(buf)); spinlock_acquire(&router->lock); /* set mysql errno */ router->m_errno = mysql_errno; /* set mysql error message */ if (router->m_errmsg) { free(router->m_errmsg); } router->m_errmsg = msg_err ? msg_err : "(memory failure)"; router->active_logs = 0; if (router->reconnect_pending) { spinlock_release(&router->lock); atomic_add(&router->handling_threads, -1); blr_restart_master(router); return; } spinlock_release(&router->lock); atomic_add(&router->handling_threads, -1); return; } switch (router->master_state) { case BLRM_TIMESTAMP: // Response to a timestamp message, no need to save this. 
gwbuf_consume(buf, GWBUF_LENGTH(buf)); buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'"); router->master_state = BLRM_SERVERID; router->master->func.write(router->master, buf); router->retry_backoff = 1; break; case BLRM_SERVERID: { char *val = blr_extract_column(buf, 2); // Response to fetch of master's server-id if (router->saved_master.server_id) { GWBUF_CONSUME_ALL(router->saved_master.server_id); } router->saved_master.server_id = buf; blr_cache_response(router, "serverid", buf); // set router->masterid from master server-id if it's not set by the config option if (router->masterid == 0) { router->masterid = atoi(val); } { char str[BLRM_SET_HEARTBEAT_QUERY_LEN]; sprintf(str, "SET @master_heartbeat_period = %lu000000000", router->heartbeat); buf = blr_make_query(str); } router->master_state = BLRM_HBPERIOD; router->master->func.write(router->master, buf); free(val); break; } case BLRM_HBPERIOD: // Response to set the heartbeat period if (router->saved_master.heartbeat) { GWBUF_CONSUME_ALL(router->saved_master.heartbeat); } router->saved_master.heartbeat = buf; blr_cache_response(router, "heartbeat", buf); buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum"); router->master_state = BLRM_CHKSUM1; router->master->func.write(router->master, buf); break; case BLRM_CHKSUM1: // Response to set the master binlog checksum if (router->saved_master.chksum1) { GWBUF_CONSUME_ALL(router->saved_master.chksum1); } router->saved_master.chksum1 = buf; blr_cache_response(router, "chksum1", buf); buf = blr_make_query("SELECT @master_binlog_checksum"); router->master_state = BLRM_CHKSUM2; router->master->func.write(router->master, buf); break; case BLRM_CHKSUM2: { char *val = blr_extract_column(buf, 1); if (val && strncasecmp(val, "NONE", 4) == 0) { router->master_chksum = false; } if (val) { free(val); } // Response to the master_binlog_checksum, should be stored if (router->saved_master.chksum2) { GWBUF_CONSUME_ALL(router->saved_master.chksum2); } 
router->saved_master.chksum2 = buf; blr_cache_response(router, "chksum2", buf); if (router->mariadb10_compat) { buf = blr_make_query("SET @mariadb_slave_capability=4"); router->master_state = BLRM_MARIADB10; } else { buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE"); router->master_state = BLRM_GTIDMODE; } router->master->func.write(router->master, buf); break; } case BLRM_MARIADB10: // Response to the SET @mariadb_slave_capability=4, should be stored if (router->saved_master.mariadb10) { GWBUF_CONSUME_ALL(router->saved_master.mariadb10); } router->saved_master.mariadb10 = buf; blr_cache_response(router, "mariadb10", buf); buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'"); router->master_state = BLRM_MUUID; router->master->func.write(router->master, buf); break; case BLRM_GTIDMODE: // Response to the GTID_MODE, should be stored if (router->saved_master.gtid_mode) { GWBUF_CONSUME_ALL(router->saved_master.gtid_mode); } router->saved_master.gtid_mode = buf; blr_cache_response(router, "gtidmode", buf); buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'"); router->master_state = BLRM_MUUID; router->master->func.write(router->master, buf); break; case BLRM_MUUID: { char *key; char *val = NULL; key = blr_extract_column(buf, 1); if (key && strlen(key)) { val = blr_extract_column(buf, 2); } if (key) { free(key); } /* set the master_uuid from master if not set by the option */ if (router->set_master_uuid == NULL) { free(router->master_uuid); router->master_uuid = val; } else { router->master_uuid = router->set_master_uuid; } // Response to the SERVER_UUID, should be stored if (router->saved_master.uuid) { GWBUF_CONSUME_ALL(router->saved_master.uuid); } router->saved_master.uuid = buf; blr_cache_response(router, "uuid", buf); sprintf(query, "SET @slave_uuid='%s'", router->uuid); buf = blr_make_query(query); router->master_state = BLRM_SUUID; router->master->func.write(router->master, buf); break; } case BLRM_SUUID: // Response to the SET @server_uuid, should be 
stored if (router->saved_master.setslaveuuid) { GWBUF_CONSUME_ALL(router->saved_master.setslaveuuid); } router->saved_master.setslaveuuid = buf; blr_cache_response(router, "ssuuid", buf); buf = blr_make_query("SET NAMES latin1"); router->master_state = BLRM_LATIN1; router->master->func.write(router->master, buf); break; case BLRM_LATIN1: // Response to the SET NAMES latin1, should be stored if (router->saved_master.setnames) { GWBUF_CONSUME_ALL(router->saved_master.setnames); } router->saved_master.setnames = buf; blr_cache_response(router, "setnames", buf); buf = blr_make_query("SET NAMES utf8"); router->master_state = BLRM_UTF8; router->master->func.write(router->master, buf); break; case BLRM_UTF8: // Response to the SET NAMES utf8, should be stored if (router->saved_master.utf8) { GWBUF_CONSUME_ALL(router->saved_master.utf8); } router->saved_master.utf8 = buf; blr_cache_response(router, "utf8", buf); buf = blr_make_query("SELECT 1"); router->master_state = BLRM_SELECT1; router->master->func.write(router->master, buf); break; case BLRM_SELECT1: // Response to the SELECT 1, should be stored if (router->saved_master.select1) { GWBUF_CONSUME_ALL(router->saved_master.select1); } router->saved_master.select1 = buf; blr_cache_response(router, "select1", buf); buf = blr_make_query("SELECT VERSION()"); router->master_state = BLRM_SELECTVER; router->master->func.write(router->master, buf); break; case BLRM_SELECTVER: // Response to SELECT VERSION should be stored if (router->saved_master.selectver) { GWBUF_CONSUME_ALL(router->saved_master.selectver); } router->saved_master.selectver = buf; blr_cache_response(router, "selectver", buf); buf = blr_make_query("SELECT @@version_comment limit 1"); router->master_state = BLRM_SELECTVERCOM; router->master->func.write(router->master, buf); break; case BLRM_SELECTVERCOM: // Response to SELECT @@version_comment should be stored if (router->saved_master.selectvercom) { GWBUF_CONSUME_ALL(router->saved_master.selectvercom); } 
router->saved_master.selectvercom = buf; blr_cache_response(router, "selectvercom", buf); buf = blr_make_query("SELECT @@hostname"); router->master_state = BLRM_SELECTHOSTNAME; router->master->func.write(router->master, buf); break; case BLRM_SELECTHOSTNAME: // Response to SELECT @@hostname should be stored if (router->saved_master.selecthostname) { GWBUF_CONSUME_ALL(router->saved_master.selecthostname); } router->saved_master.selecthostname = buf; blr_cache_response(router, "selecthostname", buf); buf = blr_make_query("SELECT @@max_allowed_packet"); router->master_state = BLRM_MAP; router->master->func.write(router->master, buf); break; case BLRM_MAP: // Response to SELECT @@max_allowed_packet should be stored if (router->saved_master.map) { GWBUF_CONSUME_ALL(router->saved_master.map); } router->saved_master.map = buf; blr_cache_response(router, "map", buf); buf = blr_make_registration(router); router->master_state = BLRM_REGISTER; router->master->func.write(router->master, buf); break; case BLRM_REGISTER: // Request a dump of the binlog file buf = blr_make_binlog_dump(router); router->master_state = BLRM_BINLOGDUMP; router->master->func.write(router->master, buf); MXS_NOTICE("%s: Request binlog records from %s at " "position %lu from master server %s:%d", router->service->name, router->binlog_name, router->current_pos, router->service->dbref->server->name, router->service->dbref->server->port); /* Log binlog router identity */ blr_log_identity(router); break; case BLRM_BINLOGDUMP: /** * Main body, we have received a binlog record from the master */ blr_handle_binlog_record(router, buf); /** * Set heartbeat check task */ snprintf(task_name, BLRM_TASK_NAME_LEN, "%s heartbeat", router->service->name); hktask_add(task_name, blr_check_last_master_event, router, router->heartbeat); break; } if (router->reconnect_pending) { blr_restart_master(router); } spinlock_acquire(&router->lock); router->active_logs = 0; spinlock_release(&router->lock); 
atomic_add(&router->handling_threads, -1); } /** * Build a MySQL query into a GWBUF that we can send to the master database * * @param query The text of the query to send */ static GWBUF * blr_make_query(char *query) { GWBUF *buf; unsigned char *data; int len; if ((buf = gwbuf_alloc(strlen(query) + 5)) == NULL) { return NULL; } data = GWBUF_DATA(buf); len = strlen(query) + 1; encode_value(&data[0], len, 24); // Payload length data[3] = 0; // Sequence id // Payload data[4] = COM_QUERY; // Command memcpy(&data[5], query, strlen(query)); return buf; } /** * Build a MySQL slave registration into a GWBUF that we can send to the * master database * * @param router The router instance * @return A MySQL Replication registration message in a GWBUF structure */ static GWBUF * blr_make_registration(ROUTER_INSTANCE *router) { GWBUF *buf; unsigned char *data; int len = 18; int port = 3306; if ((buf = gwbuf_alloc(len + 4)) == NULL) { return NULL; } data = GWBUF_DATA(buf); encode_value(&data[0], len, 24); // Payload length data[3] = 0; // Sequence ID data[4] = COM_REGISTER_SLAVE; // Command encode_value(&data[5], router->serverid, 32); // Slave Server ID data[9] = 0; // Slave hostname length data[10] = 0; // Slave username length data[11] = 0; // Slave password length if (router->service->ports) { port = router->service->ports->port; } encode_value(&data[12], port, 16); // Slave master port encode_value(&data[14], 0, 32); // Replication rank encode_value(&data[18], router->masterid, 32); // Master server-id return buf; } /** * Build a Binlog dump command into a GWBUF that we can send to the * master database * * @param router The router instance * @return A MySQL Replication COM_BINLOG_DUMP message in a GWBUF structure */ static GWBUF * blr_make_binlog_dump(ROUTER_INSTANCE *router) { GWBUF *buf; unsigned char *data; int binlog_file_len = strlen(router->binlog_name); /* COM_BINLOG_DUMP needs 11 bytes + binlogname (terminating NULL is not required) */ int len = 11 + 
binlog_file_len; if ((buf = gwbuf_alloc(len + 4)) == NULL) { return NULL; } data = GWBUF_DATA(buf); encode_value(&data[0], len, 24); // Payload length data[3] = 0; // Sequence ID data[4] = COM_BINLOG_DUMP; // Command encode_value(&data[5], router->current_pos, 32); // binlog position encode_value(&data[9], 0, 16); // Flags encode_value(&data[11], router->serverid, 32); // Server-id of MaxScale memcpy((char *)&data[15], router->binlog_name, binlog_file_len); // binlog filename return buf; } /** * Encode a value into a number of bits in a MySQL packet * * @param data Point to location in target packet * @param value The value to pack * @param len Number of bits to encode value into */ void encode_value(unsigned char *data, unsigned int value, int len) { while (len > 0) { *data++ = value & 0xff; value >>= 8; len -= 8; } } /** * blr_handle_binlog_record - we have received binlog records from * the master and we must now work out what to do with them. * * @param router The router instance * @param pkt The binlog records */ void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) { uint8_t *msg = NULL, *ptr, *pdata; REP_HEADER hdr; unsigned int len = 0, reslen; unsigned int pkt_length; int no_residual = 1; int preslen = -1; int prev_length = -1; int n_bufs = -1, pn_bufs = -1; /* * Prepend any residual buffer to the buffer chain we have * been called with. */ if (router->residual) { pkt = gwbuf_append(router->residual, pkt); router->residual = NULL; no_residual = 0; } pkt_length = gwbuf_length(pkt); /* * Loop over all the packets while we still have some data * and the packet length is enough to hold a replication event * header. 
*/ while (pkt && pkt_length > 24) { reslen = GWBUF_LENGTH(pkt); pdata = GWBUF_DATA(pkt); if (reslen < 3) // Payload length straddles buffers { /* Get the length of the packet from the residual and new packet */ if (reslen >= 3) { len = EXTRACT24(pdata); } else if (reslen == 2) { len = EXTRACT16(pdata); len |= (*(uint8_t *)GWBUF_DATA(pkt->next) << 16); } else if (reslen == 1) { len = *pdata; len |= (EXTRACT16(GWBUF_DATA(pkt->next)) << 8); } len += 4; // Allow space for the header } else { len = EXTRACT24(pdata) + 4; } /* len is now the payload length for the packet we are working on */ if (reslen < len && pkt_length >= len) { /* * The message is contained in more than the current * buffer, however we have the complete messasge in * this buffer and the chain of remaining buffers. * * Allocate a contiguous buffer for the binlog message * and copy the complete message into this buffer. */ int msg_remainder = len; GWBUF *p = pkt; if ((msg = malloc(len)) == NULL) { MXS_ERROR("Insufficient memory to buffer event " "of %d bytes. Binlog %s @ %lu.", len, router->binlog_name, router->current_pos); break; } n_bufs = 0; ptr = msg; while (p && msg_remainder > 0) { int plen = GWBUF_LENGTH(p); int n = (msg_remainder > plen ? plen : msg_remainder); memcpy(ptr, GWBUF_DATA(p), n); msg_remainder -= n; ptr += n; if (msg_remainder > 0) { p = p->next; } n_bufs++; } if (msg_remainder) { MXS_ERROR("Expected entire message in buffer " "chain, but failed to create complete " "message as expected. %s @ %lu", router->binlog_name, router->current_pos); free(msg); /* msg = NULL; Not needed unless msg will be referred to again */ break; } ptr = msg; } else if (reslen < len) { /* * The message is not fully contained in the current * and we do not have the complete message in the * buffer chain. Therefore we must stop processing * until we receive the next buffer. */ router->stats.n_residuals++; MXS_DEBUG("Residual data left after %lu records. 
%s @ %lu", router->stats.n_binlogs, router->binlog_name, router->current_pos); break; } else { /* * The message is fully contained in the current buffer */ ptr = pdata; n_bufs = 1; } /* * ptr now points at the current message in a contiguous buffer, * this buffer is either within the GWBUF or in a malloc'd * copy if the message straddles GWBUF's. */ if (len < BINLOG_EVENT_HDR_LEN && router->master_event_state != BLR_EVENT_ONGOING) { char *event_msg = ""; /* Packet is too small to be a binlog event */ if (ptr[4] == 0xfe) /* EOF Packet */ { event_msg = "end of file"; } else if (ptr[4] == 0xff) /* EOF Packet */ { event_msg = "error"; } MXS_NOTICE("Non-event message (%s) from master.", event_msg); } else { if (router->master_event_state == BLR_EVENT_DONE) { spinlock_acquire(&router->lock); router->stats.n_binlogs++; router->stats.n_binlogs_ses++; spinlock_release(&router->lock); blr_extract_header(ptr, &hdr); /* Sanity check */ if (hdr.ok == 0) { if (hdr.event_size != len - 5 && (hdr.event_size + 1) < MYSQL_PACKET_LENGTH_MAX) { MXS_ERROR("Packet length is %d, but event size is %d, " "binlog file %s position %lu " "reslen is %d and preslen is %d, " "length of previous event %d. %s", len, hdr.event_size, router->binlog_name, router->current_pos, reslen, preslen, prev_length, (prev_length == -1 ? (no_residual ? 
"No residual data from previous call" : "Residual data from previous call") : "")); blr_log_packet(LOG_ERR, "Packet:", ptr, len); MXS_ERROR("This event (0x%x) was contained in %d GWBUFs, " "the previous events was contained in %d GWBUFs", router->lastEventReceived, n_bufs, pn_bufs); if (msg) { free(msg); /* msg = NULL; Not needed unless msg will be referred to again */ } break; } else if ((hdr.event_size + 1) >= MYSQL_PACKET_LENGTH_MAX) { router->master_event_state = BLR_EVENT_STARTED; /** Store the header for later use */ memcpy(&router->stored_header, &hdr, sizeof(hdr)); } /** Prepare the checksum variables for this event */ router->stored_checksum = crc32(0L, NULL, 0); router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN; router->partial_checksum_bytes = 0; } else { /* Terminate replication and exit from main loop */ blr_terminate_master_replication(router, ptr, len); gwbuf_free(pkt); pkt = NULL; pkt_length = 0; break; } if (hdr.ok == 0) { spinlock_acquire(&router->lock); /* set mysql errno to 0 */ router->m_errno = 0; /* Remove error message */ if (router->m_errmsg) { free(router->m_errmsg); } router->m_errmsg = NULL; spinlock_release(&router->lock); #ifdef SHOW_EVENTS printf("blr: len %lu, event type 0x%02x, flags 0x%04x, " "event size %d, event timestamp %lu\n", (unsigned long)len - 4, hdr.event_type, hdr.flags, hdr.event_size, (unsigned long)hdr.timestamp); #endif } } /* pending large event */ if (router->master_event_state != BLR_EVENT_DONE) { if (len - MYSQL_HEADER_LEN < MYSQL_PACKET_LENGTH_MAX) { /** This is the last packet, we can now proceed to distribute * the event afer it has been written to disk */ ss_dassert(router->master_event_state != BLR_EVENT_COMPLETE); router->master_event_state = BLR_EVENT_COMPLETE; memcpy(&hdr, &router->stored_header, sizeof(hdr)); } else { /* current partial event is being written to disk file */ uint32_t offset = MYSQL_HEADER_LEN; uint32_t extra_bytes = MYSQL_HEADER_LEN; /** Don't write the OK byte into the binlog 
*/ if (router->master_event_state == BLR_EVENT_STARTED) { offset = MYSQL_HEADER_LEN + 1; router->master_event_state = BLR_EVENT_ONGOING; extra_bytes = MYSQL_HEADER_LEN + 1; } if (router->master_chksum) { uint32_t size = (len - extra_bytes) < router->checksum_size ? len - extra_bytes : router->checksum_size; router->stored_checksum = crc32(router->stored_checksum, ptr + offset, size); router->checksum_size -= size; if (router->checksum_size == 0 && size < len - offset) { extract_checksum(router, ptr + offset + size, len - offset - size); } } if (blr_write_data_into_binlog(router, len - offset, ptr + offset) == 0) { /** Failed to write to the binlog file, destroy the buffer * chain and close the connection with the master */ while (pkt) { pkt = GWBUF_CONSUME_ALL(pkt); } blr_master_close(router); blr_master_delayed_connect(router); return; } pkt = gwbuf_consume(pkt, len); pkt_length -= len; continue; } } /* * First check that the checksum we calculate matches the * checksum in the packet we received. 
*/ if (router->master_chksum) { uint32_t pktsum, offset = MYSQL_HEADER_LEN; uint32_t size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN; if (router->master_event_state == BLR_EVENT_DONE) { /** Set the pointer offset to the first byte after * the header and OK byte */ offset = MYSQL_HEADER_LEN + 1; size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN - 1; } size = MIN(size, router->checksum_size); if (router->checksum_size > 0) { router->stored_checksum = crc32(router->stored_checksum, ptr + offset, size); router->checksum_size -= size; } if (router->checksum_size == 0 && size < len - offset) { extract_checksum(router, ptr + offset + size, len - offset - size); } if (router->partial_checksum_bytes == MYSQL_CHECKSUM_LEN) { pktsum = EXTRACT32(router->partial_checksum); if (pktsum != router->stored_checksum) { router->stats.n_badcrc++; free(msg); /* msg = NULL; Not needed unless msg will be referred to again */ MXS_ERROR("%s: Checksum error in event from master, " "binlog %s @ %lu. Closing master connection.", router->service->name, router->binlog_name, router->current_pos); blr_master_close(router); blr_master_delayed_connect(router); return; } } else { pkt = gwbuf_consume(pkt, len); pkt_length -= len; continue; } } if (hdr.ok == 0) { router->lastEventReceived = hdr.event_type; router->lastEventTimestamp = hdr.timestamp; /** * Check for an open transaction, if the option is set * Only complete transactions should be sent to sleves * * If a trasaction is pending router->binlog_position * won't be updated to router->current_pos */ spinlock_acquire(&router->binlog_lock); if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) { /* no pending transaction: set current_pos to binlog_position */ router->binlog_position = router->current_pos; router->current_safe_event = router->current_pos; } spinlock_release(&router->binlog_lock); /** * Detect transactions in events * Only complete transactions should be sent to sleves */ /* If MariaDB 10 
compatibility: * check for MARIADB10_GTID_EVENT with flags = 0 * This marks the transaction starts instead of * QUERY_EVENT with "BEGIN" */ if (router->trx_safe && router->master_event_state == BLR_EVENT_DONE) { if (router->mariadb10_compat) { if (hdr.event_type == MARIADB10_GTID_EVENT) { uint64_t n_sequence; uint32_t domainid; unsigned int flags; n_sequence = extract_field(ptr + 4 + 20, 64); domainid = extract_field(ptr + 4 + 20 + 8, 32); flags = *(ptr + 4 + 20 + 8 + 4); if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0) { spinlock_acquire(&router->binlog_lock); if (router->pending_transaction > 0) { MXS_ERROR("A MariaDB 10 transaction " "is already open " "@ %lu (GTID %u-%u-%lu) and " "a new one starts @ %lu", router->binlog_position, domainid, hdr.serverid, n_sequence, router->current_pos); // An action should be taken here } router->pending_transaction = 1; spinlock_release(&router->binlog_lock); } } } /** * look for QUERY_EVENT [BEGIN / COMMIT] and XID_EVENT */ if (hdr.event_type == QUERY_EVENT) { char *statement_sql; int db_name_len, var_block_len, statement_len; db_name_len = ptr[4 + 20 + 4 + 4]; var_block_len = ptr[4 + 20 + 4 + 4 + 1 + 2]; statement_len = len - (4 + 20 + 4 + 4 + 1 + 2 + 2 + var_block_len + 1 + db_name_len); statement_sql = calloc(1, statement_len + 1); strncpy(statement_sql, (char *)ptr + 4 + 20 + 4 + 4 + 1 + 2 + 2 + var_block_len + 1 + db_name_len, statement_len); spinlock_acquire(&router->binlog_lock); /* Check for BEGIN (it comes for START TRANSACTION too) */ if (strncmp(statement_sql, "BEGIN", 5) == 0) { if (router->pending_transaction > 0) { MXS_ERROR("A transaction is already open " "@ %lu and a new one starts @ %lu", router->binlog_position, router->current_pos); // An action should be taken here } router->pending_transaction = 1; } /* Check for COMMIT in non transactional store engines */ if (strncmp(statement_sql, "COMMIT", 6) == 0) { router->pending_transaction = 2; } spinlock_release(&router->binlog_lock); 
free(statement_sql); } /* Check for COMMIT in Transactional engines, i.e InnoDB */ if (hdr.event_type == XID_EVENT) { spinlock_acquire(&router->binlog_lock); if (router->pending_transaction) { router->pending_transaction = 3; } spinlock_release(&router->binlog_lock); } } /** Gather statistics about the replication event types */ int event_limit = router->mariadb10_compat ? MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE; if (hdr.event_type <= event_limit) { router->stats.events[hdr.event_type]++; } if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0) { // Fake format description message MXS_DEBUG("Replication fake event. " "Binlog %s @ %lu.", router->binlog_name, router->current_pos); router->stats.n_fakeevents++; if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) { uint8_t *new_fde; unsigned int new_fde_len; /* * We need to save this to replay to new * slaves that attach later. */ new_fde_len = hdr.event_size; new_fde = malloc(hdr.event_size); if (new_fde) { memcpy(new_fde, ptr + 5, hdr.event_size); if (router->saved_master.fde_event) { free(router->saved_master.fde_event); } router->saved_master.fde_event = new_fde; router->saved_master.fde_len = new_fde_len; } else { MXS_ERROR("%s: Received a format description " "event that MaxScale was unable to " "record. Event length is %d.", router->service->name, hdr.event_size); blr_log_packet(LOG_ERR, "Format Description Event:", ptr, len); } } } else { if (hdr.event_type == HEARTBEAT_EVENT) { #ifdef SHOW_EVENTS printf("Replication heartbeat\n"); #endif MXS_DEBUG("Replication heartbeat. 
" "Binlog %s @ %lu.", router->binlog_name, router->current_pos); router->stats.n_heartbeats++; if (router->pending_transaction) { router->stats.lastReply = time(0); } } else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) { ptr = ptr + 4; // Skip header uint32_t offset = 4; if (router->master_event_state == BLR_EVENT_STARTED || router->master_event_state == BLR_EVENT_DONE) { ptr++; offset++; } if (hdr.event_type == ROTATE_EVENT) { spinlock_acquire(&router->binlog_lock); router->rotating = 1; spinlock_release(&router->binlog_lock); } /* Current event is being written to disk file. * It is possible for an empty packet to be sent if an * event is exactly 2^24 bytes long. In this case the * empty packet should be discarded. */ if (len > MYSQL_HEADER_LEN && blr_write_binlog_record(router, &hdr, len - offset, ptr) == 0) { /* * Failed to write to the * binlog file, destroy the * buffer chain and close the * connection with the master */ while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL) { ; } blr_master_close(router); blr_master_delayed_connect(router); return; } /* Check for rotete event */ if (hdr.event_type == ROTATE_EVENT) { if (!blr_rotate_event(router, ptr, &hdr)) { /* * Failed to write to the * binlog file, destroy the * buffer chain and close the * connection with the master */ while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL) { ; } blr_master_close(router); blr_master_delayed_connect(router); return; } } /** * Distributing binlog events to slaves * may depend on pending transaction */ spinlock_acquire(&router->binlog_lock); if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) { router->binlog_position = router->current_pos; router->current_safe_event = router->last_event_pos; spinlock_release(&router->binlog_lock); if (router->master_event_state == BLR_EVENT_COMPLETE) { /** Read the complete event from the disk */ GWBUF *record = blr_read_events_from_pos(router, router->last_event_pos, &hdr, hdr.next_pos); if 
(record) { uint8_t *data = GWBUF_DATA(record); blr_distribute_binlog_record(router, &hdr, data, BLR_THREAD_ROLE_MASTER_LARGE_NOTRX); gwbuf_free(record); } else { MXS_ERROR("Failed to read event at position" "%lu with a size of %u bytes.", router->last_event_pos, hdr.event_size); } } else { /* Now distribute events */ blr_distribute_binlog_record(router, &hdr, ptr, BLR_THREAD_ROLE_MASTER_NOTRX); } } else { /** * If transaction is closed: * * 1) read current binlog starting * from router->binlog_position * * 2) distribute read event * * 3) set router->binlog_position to * router->current_pos * */ if (router->pending_transaction > 1) { unsigned long long pos; unsigned long long end_pos; GWBUF *record; uint8_t *raw_data; REP_HEADER new_hdr; pos = router->binlog_position; end_pos = router->current_pos; spinlock_release(&router->binlog_lock); while ((record = blr_read_events_from_pos(router, pos, &new_hdr, end_pos)) != NULL) { raw_data = GWBUF_DATA(record); /* distribute event */ blr_distribute_binlog_record(router, &new_hdr, raw_data, BLR_THREAD_ROLE_MASTER_TRX); spinlock_acquire(&router->binlog_lock); /** The current safe position is only updated * if it points to the event we just distributed. */ if (router->current_safe_event == pos) { router->current_safe_event = new_hdr.next_pos; } else { MXS_ERROR("Current safe event (%lu) does" " not point at the event we " "just sent (%llu) from binlog file %s. " "Last commit at %lu, last write at %lu.", router->current_safe_event, pos, router->binlog_name, router->last_safe_pos, router->last_written); } pos = new_hdr.next_pos; spinlock_release(&router->binlog_lock); gwbuf_free(record); } /* Check whether binlog records has been read in previous loop */ if (pos < router->current_pos) { char err_message[BINLOG_ERROR_MSG_LEN + 1]; err_message[BINLOG_ERROR_MSG_LEN] = '\0'; /* No event has been sent */ if (pos == router->binlog_position) { MXS_ERROR("No events distributed to slaves for a pending " "transaction in %s at %lu. 
" "Last event from master at %lu", router->binlog_name, router->binlog_position, router->current_pos); strncpy(err_message, "No transaction events sent", BINLOG_ERROR_MSG_LEN); } else { /* Some events have been sent */ MXS_ERROR("Some events were not distributed to slaves for a " "pending transaction in %s at %lu. Last distributed " "even at %llu, last event from master at %lu", router->binlog_name, router->binlog_position, pos, router->current_pos); strncpy(err_message, "Incomplete transaction events sent", BINLOG_ERROR_MSG_LEN); } /* distribute error message to registered slaves */ blr_distribute_error_message(router, err_message, "HY000", 1236); } /* update binlog_position and set pending to 0 */ spinlock_acquire(&router->binlog_lock); router->binlog_position = router->current_pos; router->pending_transaction = 0; spinlock_release(&router->binlog_lock); } else { spinlock_release(&router->binlog_lock); } } } else { router->stats.n_artificial++; MXS_DEBUG("Artificial event not written " "to disk or distributed. " "Type 0x%x, Length %d, Binlog " "%s @ %lu.", hdr.event_type, hdr.event_size, router->binlog_name, router->current_pos); ptr += 5; if (hdr.event_type == ROTATE_EVENT) { spinlock_acquire(&router->binlog_lock); router->rotating = 1; spinlock_release(&router->binlog_lock); if (!blr_rotate_event(router, ptr, &hdr)) { /* * Failed to write to the * binlog file, destroy the * buffer chain and close the * connection with the master */ while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL) { ; } blr_master_close(router); blr_master_delayed_connect(router); return; } } } } /** A large event is now fully received and processed */ if (router->master_event_state == BLR_EVENT_COMPLETE) { router->master_event_state = BLR_EVENT_DONE; } } else { blr_terminate_master_replication(router, ptr, len); } } if (msg) { free(msg); msg = NULL; } prev_length = len; while (len > 0) { unsigned int n, plen; plen = GWBUF_LENGTH(pkt); n = (plen < len ? 
plen : len); pkt = gwbuf_consume(pkt, n); len -= n; pkt_length -= n; } preslen = reslen; pn_bufs = n_bufs; } /* * Check if we have a residual, part binlog message to deal with. * Just simply store the GWBUF for next time */ if (pkt) { router->residual = pkt; ss_dassert(pkt_length != 0); } else { ss_dassert(pkt_length == 0); } blr_file_flush(router); } /** * Populate a header structure for a replication message from a GWBUF structure. * * @param pkt The incoming packet in a GWBUF chain * @param hdr The packet header to populate */ void blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr) { hdr->payload_len = EXTRACT24(ptr); hdr->seqno = ptr[3]; hdr->ok = ptr[4]; hdr->timestamp = EXTRACT32(&ptr[5]); hdr->event_type = ptr[9]; hdr->serverid = EXTRACT32(&ptr[10]); hdr->event_size = EXTRACT32(&ptr[14]); hdr->next_pos = EXTRACT32(&ptr[18]); hdr->flags = EXTRACT16(&ptr[22]); } /** * Process a binlog rotate event. * * @param router The instance of the router * @param ptr The packet containing the rotate event * @param hdr The replication message header * @return 1 if the file could be rotated, 0 otherwise. 
*/ static int blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr) { int len, slen; uint64_t pos; char file[BINLOG_FNAMELEN + 1]; ptr += 19; // Skip event header len = hdr->event_size - 19; // Event size minus header pos = extract_field(ptr + 4, 32); pos <<= 32; pos |= extract_field(ptr, 32); slen = len - (8 + 4); // Allow for position and CRC if (router->master_chksum == 0) { slen += 4; } if (slen > BINLOG_FNAMELEN) { slen = BINLOG_FNAMELEN; } memcpy(file, ptr + 8, slen); file[slen] = 0; #ifdef VERBOSE_ROTATE printf("binlog rotate: "); while (len--) { printf("0x%02x ", *ptr++); } printf("\n"); printf("New file: %s @ %ld\n", file, pos); #endif strcpy(router->prevbinlog, router->binlog_name); int rotated = 1; if (strncmp(router->binlog_name, file, slen) != 0) { router->stats.n_rotates++; if (blr_file_rotate(router, file, pos) == 0) { rotated = 0; } } spinlock_acquire(&router->binlog_lock); router->rotating = 0; spinlock_release(&router->binlog_lock); return rotated; } /** * Create the auth data needed to be able to call dcb_connect. * * This doesn't really belong here and should be moved at some stage. 
*/ static void * CreateMySQLAuthData(char *username, char *password, char *database) { MYSQL_session *auth_info; if (username == NULL || password == NULL) { MXS_ERROR("You must specify both username and password for the binlog router.\n"); return NULL; } if ((auth_info = calloc(1, sizeof(MYSQL_session))) == NULL) { return NULL; } strncpy(auth_info->user, username, MYSQL_USER_MAXLEN); strncpy(auth_info->db, database, MYSQL_DATABASE_MAXLEN); gw_sha1_str((const uint8_t *)password, strlen(password), auth_info->client_sha1); return auth_info; } /** Actions that can be taken when an event is being distributed to the slaves*/ typedef enum { SLAVE_SEND_EVENT, /*< Send the event to the slave */ SLAVE_FORCE_CATCHUP, /*< Force the slave into catchup mode */ SLAVE_EVENT_ALREADY_SENT /*< The slave already has the event, don't send it */ } slave_event_action_t; /** * Distribute the binlog record we have just received to all the registered slaves. * * @param router The router instance * @param hdr The replication event header * @param ptr The raw replication event data */ void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr, blr_thread_role_t role) { ROUTER_SLAVE *slave; int action; unsigned int cstate; spinlock_acquire(&router->lock); slave = router->slaves; while (slave) { if (slave->state != BLRS_DUMPING) { slave = slave->next; continue; } spinlock_acquire(&slave->catch_lock); if ((slave->cstate & (CS_UPTODATE | CS_BUSY)) == CS_UPTODATE) { /* * This slave is reporting it is to date with the binlog of the * master running on this slave. * It has no thread running currently that is sending binlog * events. */ action = 1; slave->cstate |= CS_BUSY; } else if ((slave->cstate & (CS_UPTODATE | CS_BUSY)) == (CS_UPTODATE | CS_BUSY)) { /* * The slave is up to date with the binlog and a process is * running on this slave to send binlog events. 
*/ slave->overrun = 1; action = 2; } else if ((slave->cstate & CS_UPTODATE) == 0) { /* Slave is in catchup mode */ action = 3; } else { MXS_ERROR("slave->cstate does not contain a meaningful state %d", slave->cstate); action = 0; } slave->stats.n_actions[action - 1]++; spinlock_release(&slave->catch_lock); if (action == 1) { spinlock_acquire(&router->binlog_lock); slave_event_action_t slave_action = SLAVE_FORCE_CATCHUP; const bool same_file = strcmp(slave->binlogfile, router->binlog_name) == 0; const bool rotate = hdr->event_type == ROTATE_EVENT && strcmp(slave->binlogfile, router->prevbinlog) == 0; if (router->trx_safe && (same_file || rotate) && slave->binlog_pos == router->current_safe_event) { /** Slave needs the current event being distributed */ slave_action = SLAVE_SEND_EVENT; } else if (!router->trx_safe && (same_file || rotate) && slave->binlog_pos == router->last_event_pos) { /** Transaction safety is off */ slave_action = SLAVE_SEND_EVENT; } else if (same_file) { if (slave->binlog_pos == hdr->next_pos) { /* * Slave has already read record from file, no * need to distrbute this event */ slave_action = SLAVE_EVENT_ALREADY_SENT; } else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size)) { /* * The slave is ahead of the master, this should never * happen. Force the slave to catchup mode in order to * try to resolve the issue. */ MXS_ERROR("Slave %s:%d server ID %d is ahead of expected position %s@%u. " "Expected position %d", slave->dcb->remote, ntohs((slave->dcb->ipv4).sin_port), slave->serverid, slave->binlogfile, slave->binlog_pos, hdr->next_pos - hdr->event_size); } /* If none of the above conditions were met, a slave in catchup * mode transitioned into up-to-date mode while we were * distributing events. The slave needs to be forced into * catchup mode since it isn't up to date anymore. 
*/ } else if (rotate) { /** Slave is more than one binlog file behind */ MXS_WARNING("Slave %s:%d server ID %d is behind more than one binlog file " "from the master. Slave is using '%s' with position %d " "when master binlog file is '%s'.", slave->dcb->remote, ntohs((slave->dcb->ipv4).sin_port), slave->serverid, slave->binlogfile, slave->binlog_pos, router->binlog_name); } else { /** Slave is lagging behind */ MXS_WARNING("Slave %s:%d server ID %d is using binlog file '%s' with " "position %d. Master binlog file is '%s' at position %lu " "with last safe event at %lu.", slave->dcb->remote, ntohs((slave->dcb->ipv4).sin_port), slave->serverid, slave->binlogfile, slave->binlog_pos, router->binlog_name, router->current_pos, router->current_safe_event); } spinlock_release(&router->binlog_lock); /* * If slave_action is SLAVE_FORCE_CATCHUP then * the slave is not at the position it should be. Force it into * catchup mode rather than send this event. */ switch (slave_action) { char binlog_name[BINLOG_FNAMELEN + 1]; uint32_t binlog_pos; case SLAVE_SEND_EVENT: /* * The slave should be up to date, check that the binlog * position matches the event we have to distribute or * this is a rotate event. Send the event directly from * memory to the slave. 
*/ slave->lastEventTimestamp = hdr->timestamp; slave->lastEventReceived = hdr->event_type; /* set lastReply */ if (router->send_slave_heartbeat) { slave->lastReply = time(0); } strcpy(binlog_name, slave->binlogfile); binlog_pos = slave->binlog_pos; if (hdr->event_type == ROTATE_EVENT) { blr_slave_rotate(router, slave, ptr); } if (blr_send_event(role, binlog_name, binlog_pos, slave, hdr, ptr)) { spinlock_acquire(&slave->catch_lock); if (hdr->event_type != ROTATE_EVENT) { slave->binlog_pos = hdr->next_pos; } if (slave->overrun) { slave->stats.n_overrun++; slave->overrun = 0; poll_fake_write_event(slave->dcb); } else { slave->cstate &= ~CS_BUSY; } spinlock_release(&slave->catch_lock); } else { MXS_WARNING("Slave %s:%i, server-id %d, binlog '%s, position %u: " "Master-thread could not send event to slave, closing connection.", slave->dcb->remote, ntohs((slave->dcb->ipv4).sin_port), slave->serverid, binlog_name, binlog_pos); slave->state = BLRS_ERRORED; dcb_close(slave->dcb); } break; case SLAVE_EVENT_ALREADY_SENT: spinlock_acquire(&slave->catch_lock); slave->cstate &= ~CS_BUSY; spinlock_release(&slave->catch_lock); break; case SLAVE_FORCE_CATCHUP: spinlock_acquire(&slave->catch_lock); cstate = slave->cstate; slave->cstate &= ~(CS_UPTODATE | CS_BUSY); slave->cstate |= CS_EXPECTCB; spinlock_release(&slave->catch_lock); if ((cstate & CS_UPTODATE) == CS_UPTODATE) { #ifdef STATE_CHANGE_LOGGING_ENABLED MXS_NOTICE("%s: Slave %s:%d, server-id %d transition from " "up-to-date to catch-up in blr_distribute_binlog_record, " "binlog file '%s', position %lu.", router->service->name, slave->dcb->remote, ntohs((slave->dcb->ipv4).sin_port), slave->serverid, slave->binlogfile, (unsigned long)slave->binlog_pos); #endif } poll_fake_write_event(slave->dcb); break; } } else if (action == 3) { /* Slave is not up to date * Check if it is either expecting a callback or * is busy processing a callback */ spinlock_acquire(&slave->catch_lock); if ((slave->cstate & (CS_EXPECTCB | CS_BUSY)) == 0) 
{ slave->cstate |= CS_EXPECTCB; spinlock_release(&slave->catch_lock); poll_fake_write_event(slave->dcb); } else { spinlock_release(&slave->catch_lock); } } slave = slave->next; } spinlock_release(&router->lock); } /** * Write a raw event (the first 40 bytes at most) to a log file * * @param priority The syslog priority of the message (LOG_ERR, LOG_WARNING, etc.) * @param msg A textual message to write before the packet * @param ptr Pointer to the message buffer * @param len Length of message packet */ static void blr_log_packet(int priority, char *msg, uint8_t *ptr, int len) { char buf[400] = ""; char *bufp; int i; bufp = buf; bufp += sprintf(bufp, "%s length = %d: ", msg, len); for (i = 0; i < len && i < 40; i++) { bufp += sprintf(bufp, "0x%02x ", ptr[i]); } if (i < len) { MXS_LOG_MESSAGE(priority, "%s...", buf); } else { MXS_LOG_MESSAGE(priority, "%s", buf); } } /** * Check if the master connection is in place and we * are downlaoding binlogs * * @param router The router instance * @return non-zero if we are recivign binlog records */ int blr_master_connected(ROUTER_INSTANCE *router) { return router->master_state == BLRM_BINLOGDUMP; } /** * Extract a result value from the set of messages that make up a * MySQL response packet. * * @param buf The GWBUF containing the response * @param col The column number to return * @return The result form the column or NULL. 
The caller must free the result */ char * blr_extract_column(GWBUF *buf, int col) { uint8_t *ptr; int len, ncol, collen; char *rval; if (buf == NULL) { return NULL; } ptr = (uint8_t *)GWBUF_DATA(buf); /* First packet should be the column count */ len = EXTRACT24(ptr); ptr += 3; if (*ptr != 1) // Check sequence number is 1 { return NULL; } ptr++; ncol = *ptr++; if (ncol < col) // Not that many column in result { return NULL; } // Now ptr points at the column definition while (ncol-- > 0) { len = EXTRACT24(ptr); ptr += 4; // Skip to payload ptr += len; // Skip over payload } // Now we should have an EOF packet len = EXTRACT24(ptr); ptr += 4; // Skip to payload if (*ptr != 0xfe) { return NULL; } ptr += len; // Finally we have reached the row len = EXTRACT24(ptr); ptr += 4; /** The first EOF packet signals the start of the resultset rows and the second EOF packet signals the end of the result set. If the resultset contains a second EOF packet right after the first one, the result set is empty and contains no rows. */ if (len == 5 && *ptr == 0xfe) { return NULL; } while (--col > 0) { collen = *ptr++; ptr += collen; } collen = *ptr++; if ((rval = malloc(collen + 1)) == NULL) { return NULL; } memcpy(rval, ptr, collen); rval[collen] = 0; // NULL terminate return rval; } /** * Read a replication event form current opened binlog into a GWBUF structure. 
* * @param router The router instance * @param pos Position of binlog record to read * @param hdr Binlog header to populate * @return The binlog record wrapped in a GWBUF structure */ GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr, unsigned long long pos_end) { unsigned long long end_pos = 0; uint8_t hdbuf[19]; uint8_t *data; GWBUF *result; int n; int event_limit; /* Get current binnlog position */ end_pos = pos_end; /* end of file reached, we're done */ if (pos == end_pos) { return NULL; } /* error */ if (pos > end_pos) { MXS_ERROR("Reading saved events, the specified pos %llu " "is ahead of current pos %lu for file %s", pos, router->current_pos, router->binlog_name); return NULL; } /* Read the event header information from the file */ if ((n = pread(router->binlog_fd, hdbuf, 19, pos)) != 19) { switch (n) { case 0: MXS_DEBUG("Reading saved events: reached end of binlog file at %llu.", pos); break; case -1: { char err_msg[STRERROR_BUFLEN]; MXS_ERROR("Reading saved events: failed to read binlog " "file %s at position %llu" " (%s).", router->binlog_name, pos, strerror_r(errno, err_msg, sizeof(err_msg))); if (errno == EBADF) { MXS_ERROR("Reading saved events: bad file descriptor for file %s" ", descriptor %d.", router->binlog_name, router->binlog_fd); } } break; default: MXS_ERROR("Reading saved events: short read when reading the header. " "Expected 19 bytes but got %d bytes. " "Binlog file is %s, position %llu", n, router->binlog_name, pos); break; } return NULL; } hdr->timestamp = EXTRACT32(hdbuf); hdr->event_type = hdbuf[4]; hdr->serverid = EXTRACT32(&hdbuf[5]); hdr->event_size = extract_field(&hdbuf[9], 32); hdr->next_pos = EXTRACT32(&hdbuf[13]); hdr->flags = EXTRACT16(&hdbuf[17]); event_limit = router->mariadb10_compat ? MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE; if (hdr->event_type > event_limit) { MXS_ERROR("Reading saved events: invalid event type 0x%x. 
" "Binlog file is %s, position %llu", hdr->event_type, router->binlog_name, pos); return NULL; } if ((result = gwbuf_alloc(hdr->event_size)) == NULL) { MXS_ERROR("Reading saved events: failed to allocate memory for binlog entry, " "size %d at %llu.", hdr->event_size, pos); return NULL; } /* Copy event header*/ data = GWBUF_DATA(result); memcpy(data, hdbuf, 19); /* Read event data and put int into buffer after header */ if ((n = pread(router->binlog_fd, &data[19], hdr->event_size - 19, pos + 19)) != hdr->event_size - 19) { if (n == -1) { char err_msg[STRERROR_BUFLEN]; MXS_ERROR("Reading saved events: the event at %llu in %s. " "%s, expected %d bytes.", pos, router->binlog_name, strerror_r(errno, err_msg, sizeof(err_msg)), hdr->event_size - 19); } else { MXS_ERROR("Reading saved events: short read when reading " "the event at %llu in %s. " "Expected %d bytes got %d bytes.", pos, router->binlog_name, hdr->event_size - 19, n); if (end_pos - pos < hdr->event_size) { MXS_ERROR("Reading saved events: binlog event " "is close to the end of the binlog file, " "current file size is %llu.", end_pos); } } /* free buffer */ gwbuf_free(result); return NULL; } return result; } /** * Stop and start the master connection * * @param router The router instance */ void blr_stop_start_master(ROUTER_INSTANCE *router) { if (router->master) { if (router->master->fd != -1 && router->master->state == DCB_STATE_POLLING) { blr_master_close(router); } } spinlock_acquire(&router->lock); router->master_state = BLRM_SLAVE_STOPPED; /* set last_safe_pos */ router->last_safe_pos = router->binlog_position; /** * Set router->prevbinlog to router->binlog_name * The FDE event with current filename may arrive after STOP SLAVE is received */ if (strcmp(router->binlog_name, router->prevbinlog) != 0) { strncpy(router->prevbinlog, router->binlog_name, BINLOG_FNAMELEN); } if (router->client) { if (router->client->fd != -1 && router->client->state == DCB_STATE_POLLING) { dcb_close(router->client); 
router->client = NULL; } } /* Discard the queued residual data */ while (router->residual) { router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual)); } router->residual = NULL; router->master_state = BLRM_UNCONNECTED; spinlock_release(&router->lock); blr_master_reconnect(router); } /** * The heartbeat check function called from the housekeeper. * We can try a new master connection if current one is seen out of date * * @param router Current router instance */ static void blr_check_last_master_event(void *inst) { ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)inst; int master_check = 1; int master_state = BLRM_UNCONNECTED; char task_name[BLRM_TASK_NAME_LEN + 1] = ""; spinlock_acquire(&router->lock); master_check = blr_check_heartbeat(router); master_state = router->master_state; spinlock_release(&router->lock); if (!master_check) { /* * stop current master connection * and try a new connection */ blr_stop_start_master(router); } if ( (!master_check) || (master_state != BLRM_BINLOGDUMP) ) { /* * Remove the task, it will be added again * when master state is back to BLRM_BINLOGDUMP * by blr_master_response() */ snprintf(task_name, BLRM_TASK_NAME_LEN, "%s heartbeat", router->service->name); hktask_remove(task_name); } } /** * Check last heartbeat or last received event against router->heartbeat time interval * * checked interval is againts (router->heartbeat + BLR_NET_LATENCY_WAIT_TIME) * that is currently set to 1 * * @param router Current router instance * @return 0 if master connection must be closed and opened again, 1 otherwise */ int blr_check_heartbeat(ROUTER_INSTANCE *router) { time_t t_now = time(0); char *event_desc = NULL; if (router->master_state != BLRM_BINLOGDUMP) { return 1; } event_desc = blr_last_event_description(router); if (router->master_state == BLRM_BINLOGDUMP && router->lastEventReceived > 0) { if ((t_now - router->stats.lastReply) > (router->heartbeat + BLR_NET_LATENCY_WAIT_TIME)) { MXS_ERROR("No event received from 
master %s:%d in heartbeat period (%lu seconds), " "last event (%s %d) received %lu seconds ago. Assuming connection is dead " "and reconnecting.", router->service->dbref->server->name, router->service->dbref->server->port, router->heartbeat, event_desc != NULL ? event_desc : "unknown", router->lastEventReceived, t_now - router->stats.lastReply); return 0; } } return 1; } /** * Log binlog router identy after master registration, state is BLRM_BINLOG_DUMP * * @param router The router instance */ static void blr_log_identity(ROUTER_INSTANCE *router) { char *master_uuid; char *master_hostname; char *master_version; if (router->set_master_version) { master_version = router->set_master_version; } else { master_version = blr_extract_column(router->saved_master.selectver, 1); } if (router->set_master_hostname) { master_hostname = router->set_master_hostname; } else { master_hostname = blr_extract_column(router->saved_master.selecthostname, 1); } if (router->set_master_uuid) { master_uuid = router->master_uuid; } else { master_uuid = blr_extract_column(router->saved_master.uuid, 2); } /* Seen by the master */ MXS_NOTICE("%s: identity seen by the master: " "server_id: %d, uuid: %s", router->service->name, router->serverid, (router->uuid == NULL ? "not available" : router->uuid)); /* Seen by the slaves */ /* MariaDB 5.5 and MariaDB don't have the MASTER_UUID var */ if (master_uuid == NULL) { MXS_NOTICE("%s: identity seen by the slaves: " "server_id: %d, hostname: %s, MySQL version: %s", router->service->name, router->masterid, (master_hostname == NULL ? "not available" : master_hostname), (master_version == NULL ? "not available" : master_version)); } else { MXS_NOTICE("%s: identity seen by the slaves: " "server_id: %d, uuid: %s, hostname: %s, MySQL version: %s", router->service->name, router->masterid, master_uuid, (master_hostname == NULL ? "not available" : master_hostname), (master_version == NULL ? 
"not available" : master_version)); } } /** * Distribute an error message to all the registered slaves. * * @param router The router instance * @param message The message to send * @param state The MySQL State for message * @param err_code The MySQL error code for message */ static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code) { ROUTER_SLAVE *slave; spinlock_acquire(&router->lock); slave = router->slaves; while (slave) { if (slave->state != BLRS_DUMPING) { slave = slave->next; continue; } /* send the error that stops slave replication */ blr_send_custom_error(slave->dcb, slave->seqno++, 0, message, state, err_code); slave = slave->next; } spinlock_release(&router->lock); } int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf) { int n; if ((n = pwrite(router->binlog_fd, buf, data_len, router->last_written)) != data_len) { char err_msg[STRERROR_BUFLEN]; MXS_ERROR("%s: Failed to write binlog record at %lu of %s, %s. " "Truncating to previous record.", router->service->name, router->last_written, router->binlog_name, strerror_r(errno, err_msg, sizeof(err_msg))); /* Remove any partial event that was written */ if (ftruncate(router->binlog_fd, router->last_written)) { MXS_ERROR("%s: Failed to truncate binlog record at %lu of %s, %s. ", router->service->name, router->last_written, router->binlog_name, strerror_r(errno, err_msg, sizeof(err_msg))); } return 0; } router->last_written += data_len; return n; } /** * Send a replication event packet to a slave * * The first replication event packet contains one byte set to either * 0x0, 0xfe or 0xff which signals what the state of the replication stream is. * If the data pointed by @c buf is not the start of the replication header * and part of the replication event is already sent, @c first must be set to * false so that the first status byte is not sent again. 
* * @param slave Slave where the packet is sent to * @param buf Buffer containing the data * @param len Length of the data * @param first If this is the first packet of a multi-packet event * @return True on success, false when memory allocation fails */ bool blr_send_packet(ROUTER_SLAVE *slave, uint8_t *buf, uint32_t len, bool first) { bool rval = true; unsigned int datalen = len + (first ? 1 : 0); GWBUF *buffer = gwbuf_alloc(datalen + MYSQL_HEADER_LEN); if (buffer) { uint8_t *data = GWBUF_DATA(buffer); encode_value(data, datalen, 24); data += 3; *data++ = slave->seqno++; if (first) { *data++ = 0; // OK byte } if (len > 0) { memcpy(data, buf, len); } slave->stats.n_bytes += GWBUF_LENGTH(buffer); slave->dcb->func.write(slave->dcb, buffer); } else { MXS_ERROR("failed to allocate %ld bytes of memory when writing an" " event.", datalen + MYSQL_HEADER_LEN); rval = false; } return rval; } /** * Send a single replication event to a slave * * This sends the complete replication event to a slave. If the event size exceeds * the maximum size of a MySQL packet, it will be sent in multiple packets. * * @param role What is the role of the caller, slave or master. * @param binlog_name The name of the binlogfile. * @param binlog_pos The position in the binlogfile. * @param slave Slave where the event is sent to * @param hdr Replication header * @param buf Pointer to the replication event as it was read from the disk * @return True on success, false if memory allocation failed */ bool blr_send_event(blr_thread_role_t role, const char* binlog_name, uint32_t binlog_pos, ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf) { bool rval = true; if ((strcmp(slave->lsi_binlog_name, binlog_name) == 0) && (slave->lsi_binlog_pos == binlog_pos)) { MXS_ERROR("Slave %s:%i, server-id %d, binlog '%s', position %u: " "thread %lu in the role of %s could not send the event, " "the event has already been sent by thread %lu in the role of %s. " "%u bytes buffered for writing in DCB %p. 
%lu events received from master.", slave->dcb->remote, ntohs((slave->dcb->ipv4).sin_port), slave->serverid, binlog_name, binlog_pos, thread_self(), ROLETOSTR(role), slave->lsi_sender_tid, ROLETOSTR(slave->lsi_sender_role), gwbuf_length(slave->dcb->writeq), slave->dcb, slave->router->stats.n_binlogs); return false; } /** Check if the event and the OK byte fit into a single packet */ if (hdr->event_size + 1 < MYSQL_PACKET_LENGTH_MAX) { rval = blr_send_packet(slave, buf, hdr->event_size, true); } else { /** Total size of all the payloads in all the packets */ int64_t len = hdr->event_size + 1; bool first = true; while (rval && len > 0) { uint64_t payload_len = first ? MYSQL_PACKET_LENGTH_MAX - 1 : MIN(MYSQL_PACKET_LENGTH_MAX, len); if (blr_send_packet(slave, buf, payload_len, first)) { /** The check for exactly 0x00ffffff bytes needs to be done * here as well */ if (len == MYSQL_PACKET_LENGTH_MAX) { blr_send_packet(slave, buf, 0, false); } /** Add the extra byte written by blr_send_packet */ len -= first ? payload_len + 1 : payload_len; buf += payload_len; first = false; } else { rval = false; } } } slave->stats.n_events++; if (rval) { strcpy(slave->lsi_binlog_name, binlog_name); slave->lsi_binlog_pos = binlog_pos; slave->lsi_sender_role = role; slave->lsi_sender_tid = thread_self(); } else { MXS_ERROR("Failed to send an event of %u bytes to slave at %s:%d.", hdr->event_size, slave->dcb->remote, ntohs(slave->dcb->ipv4.sin_port)); } return rval; } /** * Extract the checksum from the binlogs * * This updates the internal state of the router and will allow us to detect * if the checksum is split across two packets. 
* @param router Router instance * @param cksumptr Pointer to the checksum * @param len How much of the data is readable */ void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len) { uint8_t *ptr = cksumptr; while (ptr - cksumptr < len) { router->partial_checksum[router->partial_checksum_bytes] = *ptr; ptr++; router->partial_checksum_bytes++; } } /** * Stop the slave connection and log errors * * @param router Router instance * @param ptr Pointer to the start of the packet * @param len Length of the packet */ static void blr_terminate_master_replication(ROUTER_INSTANCE* router, uint8_t* ptr, int len) { unsigned long mysql_errno = extract_field(ptr + 5, 16); int msg_len = len - 7 - 6; // msg len is decreased by 7 and 6 char *msg_err = (char *)malloc(msg_len + 1); strncpy(msg_err, (char *)ptr + 7 + 6, msg_len); *(msg_err + msg_len) = '\0'; spinlock_acquire(&router->lock); char* old_errmsg = router->m_errmsg; router->m_errmsg = msg_err; router->m_errno = mysql_errno; router->master_state = BLRM_SLAVE_STOPPED; router->stats.n_binlog_errors++; spinlock_release(&router->lock); free(old_errmsg); MXS_ERROR("Error packet in binlog stream.%s @ %lu.", router->binlog_name, router->current_pos); }