3133 lines
		
	
	
		
			109 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			3133 lines
		
	
	
		
			109 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /*lint -e662 */
 | |
| /*lint -e661 */
 | |
| 
 | |
| /*
 | |
|  * Copyright (c) 2016 MariaDB Corporation Ab
 | |
|  *
 | |
|  * Use of this software is governed by the Business Source License included
 | |
|  * in the LICENSE.TXT file and at www.mariadb.com/bsl.
 | |
|  *
 | |
|  * Change Date: 2019-07-01
 | |
|  *
 | |
|  * On the date above, in accordance with the Business Source License, use
 | |
|  * of this software will be governed by version 2 or later of the General
 | |
|  * Public License.
 | |
|  */
 | |
| 
 | |
| /**
 | |
|  * @file blr_master.c - contains code for the router to master communication
 | |
|  *
 | |
|  * The binlog router is designed to be used in replication environments to
 | |
|  * increase the replication fanout of a master server. It provides a transparant
 | |
|  * mechanism to read the binlog entries for multiple slaves while requiring
 | |
|  * only a single connection to the actual master to support the slaves.
 | |
|  *
 | |
|  * The current prototype implement is designed to support MySQL 5.6 and has
 | |
|  * a number of limitations. This prototype is merely a proof of concept and
 | |
|  * should not be considered production ready.
 | |
|  *
 | |
|  * @verbatim
 | |
|  * Revision History
 | |
|  *
 | |
|  * Date         Who                 Description
 | |
|  * 02/04/2014   Mark Riddoch        Initial implementation
 | |
|  * 07/05/2015   Massimiliano Pinto  Added MariaDB 10 Compatibility
 | |
|  * 25/05/2015   Massimiliano Pinto  Added BLRM_SLAVE_STOPPED state
 | |
|  * 08/06/2015   Massimiliano Pinto  Added m_errno and m_errmsg
 | |
|  * 23/06/2015   Massimiliano Pinto  Master communication goes into BLRM_SLAVE_STOPPED state
 | |
|  *                                  when an error is encountered in BLRM_BINLOGDUMP state.
 | |
|  *                                  Server error code and msg are reported via SHOW SLAVE STATUS
 | |
|  * 03/08/2015   Massimiliano Pinto  Initial implementation of transaction safety
 | |
|  * 13/08/2015   Massimiliano Pinto  Addition of heartbeat check
 | |
|  * 23/08/2015   Massimiliano Pinto  Added strerror_r
 | |
|  * 26/08/2015   Massimiliano Pinto  Added MariaDB 10 GTID event check with flags = 0
 | |
|  *                                  This is the current supported condition for detecting
 | |
|  *                                  MariaDB 10 transaction start point.
 | |
|  *                                  It's no longer using QUERY_EVENT with BEGIN
 | |
|  * 25/09/2015   Massimiliano Pinto  Addition of lastEventReceived for slaves
 | |
|  * 23/10/2015   Markus Makela       Added current_safe_event
 | |
|  * 26/04/2016   Massimiliano Pinto  Added MariaDB 10.0 and 10.1 GTID event flags detection
 | |
|  * 22/07/2016   Massimiliano Pinto  Added semi_sync replication support
 | |
|  * 01/08/2016   Massimiliano Pinto  Added support for ANNOTATE_ROWS_EVENT in COM_BINLOG_DUMP
 | |
|  *
 | |
|  * @endverbatim
 | |
|  */
 | |
| #include <stdio.h>
 | |
| #include <stdlib.h>
 | |
| #include <string.h>
 | |
| #include <service.h>
 | |
| #include <server.h>
 | |
| #include <router.h>
 | |
| #include <atomic.h>
 | |
| #include <session.h>
 | |
| #include <blr.h>
 | |
| #include <dcb.h>
 | |
| #include <spinlock.h>
 | |
| #include <housekeeper.h>
 | |
| #include <buffer.h>
 | |
| 
 | |
| #include <sys/types.h>
 | |
| #include <sys/socket.h>
 | |
| 
 | |
| #include <skygw_types.h>
 | |
| #include <skygw_utils.h>
 | |
| #include <log_manager.h>
 | |
| 
 | |
| #include <rdtsc.h>
 | |
| #include <thread.h>
 | |
| 
 | |
| /* Temporary requirement for auth data */
 | |
| #include <mysql_client_server_protocol.h>
 | |
| #include <maxscale/alloc.h>
 | |
| 
 | |
| 
 | |
| 
 | |
| static GWBUF *blr_make_query(char *statement);
 | |
| static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
 | |
| static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
 | |
| void encode_value(unsigned char *data, unsigned int value, int len);
 | |
| void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
 | |
| static int  blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
 | |
| void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr,
 | |
|                                   blr_thread_role_t role);
 | |
| static void *CreateMySQLAuthData(char *username, char *password, char *database);
 | |
| void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
 | |
| static void blr_log_packet(int priority, char *msg, uint8_t *ptr, int len);
 | |
| void blr_master_close(ROUTER_INSTANCE *);
 | |
| char *blr_extract_column(GWBUF *buf, int col);
 | |
| void poll_fake_write_event(DCB *dcb);
 | |
| GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr,
 | |
|                                 unsigned long long pos_end);
 | |
| static void blr_check_last_master_event(void *inst);
 | |
| extern int blr_check_heartbeat(ROUTER_INSTANCE *router);
 | |
| static void blr_log_identity(ROUTER_INSTANCE *router);
 | |
| static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state,
 | |
|                                          unsigned int err_code);
 | |
| static void blr_extract_header_semisync(uint8_t *pkt, REP_HEADER *hdr);
 | |
| static int blr_send_semisync_ack (ROUTER_INSTANCE *router, uint64_t pos);
 | |
| static int blr_get_master_semisync(GWBUF *buf);
 | |
| 
 | |
| int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf);
 | |
| void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len);
 | |
| static void blr_terminate_master_replication(ROUTER_INSTANCE *router, uint8_t* ptr, int len);
 | |
| 
 | |
| static int keepalive = 1;
 | |
| 
 | |
| /** Transactio-Safety feature */
 | |
| typedef enum
 | |
| {
 | |
|     BLRM_NO_TRANSACTION, /*< No transaction */
 | |
|     BLRM_TRANSACTION_START, /*< A transaction is open*/
 | |
|     BLRM_COMMIT_SEEN, /*< Received COMMIT event in the current transaction */
 | |
|     BLRM_XID_EVENT_SEEN /*< Received XID event of current transaction */
 | |
| } master_transaction_t;
 | |
| 
 | |
| /** Master Semi-Sync capability */
 | |
| typedef enum
 | |
| {
 | |
|     MASTER_SEMISYNC_NOT_AVAILABLE, /*< Semi-Sync replication not available */
 | |
|     MASTER_SEMISYNC_DISABLED, /*< Semi-Sync is disabled */
 | |
|     MASTER_SEMISYNC_ENABLED /*< Semi-Sync is enabled */
 | |
| } master_semisync_capability_t;
 | |
| 
 | |
| #define MASTER_BYTES_BEFORE_EVENT 5
 | |
| #define MASTER_BYTES_BEFORE_EVENT_SEMI_SYNC MASTER_BYTES_BEFORE_EVENT + 2
 | |
| /* Semi-Sync indicator in network packet (byte 6) */
 | |
| #define BLR_MASTER_SEMI_SYNC_INDICATOR  0xef
 | |
| /* Semi-Sync flag ACK_REQ in network packet (byte 7) */
 | |
| #define BLR_MASTER_SEMI_SYNC_ACK_REQ    0x01
 | |
| 
 | |
| /**
 | |
|  * blr_start_master - controls the connection of the binlog router to the
 | |
|  * master MySQL server and triggers the slave registration process for
 | |
|  * the router.
 | |
|  *
 | |
|  * @param   router      The router instance
 | |
|  */
 | |
| void
 | |
| blr_start_master(void* data)
 | |
| {
 | |
|     ROUTER_INSTANCE *router = (ROUTER_INSTANCE*)data;
 | |
|     DCB *client;
 | |
| 
 | |
|     router->stats.n_binlogs_ses = 0;
 | |
|     spinlock_acquire(&router->lock);
 | |
|     if (router->master_state != BLRM_UNCONNECTED)
 | |
|     {
 | |
|         if (router->master_state != BLRM_SLAVE_STOPPED)
 | |
|         {
 | |
|             MXS_ERROR("%s: Master Connect: Unexpected master state %s\n",
 | |
|                       router->service->name, blrm_states[router->master_state]);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             MXS_NOTICE("%s: Master Connect: binlog state is %s\n",
 | |
|                        router->service->name, blrm_states[router->master_state]);
 | |
|         }
 | |
|         spinlock_release(&router->lock);
 | |
|         return;
 | |
|     }
 | |
|     router->master_state = BLRM_CONNECTING;
 | |
| 
 | |
|     /* Discard the queued residual data */
 | |
|     while (router->residual)
 | |
|     {
 | |
|         router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
 | |
|     }
 | |
|     router->residual = NULL;
 | |
| 
 | |
|     spinlock_release(&router->lock);
 | |
|     if ((client = dcb_alloc(DCB_ROLE_INTERNAL, NULL)) == NULL)
 | |
|     {
 | |
|         MXS_ERROR("Binlog router: failed to create DCB for dummy client");
 | |
|         return;
 | |
|     }
 | |
|     router->client = client;
 | |
|     client->state = DCB_STATE_POLLING;  /* Fake the client is reading */
 | |
|     client->data = CreateMySQLAuthData(router->user, router->password, "");
 | |
|     if ((router->session = session_alloc(router->service, client)) == NULL)
 | |
|     {
 | |
|         MXS_ERROR("Binlog router: failed to create session for connection to master");
 | |
|         return;
 | |
|     }
 | |
|     client->session = router->session;
 | |
|     if ((router->master = dcb_connect(router->service->dbref->server, router->session, BLR_PROTOCOL)) == NULL)
 | |
|     {
 | |
|         char *name = MXS_MALLOC(strlen(router->service->name) + strlen(" Master") + 1);
 | |
| 
 | |
|         if (name)
 | |
|         {
 | |
|             sprintf(name, "%s Master", router->service->name);
 | |
|             hktask_oneshot(name, blr_start_master, router,
 | |
|                            BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
 | |
|             MXS_FREE(name);
 | |
|         }
 | |
|         if (router->retry_backoff > BLR_MAX_BACKOFF)
 | |
|         {
 | |
|             router->retry_backoff = BLR_MAX_BACKOFF;
 | |
|         }
 | |
|         MXS_ERROR("Binlog router: failed to connect to master server '%s'",
 | |
|                   router->service->dbref->server->unique_name);
 | |
|         return;
 | |
|     }
 | |
|     router->master->remote = MXS_STRDUP_A(router->service->dbref->server->name);
 | |
| 
 | |
|     MXS_NOTICE("%s: attempting to connect to master server %s:%d, binlog %s, pos %lu",
 | |
|                router->service->name, router->service->dbref->server->name,
 | |
|                router->service->dbref->server->port, router->binlog_name, router->current_pos);
 | |
| 
 | |
|     router->connect_time = time(0);
 | |
| 
 | |
|     if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive )))
 | |
|     {
 | |
|         perror("setsockopt");
 | |
|     }
 | |
| 
 | |
|     router->master_state = BLRM_AUTHENTICATED;
 | |
|     router->master->func.write(router->master, blr_make_query("SELECT UNIX_TIMESTAMP()"));
 | |
|     router->master_state = BLRM_TIMESTAMP;
 | |
| 
 | |
|     router->stats.n_masterstarts++;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Reconnect to the master server.
 | |
|  *
 | |
|  * IMPORTANT - must be called with router->active_logs set by the
 | |
|  * thread that set active_logs.
 | |
|  *
 | |
|  * @param   router      The router instance
 | |
|  */
 | |
| static void
 | |
| blr_restart_master(ROUTER_INSTANCE *router)
 | |
| {
 | |
|     dcb_close(router->client);
 | |
| 
 | |
|     /* Discard the queued residual data */
 | |
|     while (router->residual)
 | |
|     {
 | |
|         router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
 | |
|     }
 | |
|     router->residual = NULL;
 | |
| 
 | |
|     /* Now it is safe to unleash other threads on this router instance */
 | |
|     spinlock_acquire(&router->lock);
 | |
|     router->reconnect_pending = 0;
 | |
|     router->active_logs = 0;
 | |
|     spinlock_release(&router->lock);
 | |
|     if (router->master_state < BLRM_BINLOGDUMP)
 | |
|     {
 | |
|         router->master_state = BLRM_UNCONNECTED;
 | |
| 
 | |
|         char *name = (char*)MXS_MALLOC(strlen(router->service->name)  + strlen(" Master") + 1);
 | |
| 
 | |
|         if (name)
 | |
|         {
 | |
|             sprintf(name, "%s Master", router->service->name);
 | |
|             hktask_oneshot(name, blr_start_master, router,
 | |
|                            BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
 | |
|             MXS_FREE(name);
 | |
|         }
 | |
|         if (router->retry_backoff > BLR_MAX_BACKOFF)
 | |
|         {
 | |
|             router->retry_backoff = BLR_MAX_BACKOFF;
 | |
|         }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         router->master_state = BLRM_UNCONNECTED;
 | |
|         blr_start_master(router);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Request a reconnect to the master.
 | |
|  *
 | |
|  * If another thread is active processing messages from the master
 | |
|  * then merely set a flag for that thread to do the restart. If no
 | |
|  * threads are active then directly call the restart routine to
 | |
|  * reconnect to the master.
 | |
|  *
 | |
|  * @param   router      The router instance
 | |
|  */
 | |
| void
 | |
| blr_master_reconnect(ROUTER_INSTANCE *router)
 | |
| {
 | |
|     int do_reconnect = 0;
 | |
| 
 | |
|     if (router->master_state == BLRM_SLAVE_STOPPED)
 | |
|     {
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     spinlock_acquire(&router->lock);
 | |
|     if (router->active_logs)
 | |
|     {
 | |
|         /* Currently processing a response, set a flag
 | |
|          * and get the thread that is process a response
 | |
|          * to deal with the reconnect.
 | |
|          */
 | |
|         router->reconnect_pending = 1;
 | |
|         router->stats.n_delayedreconnects++;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         router->active_logs = 1;
 | |
|         do_reconnect = 1;
 | |
|     }
 | |
|     spinlock_release(&router->lock);
 | |
|     if (do_reconnect)
 | |
|     {
 | |
|         blr_restart_master(router);
 | |
|         spinlock_acquire(&router->lock);
 | |
|         router->active_logs = 0;
 | |
|         spinlock_release(&router->lock);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Shutdown a connection to the master
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  */
 | |
| void
 | |
| blr_master_close(ROUTER_INSTANCE *router)
 | |
| {
 | |
|     dcb_close(router->master);
 | |
|     router->master_state = BLRM_UNCONNECTED;
 | |
|     router->master_event_state = BLR_EVENT_DONE;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Mark this master connection for a delayed reconnect, used during
 | |
|  * error recovery to cause a reconnect after 60 seconds.
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  */
 | |
| void
 | |
| blr_master_delayed_connect(ROUTER_INSTANCE *router)
 | |
| {
 | |
|     char *name = (char*)MXS_MALLOC(strlen(router->service->name) + strlen(" Master Recovery") + 1);
 | |
| 
 | |
|     if (name)
 | |
|     {
 | |
|         sprintf(name, "%s Master Recovery", router->service->name);
 | |
|         hktask_oneshot(name, blr_start_master, router, 60);
 | |
|         MXS_FREE(name);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Binlog router master side state machine event handler.
 | |
|  *
 | |
|  * Handles an incoming response from the master server to the binlog
 | |
|  * router.
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  * @param buf       The incoming packet
 | |
|  */
 | |
| void
 | |
| blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
 | |
| {
 | |
|     char query[BLRM_MASTER_REGITRATION_QUERY_LEN + 1];
 | |
|     char task_name[BLRM_TASK_NAME_LEN + 1] = "";
 | |
| 
 | |
|     atomic_add(&router->handling_threads, 1);
 | |
|     ss_dassert(router->handling_threads == 1);
 | |
|     spinlock_acquire(&router->lock);
 | |
|     router->active_logs = 1;
 | |
|     spinlock_release(&router->lock);
 | |
|     if (router->master_state > BLRM_MAXSTATE)
 | |
|     {
 | |
|         MXS_ERROR("Invalid master state machine state (%d) for binlog router.",
 | |
|                   router->master_state);
 | |
|         gwbuf_free(buf);
 | |
| 
 | |
|         spinlock_acquire(&router->lock);
 | |
|         if (router->reconnect_pending)
 | |
|         {
 | |
|             router->active_logs = 0;
 | |
|             spinlock_release(&router->lock);
 | |
|             atomic_add(&router->handling_threads, -1);
 | |
|             MXS_ERROR("%s: Pending reconnect in state %s.",
 | |
|                       router->service->name,
 | |
|                       blrm_states[router->master_state]);
 | |
|             blr_restart_master(router);
 | |
|             return;
 | |
|         }
 | |
|         router->active_logs = 0;
 | |
|         spinlock_release(&router->lock);
 | |
|         atomic_add(&router->handling_threads, -1);
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     if (router->master_state == BLRM_GTIDMODE && MYSQL_RESPONSE_ERR(buf))
 | |
|     {
 | |
|         /*
 | |
|          * If we get an error response to the GTID Mode then we
 | |
|          * asusme the server does not support GTID modes and
 | |
|          * continue. The error is saved and replayed to slaves if
 | |
|          * they also request the GTID mode.
 | |
|          */
 | |
|         MXS_ERROR("%s: Master server does not support GTID Mode.",
 | |
|                   router->service->name);
 | |
|     }
 | |
|     else if (router->master_state != BLRM_BINLOGDUMP && MYSQL_RESPONSE_ERR(buf))
 | |
|     {
 | |
|         char *msg_err = NULL;
 | |
|         int msg_len = 0;
 | |
|         int len = gwbuf_length(buf);
 | |
|         unsigned long mysql_errno = extract_field(MYSQL_ERROR_CODE(buf), 16);
 | |
| 
 | |
|         msg_len = len - 7 - 6; // +7 is where msg starts, 6 is skipped the status message (#42000)
 | |
|         msg_err = (char *)MXS_MALLOC(msg_len + 1);
 | |
| 
 | |
|         if (msg_err)
 | |
|         {
 | |
|             // skip status message only as MYSQL_RESPONSE_ERR(buf) points to GWBUF_DATA(buf) +7
 | |
|             memcpy(msg_err, (char *)(MYSQL_ERROR_MSG(buf) + 6), msg_len);
 | |
| 
 | |
|             /* NULL terminated error string */
 | |
|             *(msg_err + msg_len) = '\0';
 | |
|         }
 | |
| 
 | |
|         MXS_ERROR("%s: Received error: %lu, '%s' from master during '%s' phase "
 | |
|                   "of the master state machine.",
 | |
|                   router->service->name,
 | |
|                   mysql_errno,
 | |
|                   msg_err ? msg_err : "(memory failure)",
 | |
|                   blrm_states[router->master_state]);
 | |
|         gwbuf_free(buf);
 | |
| 
 | |
|         spinlock_acquire(&router->lock);
 | |
| 
 | |
|         /* set mysql errno */
 | |
|         router->m_errno = mysql_errno;
 | |
| 
 | |
|         /* set mysql error message */
 | |
|         if (router->m_errmsg)
 | |
|         {
 | |
|             MXS_FREE(router->m_errmsg);
 | |
|         }
 | |
|         router->m_errmsg = msg_err ? msg_err : "(memory failure)";
 | |
| 
 | |
|         router->active_logs = 0;
 | |
|         if (router->reconnect_pending)
 | |
|         {
 | |
|             spinlock_release(&router->lock);
 | |
|             atomic_add(&router->handling_threads, -1);
 | |
|             blr_restart_master(router);
 | |
|             return;
 | |
|         }
 | |
|         spinlock_release(&router->lock);
 | |
|         atomic_add(&router->handling_threads, -1);
 | |
|         return;
 | |
|     }
 | |
|     switch (router->master_state)
 | |
|     {
 | |
|     case BLRM_TIMESTAMP:
 | |
|         // Response to a timestamp message, no need to save this.
 | |
|         gwbuf_free(buf);
 | |
|         buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'");
 | |
|         router->master_state = BLRM_SERVERID;
 | |
|         router->master->func.write(router->master, buf);
 | |
|         router->retry_backoff = 1;
 | |
|         break;
 | |
|     case BLRM_SERVERID:
 | |
|         {
 | |
|             char *val = blr_extract_column(buf, 2);
 | |
| 
 | |
|             // Response to fetch of master's server-id
 | |
|             if (router->saved_master.server_id)
 | |
|             {
 | |
|                 GWBUF_CONSUME_ALL(router->saved_master.server_id);
 | |
|             }
 | |
|             router->saved_master.server_id = buf;
 | |
|             blr_cache_response(router, "serverid", buf);
 | |
| 
 | |
|             // set router->masterid from master server-id if it's not set by the config option
 | |
|             if (router->masterid == 0)
 | |
|             {
 | |
|                 router->masterid = atoi(val);
 | |
|             }
 | |
| 
 | |
|             {
 | |
|                 char str[BLRM_SET_HEARTBEAT_QUERY_LEN];
 | |
|                 sprintf(str, "SET @master_heartbeat_period = %lu000000000", router->heartbeat);
 | |
|                 buf = blr_make_query(str);
 | |
|             }
 | |
|             router->master_state = BLRM_HBPERIOD;
 | |
|             router->master->func.write(router->master, buf);
 | |
|             MXS_FREE(val);
 | |
|             break;
 | |
|         }
 | |
|     case BLRM_HBPERIOD:
 | |
|         // Response to set the heartbeat period
 | |
|         if (router->saved_master.heartbeat)
 | |
|         {
 | |
|             GWBUF_CONSUME_ALL(router->saved_master.heartbeat);
 | |
|         }
 | |
|         router->saved_master.heartbeat = buf;
 | |
|         blr_cache_response(router, "heartbeat", buf);
 | |
|         buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum");
 | |
|         router->master_state = BLRM_CHKSUM1;
 | |
|         router->master->func.write(router->master, buf);
 | |
|         break;
 | |
|     case BLRM_CHKSUM1:
 | |
|         // Response to set the master binlog checksum
 | |
|         if (router->saved_master.chksum1)
 | |
|         {
 | |
|             GWBUF_CONSUME_ALL(router->saved_master.chksum1);
 | |
|         }
 | |
|         router->saved_master.chksum1 = buf;
 | |
|         blr_cache_response(router, "chksum1", buf);
 | |
|         buf = blr_make_query("SELECT @master_binlog_checksum");
 | |
|         router->master_state = BLRM_CHKSUM2;
 | |
|         router->master->func.write(router->master, buf);
 | |
|         break;
 | |
|     case BLRM_CHKSUM2:
 | |
|         {
 | |
|             char *val = blr_extract_column(buf, 1);
 | |
| 
 | |
|             if (val && strncasecmp(val, "NONE", 4) == 0)
 | |
|             {
 | |
|                 router->master_chksum = false;
 | |
|             }
 | |
|             if (val)
 | |
|             {
 | |
|                 MXS_FREE(val);
 | |
|             }
 | |
|             // Response to the master_binlog_checksum, should be stored
 | |
|             if (router->saved_master.chksum2)
 | |
|             {
 | |
|                 GWBUF_CONSUME_ALL(router->saved_master.chksum2);
 | |
|             }
 | |
|             router->saved_master.chksum2 = buf;
 | |
|             blr_cache_response(router, "chksum2", buf);
 | |
| 
 | |
|             if (router->mariadb10_compat)
 | |
|             {
 | |
|                 buf = blr_make_query("SET @mariadb_slave_capability=4");
 | |
|                 router->master_state = BLRM_MARIADB10;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE");
 | |
|                 router->master_state = BLRM_GTIDMODE;
 | |
|             }
 | |
|             router->master->func.write(router->master, buf);
 | |
|             break;
 | |
|         }
 | |
|     case BLRM_MARIADB10:
 | |
|         // Response to the SET @mariadb_slave_capability=4, should be stored
 | |
|         if (router->saved_master.mariadb10)
 | |
|         {
 | |
|             GWBUF_CONSUME_ALL(router->saved_master.mariadb10);
 | |
|         }
 | |
|         router->saved_master.mariadb10 = buf;
 | |
|         blr_cache_response(router, "mariadb10", buf);
 | |
|         buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
 | |
|         router->master_state = BLRM_MUUID;
 | |
|         router->master->func.write(router->master, buf);
 | |
|         break;
 | |
|     case BLRM_GTIDMODE:
 | |
|         // Response to the GTID_MODE, should be stored
 | |
|         if (router->saved_master.gtid_mode)
 | |
|         {
 | |
|             GWBUF_CONSUME_ALL(router->saved_master.gtid_mode);
 | |
|         }
 | |
|         router->saved_master.gtid_mode = buf;
 | |
|         blr_cache_response(router, "gtidmode", buf);
 | |
|         buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
 | |
|         router->master_state = BLRM_MUUID;
 | |
|         router->master->func.write(router->master, buf);
 | |
|         break;
 | |
|     case BLRM_MUUID:
 | |
|         {
 | |
|             char *key;
 | |
|             char *val = NULL;
 | |
| 
 | |
|             key = blr_extract_column(buf, 1);
 | |
|             if (key && strlen(key))
 | |
|             {
 | |
|                 val = blr_extract_column(buf, 2);
 | |
|             }
 | |
|             if (key)
 | |
|             {
 | |
|                 MXS_FREE(key);
 | |
|             }
 | |
| 
 | |
|             /* set the master_uuid from master if not set by the option */
 | |
|             if (router->set_master_uuid == NULL)
 | |
|             {
 | |
|                 MXS_FREE(router->master_uuid);
 | |
|                 router->master_uuid = val;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 router->master_uuid = router->set_master_uuid;
 | |
|             }
 | |
| 
 | |
|             // Response to the SERVER_UUID, should be stored
 | |
|             if (router->saved_master.uuid)
 | |
|             {
 | |
|                 GWBUF_CONSUME_ALL(router->saved_master.uuid);
 | |
|             }
 | |
|             router->saved_master.uuid = buf;
 | |
|             blr_cache_response(router, "uuid", buf);
 | |
|             sprintf(query, "SET @slave_uuid='%s'", router->uuid);
 | |
|             buf = blr_make_query(query);
 | |
|             router->master_state = BLRM_SUUID;
 | |
|             router->master->func.write(router->master, buf);
 | |
|             break;
 | |
|         }
 | |
|     case BLRM_SUUID:
 | |
|         // Response to the SET @server_uuid, should be stored
 | |
|         if (router->saved_master.setslaveuuid)
 | |
|         {
 | |
|             GWBUF_CONSUME_ALL(router->saved_master.setslaveuuid);
 | |
|         }
 | |
|         router->saved_master.setslaveuuid = buf;
 | |
|         blr_cache_response(router, "ssuuid", buf);
 | |
|         buf = blr_make_query("SET NAMES latin1");
 | |
|         router->master_state = BLRM_LATIN1;
 | |
|         router->master->func.write(router->master, buf);
 | |
|         break;
 | |
|     case BLRM_LATIN1:
 | |
|         // Response to the SET NAMES latin1, should be stored
 | |
|         if (router->saved_master.setnames)
 | |
|         {
 | |
|             GWBUF_CONSUME_ALL(router->saved_master.setnames);
 | |
|         }
 | |
|         router->saved_master.setnames = buf;
 | |
|         blr_cache_response(router, "setnames", buf);
 | |
|         buf = blr_make_query("SET NAMES utf8");
 | |
|         router->master_state = BLRM_UTF8;
 | |
|         router->master->func.write(router->master, buf);
 | |
|         break;
 | |
|     case BLRM_UTF8:
 | |
|         // Response to the SET NAMES utf8, should be stored
 | |
|         if (router->saved_master.utf8)
 | |
|         {
 | |
|             GWBUF_CONSUME_ALL(router->saved_master.utf8);
 | |
|         }
 | |
|         router->saved_master.utf8 = buf;
 | |
|         blr_cache_response(router, "utf8", buf);
 | |
|         buf = blr_make_query("SELECT 1");
 | |
|         router->master_state = BLRM_SELECT1;
 | |
|         router->master->func.write(router->master, buf);
 | |
|         break;
 | |
|     case BLRM_SELECT1:
 | |
|         // Response to the SELECT 1, should be stored
 | |
|         if (router->saved_master.select1)
 | |
|         {
 | |
|             GWBUF_CONSUME_ALL(router->saved_master.select1);
 | |
|         }
 | |
|         router->saved_master.select1 = buf;
 | |
|         blr_cache_response(router, "select1", buf);
 | |
|         buf = blr_make_query("SELECT VERSION()");
 | |
|         router->master_state = BLRM_SELECTVER;
 | |
|         router->master->func.write(router->master, buf);
 | |
|         break;
 | |
|     case BLRM_SELECTVER:
 | |
|         // Response to SELECT VERSION should be stored
 | |
|         if (router->saved_master.selectver)
 | |
|         {
 | |
|             GWBUF_CONSUME_ALL(router->saved_master.selectver);
 | |
|         }
 | |
|         router->saved_master.selectver = buf;
 | |
|         blr_cache_response(router, "selectver", buf);
 | |
|         buf = blr_make_query("SELECT @@version_comment limit 1");
 | |
|         router->master_state = BLRM_SELECTVERCOM;
 | |
|         router->master->func.write(router->master, buf);
 | |
|         break;
 | |
|     case BLRM_SELECTVERCOM:
 | |
|         // Response to SELECT @@version_comment should be stored
 | |
|         if (router->saved_master.selectvercom)
 | |
|         {
 | |
|             GWBUF_CONSUME_ALL(router->saved_master.selectvercom);
 | |
|         }
 | |
|         router->saved_master.selectvercom = buf;
 | |
|         blr_cache_response(router, "selectvercom", buf);
 | |
|         buf = blr_make_query("SELECT @@hostname");
 | |
|         router->master_state = BLRM_SELECTHOSTNAME;
 | |
|         router->master->func.write(router->master, buf);
 | |
|         break;
 | |
|     case BLRM_SELECTHOSTNAME:
 | |
|         // Response to SELECT @@hostname should be stored
 | |
|         if (router->saved_master.selecthostname)
 | |
|         {
 | |
|             GWBUF_CONSUME_ALL(router->saved_master.selecthostname);
 | |
|         }
 | |
|         router->saved_master.selecthostname = buf;
 | |
|         blr_cache_response(router, "selecthostname", buf);
 | |
|         buf = blr_make_query("SELECT @@max_allowed_packet");
 | |
|         router->master_state = BLRM_MAP;
 | |
|         router->master->func.write(router->master, buf);
 | |
|         break;
 | |
|     case BLRM_MAP:
 | |
|         // Response to SELECT @@max_allowed_packet should be stored
 | |
|         if (router->saved_master.map)
 | |
|         {
 | |
|             GWBUF_CONSUME_ALL(router->saved_master.map);
 | |
|         }
 | |
|         router->saved_master.map = buf;
 | |
|         blr_cache_response(router, "map", buf);
 | |
|         buf = blr_make_registration(router);
 | |
|         router->master_state = BLRM_REGISTER;
 | |
|         router->master->func.write(router->master, buf);
 | |
|         break;
 | |
|     case BLRM_REGISTER:
 | |
|         /* discard master reply to COM_REGISTER_SLAVE */
 | |
|         gwbuf_free(buf);
 | |
| 
 | |
|         /* if semisync option is set, check for master semi-sync availability */
 | |
|         if (router->request_semi_sync)
 | |
|         {
 | |
|             MXS_NOTICE("%s: checking Semi-Sync replication capability for master server %s:%d",
 | |
|                        router->service->name,
 | |
|                        router->service->dbref->server->name,
 | |
|                        router->service->dbref->server->port);
 | |
| 
 | |
|             buf = blr_make_query("SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled'");
 | |
|             router->master_state = BLRM_CHECK_SEMISYNC;
 | |
|             router->master->func.write(router->master, buf);
 | |
| 
 | |
|             break;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             /* Continue */
 | |
|             router->master_state = BLRM_REQUEST_BINLOGDUMP;
 | |
|         }
 | |
|     case BLRM_CHECK_SEMISYNC:
 | |
|     {
 | |
|         /**
 | |
|          * This branch could be reached as fallthrough from BLRM_REGISTER
 | |
|          * if request_semi_sync option is false
 | |
|          */
 | |
|          if (router->master_state == BLRM_CHECK_SEMISYNC)
 | |
|          {
 | |
|              /* Get master semi-sync installed, enabled, disabled */
 | |
|              router->master_semi_sync = blr_get_master_semisync(buf);
 | |
| 
 | |
|              /* Discard buffer */
 | |
|              gwbuf_free(buf);
 | |
| 
 | |
|              if (router->master_semi_sync == MASTER_SEMISYNC_NOT_AVAILABLE)
 | |
|              {
 | |
|                  /* not installed */
 | |
|                  MXS_NOTICE("%s: master server %s:%d doesn't have semi_sync capability",
 | |
|                             router->service->name,
 | |
|                             router->service->dbref->server->name,
 | |
|                             router->service->dbref->server->port);
 | |
| 
 | |
|                  /* Continue */
 | |
|                  router->master_state = BLRM_REQUEST_BINLOGDUMP;
 | |
| 
 | |
|              }
 | |
|              else
 | |
|              {
 | |
|                  if (router->master_semi_sync == MASTER_SEMISYNC_DISABLED)
 | |
|                  {
 | |
|                      /* Installed but not enabled,  right now */
 | |
|                      MXS_NOTICE("%s: master server %s:%d doesn't have semi_sync enabled right now, "
 | |
|                                 "Requesting Semi-Sync Replication",
 | |
|                                 router->service->name,
 | |
|                                 router->service->dbref->server->name,
 | |
|                                 router->service->dbref->server->port);
 | |
|                  }
 | |
|                  else
 | |
|                  {
 | |
|                      /* Installed and enabled */
 | |
|                      MXS_NOTICE("%s: master server %s:%d has semi_sync enabled, Requesting Semi-Sync Replication",
 | |
|                                 router->service->name,
 | |
|                                 router->service->dbref->server->name,
 | |
|                                 router->service->dbref->server->port);
 | |
|                  }
 | |
| 
 | |
|                  buf = blr_make_query("SET @rpl_semi_sync_slave = 1");
 | |
|                  router->master_state = BLRM_REQUEST_SEMISYNC;
 | |
|                  router->master->func.write(router->master, buf);
 | |
| 
 | |
|                  break;
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     case BLRM_REQUEST_SEMISYNC:
 | |
|         /**
 | |
|          * This branch could be reached as fallthrough from BLRM_REGISTER or BLRM_CHECK_SEMISYNC
 | |
|          * if request_semi_sync option is false or master doesn't support semisync or it's not enabled
 | |
|          */
 | |
|          if (router->master_state == BLRM_REQUEST_SEMISYNC)
 | |
|          {
 | |
|              /* discard master reply */
 | |
|              gwbuf_free(buf);
 | |
| 
 | |
|              /* Continue */
 | |
|              router->master_state = BLRM_REQUEST_BINLOGDUMP;
 | |
|          }
 | |
| 
 | |
|     case BLRM_REQUEST_BINLOGDUMP:
 | |
|         /**
 | |
|          * This branch is reached after semi-sync check/request or
 | |
|          * just after sending COM_REGISTER_SLAVE if request_semi_sync option is false
 | |
|          */
 | |
| 
 | |
|         /* Request now a dump of the binlog file */
 | |
|         buf = blr_make_binlog_dump(router);
 | |
| 
 | |
|         router->master_state = BLRM_BINLOGDUMP;
 | |
| 
 | |
|         router->master->func.write(router->master, buf);
 | |
|         MXS_NOTICE("%s: Request binlog records from %s at "
 | |
|                    "position %lu from master server %s:%d",
 | |
|                    router->service->name, router->binlog_name,
 | |
|                    router->current_pos,
 | |
|                    router->service->dbref->server->name,
 | |
|                    router->service->dbref->server->port);
 | |
| 
 | |
|         /* Log binlog router identity */
 | |
|         blr_log_identity(router);
 | |
| 
 | |
|         break;
 | |
| 
 | |
|     case BLRM_BINLOGDUMP:
 | |
|         /**
 | |
|          * Main body, we have received a binlog record from the master
 | |
|          */
 | |
|         blr_handle_binlog_record(router, buf);
 | |
| 
 | |
|         /**
 | |
|          * Set heartbeat check task
 | |
|          */
 | |
|         snprintf(task_name, BLRM_TASK_NAME_LEN, "%s heartbeat", router->service->name);
 | |
|         hktask_add(task_name, blr_check_last_master_event, router, router->heartbeat);
 | |
| 
 | |
|         break;
 | |
|     }
 | |
| 
 | |
|     if (router->reconnect_pending)
 | |
|     {
 | |
|         blr_restart_master(router);
 | |
|     }
 | |
|     spinlock_acquire(&router->lock);
 | |
|     router->active_logs = 0;
 | |
|     spinlock_release(&router->lock);
 | |
|     atomic_add(&router->handling_threads, -1);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Build a MySQL query into a GWBUF that we can send to the master database
 | |
|  *
 | |
|  * @param   query       The text of the query to send
 | |
|  */
 | |
| static GWBUF *
 | |
| blr_make_query(char *query)
 | |
| {
 | |
|     GWBUF *buf;
 | |
|     unsigned char *data;
 | |
|     int len;
 | |
| 
 | |
|     if ((buf = gwbuf_alloc(strlen(query) + 5)) == NULL)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
|     data = GWBUF_DATA(buf);
 | |
|     len = strlen(query) + 1;
 | |
|     encode_value(&data[0], len, 24);    // Payload length
 | |
|     data[3] = 0;                // Sequence id
 | |
|     // Payload
 | |
|     data[4] = COM_QUERY;            // Command
 | |
|     memcpy(&data[5], query, strlen(query));
 | |
| 
 | |
|     return buf;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Build a MySQL slave registration into a GWBUF that we can send to the
 | |
|  * master database
 | |
|  *
 | |
|  * @param   router  The router instance
 | |
|  * @return  A MySQL Replication registration message in a GWBUF structure
 | |
|  */
 | |
| static GWBUF *
 | |
| blr_make_registration(ROUTER_INSTANCE *router)
 | |
| {
 | |
|     GWBUF *buf;
 | |
|     unsigned char *data;
 | |
|     int len = 18;
 | |
|     int port = 3306;
 | |
| 
 | |
|     if ((buf = gwbuf_alloc(len + 4)) == NULL)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
|     data = GWBUF_DATA(buf);
 | |
|     encode_value(&data[0], len, 24); // Payload length
 | |
|     data[3] = 0;                     // Sequence ID
 | |
|     data[4] = COM_REGISTER_SLAVE;    // Command
 | |
|     encode_value(&data[5], router->serverid, 32);   // Slave Server ID
 | |
|     data[9] = 0;                     // Slave hostname length
 | |
|     data[10] = 0;                    // Slave username length
 | |
|     data[11] = 0;                    // Slave password length
 | |
|     if (router->service->ports)
 | |
|     {
 | |
|         port = router->service->ports->port;
 | |
|     }
 | |
|     encode_value(&data[12], port, 16);  // Slave master port
 | |
|     encode_value(&data[14], 0, 32);         // Replication rank
 | |
|     encode_value(&data[18], router->masterid, 32);  // Master server-id
 | |
| 
 | |
|     return buf;
 | |
| }
 | |
| 
 | |
| 
 | |
| /**
 | |
|  * Build a Binlog dump command into a GWBUF that we can send to the
 | |
|  * master database
 | |
|  *
 | |
|  * @param   router  The router instance
 | |
|  * @return  A MySQL Replication COM_BINLOG_DUMP message in a GWBUF structure
 | |
|  */
 | |
| static GWBUF *
 | |
| blr_make_binlog_dump(ROUTER_INSTANCE *router)
 | |
| {
 | |
|     GWBUF *buf;
 | |
|     unsigned char *data;
 | |
|     int binlog_file_len = strlen(router->binlog_name);
 | |
|     /* COM_BINLOG_DUMP needs 11 bytes + binlogname (terminating NULL is not required) */
 | |
|     int len = 11 + binlog_file_len;
 | |
| 
 | |
|     if ((buf = gwbuf_alloc(len + 4)) == NULL)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
|     data = GWBUF_DATA(buf);
 | |
| 
 | |
|     encode_value(&data[0], len, 24);       // Payload length
 | |
|     data[3] = 0;                           // Sequence ID
 | |
|     data[4] = COM_BINLOG_DUMP;             // Command
 | |
|     encode_value(&data[5],
 | |
|                  router->current_pos, 32); // binlog position
 | |
| 
 | |
|     /* With mariadb10 always ask for annotate rows events */
 | |
|     if (router->mariadb10_compat)
 | |
|     {
 | |
|         // set flag for annotate rows event
 | |
|         encode_value(&data[9], BLR_REQUEST_ANNOTATE_ROWS_EVENT, 16);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         encode_value(&data[9], 0, 16);      // No flag set
 | |
|     }
 | |
| 
 | |
|     encode_value(&data[11],
 | |
|                  router->serverid, 32);    // Server-id of MaxScale
 | |
|     memcpy((char *)&data[15], router->binlog_name,
 | |
|            binlog_file_len);               // binlog filename
 | |
|     return buf;
 | |
| }
 | |
| 
 | |
| 
 | |
| /**
 | |
|  * Encode a value into a number of bits in a MySQL packet
 | |
|  *
 | |
|  * @param   data    Point to location in target packet
 | |
|  * @param   value   The value to pack
 | |
|  * @param   len Number of bits to encode value into
 | |
|  */
 | |
| void
 | |
| encode_value(unsigned char *data, unsigned int value, int len)
 | |
| {
 | |
|     while (len > 0)
 | |
|     {
 | |
|         *data++ = value & 0xff;
 | |
|         value >>= 8;
 | |
|         len -= 8;
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * blr_handle_binlog_record - we have received binlog records from
 | |
|  * the master and we must now work out what to do with them.
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  * @param pkt       The binlog records
 | |
|  */
 | |
| void
 | |
| blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
 | |
| {
 | |
|     uint8_t *msg = NULL, *ptr, *pdata;
 | |
|     REP_HEADER hdr;
 | |
|     unsigned int len = 0, reslen;
 | |
|     unsigned int pkt_length;
 | |
|     int no_residual = 1;
 | |
|     int preslen = -1;
 | |
|     int prev_length = -1;
 | |
|     int n_bufs = -1, pn_bufs = -1;
 | |
|     int check_packet_len;
 | |
|     int semisync_bytes;
 | |
|     int semi_sync_send_ack = 0;
 | |
| 
 | |
|     /*
 | |
|      * Prepend any residual buffer to the buffer chain we have
 | |
|      * been called with.
 | |
|      */
 | |
|     if (router->residual)
 | |
|     {
 | |
|         pkt = gwbuf_append(router->residual, pkt);
 | |
|         router->residual = NULL;
 | |
|         no_residual = 0;
 | |
|     }
 | |
| 
 | |
|     pkt_length = gwbuf_length(pkt);
 | |
|     /*
 | |
|      * Loop over all the packets while we still have some data
 | |
|      * and the packet length is enough to hold a replication event
 | |
|      * header.
 | |
|      */
 | |
|     while (pkt && pkt_length > 24)
 | |
|     {
 | |
|         reslen = GWBUF_LENGTH(pkt);
 | |
|         pdata = GWBUF_DATA(pkt);
 | |
|         if (reslen < 3) // Payload length straddles buffers
 | |
|         {
 | |
|             /* Get the length of the packet from the residual and new packet */
 | |
|             if (reslen >= 3)
 | |
|             {
 | |
|                 len = EXTRACT24(pdata);
 | |
|             }
 | |
|             else if (reslen == 2)
 | |
|             {
 | |
|                 len = EXTRACT16(pdata);
 | |
|                 len |= (*(uint8_t *)GWBUF_DATA(pkt->next) << 16);
 | |
|             }
 | |
|             else if (reslen == 1)
 | |
|             {
 | |
|                 len = *pdata;
 | |
|                 len |= (EXTRACT16(GWBUF_DATA(pkt->next)) << 8);
 | |
|             }
 | |
|             len += 4;   // Allow space for the header
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             len = EXTRACT24(pdata) + 4;
 | |
|         }
 | |
|         /* len is now the payload length for the packet we are working on */
 | |
| 
 | |
|         if (reslen < len && pkt_length >= len)
 | |
|         {
 | |
|             /*
 | |
|              * The message is contained in more than the current
 | |
|              * buffer, however we have the complete messasge in
 | |
|              * this buffer and the chain of remaining buffers.
 | |
|              *
 | |
|              * Allocate a contiguous buffer for the binlog message
 | |
|              * and copy the complete message into this buffer.
 | |
|              */
 | |
|             int msg_remainder = len;
 | |
|             GWBUF *p = pkt;
 | |
| 
 | |
|             if ((msg = MXS_MALLOC(len)) == NULL)
 | |
|             {
 | |
|                 break;
 | |
|             }
 | |
| 
 | |
|             n_bufs = 0;
 | |
|             ptr = msg;
 | |
|             while (p && msg_remainder > 0)
 | |
|             {
 | |
|                 int plen = GWBUF_LENGTH(p);
 | |
|                 int n = (msg_remainder > plen ? plen : msg_remainder);
 | |
|                 memcpy(ptr, GWBUF_DATA(p), n);
 | |
|                 msg_remainder -= n;
 | |
|                 ptr += n;
 | |
|                 if (msg_remainder > 0)
 | |
|                 {
 | |
|                     p = p->next;
 | |
|                 }
 | |
|                 n_bufs++;
 | |
|             }
 | |
|             if (msg_remainder)
 | |
|             {
 | |
|                 MXS_ERROR("Expected entire message in buffer "
 | |
|                           "chain, but failed to create complete "
 | |
|                           "message as expected. %s @ %lu",
 | |
|                           router->binlog_name,
 | |
|                           router->current_pos);
 | |
|                 MXS_FREE(msg);
 | |
|                 /* msg = NULL; Not needed unless msg will be referred to again */
 | |
|                 break;
 | |
|             }
 | |
| 
 | |
|             ptr = msg;
 | |
|         }
 | |
|         else if (reslen < len)
 | |
|         {
 | |
|             /*
 | |
|              * The message is not fully contained in the current
 | |
|              * and we do not have the complete message in the
 | |
|              * buffer chain. Therefore we must stop processing
 | |
|              * until we receive the next buffer.
 | |
|              */
 | |
|             router->stats.n_residuals++;
 | |
|             MXS_DEBUG("Residual data left after %lu records. %s @ %lu",
 | |
|                       router->stats.n_binlogs,
 | |
|                       router->binlog_name, router->current_pos);
 | |
|             break;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             /*
 | |
|              * The message is fully contained in the current buffer
 | |
|              */
 | |
|             ptr = pdata;
 | |
|             n_bufs = 1;
 | |
|         }
 | |
| 
 | |
|         semisync_bytes = 0;
 | |
| 
 | |
|         /*
 | |
|          * ptr now points at the current message in a contiguous buffer,
 | |
|          * this buffer is either within the GWBUF or in a malloc'd
 | |
|          * copy if the message straddles GWBUF's.
 | |
|          */
 | |
| 
 | |
|         if (len < BINLOG_EVENT_HDR_LEN && router->master_event_state != BLR_EVENT_ONGOING)
 | |
|         {
 | |
|             char *event_msg = "";
 | |
| 
 | |
|             /* Packet is too small to be a binlog event */
 | |
|             if (ptr[4] == 0xfe) /* EOF Packet */
 | |
|             {
 | |
|                 event_msg = "end of file";
 | |
|             }
 | |
|             else if (ptr[4] == 0xff)    /* EOF Packet */
 | |
|             {
 | |
|                 event_msg = "error";
 | |
|             }
 | |
|             MXS_NOTICE("Non-event message (%s) from master.", event_msg);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             if (router->master_event_state == BLR_EVENT_DONE)
 | |
|             {
 | |
|                 spinlock_acquire(&router->lock);
 | |
|                 router->stats.n_binlogs++;
 | |
|                 router->stats.n_binlogs_ses++;
 | |
|                 spinlock_release(&router->lock);
 | |
| 
 | |
|                 /* Check for semi-sync in event with OK byte[4]:
 | |
|                  * move pointer 2 bytes ahead and set check_packet_len accordingly
 | |
|                  */
 | |
|                 if (ptr[4] == 0 && router->master_semi_sync != MASTER_SEMISYNC_NOT_AVAILABLE &&
 | |
|                     ptr[5] == BLR_MASTER_SEMI_SYNC_INDICATOR)
 | |
|                 {
 | |
| 
 | |
|                     check_packet_len = MASTER_BYTES_BEFORE_EVENT_SEMI_SYNC;
 | |
|                     semi_sync_send_ack = ptr[6];
 | |
| 
 | |
|                     /* Extract header from the semi-sync event */
 | |
|                     blr_extract_header_semisync(ptr, &hdr);
 | |
| 
 | |
|                     /** Remove the semi-sync bytes */
 | |
|                     memmove(ptr, ptr + 2, 5);
 | |
|                     ptr += 2;
 | |
|                     semisync_bytes = 2;
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     semi_sync_send_ack = 0;
 | |
|                     check_packet_len = MASTER_BYTES_BEFORE_EVENT;
 | |
| 
 | |
|                     /* Extract header from thr event */
 | |
|                     blr_extract_header(ptr, &hdr);
 | |
|                 }
 | |
| 
 | |
|                 /* Sanity check */
 | |
|                 if (hdr.ok == 0)
 | |
|                 {
 | |
|                     if (hdr.event_size != len - check_packet_len &&
 | |
|                         (hdr.event_size + (check_packet_len - MYSQL_HEADER_LEN)) < MYSQL_PACKET_LENGTH_MAX)
 | |
|                     {
 | |
|                         MXS_ERROR("Packet length is %d, but event size is %d, "
 | |
|                                   "binlog file %s position %lu "
 | |
|                                   "reslen is %d and preslen is %d, "
 | |
|                                   "length of previous event %d. %s",
 | |
|                                   len, hdr.event_size,
 | |
|                                   router->binlog_name,
 | |
|                                   router->current_pos,
 | |
|                                   reslen, preslen, prev_length,
 | |
|                                   (prev_length == -1 ?
 | |
|                                    (no_residual ? "No residual data from previous call" :
 | |
|                                     "Residual data from previous call") : ""));
 | |
| 
 | |
|                         blr_log_packet(LOG_ERR, "Packet:", ptr, len);
 | |
| 
 | |
|                         MXS_ERROR("This event (0x%x) was contained in %d GWBUFs, "
 | |
|                                   "the previous events was contained in %d GWBUFs",
 | |
|                                   router->lastEventReceived, n_bufs, pn_bufs);
 | |
| 
 | |
|                         if (msg)
 | |
|                         {
 | |
|                             MXS_FREE(msg);
 | |
|                             /* msg = NULL; Not needed unless msg will be referred to again */
 | |
|                         }
 | |
| 
 | |
|                         break;
 | |
|                     }
 | |
|                     else if (hdr.event_size + (check_packet_len - MYSQL_HEADER_LEN) >= MYSQL_PACKET_LENGTH_MAX)
 | |
|                     {
 | |
|                         router->master_event_state = BLR_EVENT_STARTED;
 | |
| 
 | |
|                         /** Store the header for later use */
 | |
|                         memcpy(&router->stored_header, &hdr, sizeof(hdr));
 | |
|                     }
 | |
| 
 | |
|                     /** Prepare the checksum variables for this event */
 | |
|                     router->stored_checksum = crc32(0L, NULL, 0);
 | |
|                     router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN;
 | |
|                     router->partial_checksum_bytes = 0;
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     blr_terminate_master_replication(router, ptr, len);
 | |
|                 }
 | |
| 
 | |
|                 if (hdr.ok == 0)
 | |
|                 {
 | |
|                     spinlock_acquire(&router->lock);
 | |
| 
 | |
|                     /* set mysql errno to 0 */
 | |
|                     router->m_errno = 0;
 | |
| 
 | |
|                     /* Remove error message */
 | |
|                     if (router->m_errmsg)
 | |
|                     {
 | |
|                         MXS_FREE(router->m_errmsg);
 | |
|                     }
 | |
|                     router->m_errmsg = NULL;
 | |
| 
 | |
|                     spinlock_release(&router->lock);
 | |
| #ifdef SHOW_EVENTS
 | |
|                     printf("blr: len %lu, event type 0x%02x, flags 0x%04x, "
 | |
|                            "event size %d, event timestamp %lu\n",
 | |
|                            (unsigned long)len - 4,
 | |
|                            hdr.event_type,
 | |
|                            hdr.flags,
 | |
|                            hdr.event_size,
 | |
|                            (unsigned long)hdr.timestamp);
 | |
| #endif
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             /* pending large event */
 | |
|             if (router->master_event_state != BLR_EVENT_DONE)
 | |
|             {
 | |
|                 if (len - MYSQL_HEADER_LEN < MYSQL_PACKET_LENGTH_MAX)
 | |
|                 {
 | |
|                     /** This is the last packet, we can now proceed to distribute
 | |
|                      * the event afer it has been written to disk */
 | |
|                     ss_dassert(router->master_event_state != BLR_EVENT_COMPLETE);
 | |
|                     router->master_event_state = BLR_EVENT_COMPLETE;
 | |
|                     memcpy(&hdr, &router->stored_header, sizeof(hdr));
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     /* current partial event is being written to disk file */
 | |
|                     uint32_t offset = MYSQL_HEADER_LEN;
 | |
|                     uint32_t extra_bytes = MYSQL_HEADER_LEN;
 | |
| 
 | |
|                     /** Don't write the OK byte into the binlog */
 | |
|                     if (router->master_event_state == BLR_EVENT_STARTED)
 | |
|                     {
 | |
|                         offset = MYSQL_HEADER_LEN + 1;
 | |
|                         router->master_event_state = BLR_EVENT_ONGOING;
 | |
|                         extra_bytes = MYSQL_HEADER_LEN + 1;
 | |
|                     }
 | |
| 
 | |
|                     ss_dassert(len - extra_bytes - semisync_bytes > 0);
 | |
|                     uint32_t bytes_available = len - extra_bytes - semisync_bytes;
 | |
| 
 | |
|                     if (router->master_chksum)
 | |
|                     {
 | |
|                         uint32_t size = MIN(len - extra_bytes - semisync_bytes,
 | |
|                                             router->checksum_size);
 | |
| 
 | |
|                         router->stored_checksum = crc32(router->stored_checksum,
 | |
|                                                         ptr + offset,
 | |
|                                                         size);
 | |
|                         router->checksum_size -= size;
 | |
| 
 | |
|                         if (router->checksum_size == 0 && size < bytes_available)
 | |
|                         {
 | |
|                             extract_checksum(router, ptr + offset + size,
 | |
|                                              bytes_available - size);
 | |
|                         }
 | |
|                     }
 | |
| 
 | |
|                     if (blr_write_data_into_binlog(router, bytes_available,
 | |
|                                                    ptr + offset) == 0)
 | |
|                     {
 | |
|                         /** Failed to write to the binlog file, destroy the buffer
 | |
|                          * chain and close the connection with the master */
 | |
|                         while (pkt)
 | |
|                         {
 | |
|                             pkt = GWBUF_CONSUME_ALL(pkt);
 | |
|                         }
 | |
|                         blr_master_close(router);
 | |
|                         blr_master_delayed_connect(router);
 | |
|                         return;
 | |
|                     }
 | |
|                     pkt = gwbuf_consume(pkt, len);
 | |
|                     pkt_length -= len;
 | |
|                     continue;
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             /*
 | |
|              * First check that the checksum we calculate matches the
 | |
|              * checksum in the packet we received.
 | |
|              */
 | |
|             if (router->master_chksum)
 | |
|             {
 | |
|                 uint32_t offset = MYSQL_HEADER_LEN;
 | |
|                 uint32_t size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN;
 | |
| 
 | |
|                 if (router->master_event_state == BLR_EVENT_DONE)
 | |
|                 {
 | |
|                     /** Set the pointer offset to the first byte after
 | |
|                      * the header and OK byte */
 | |
|                     offset = MYSQL_HEADER_LEN + 1;
 | |
|                     size = len - (check_packet_len + MYSQL_CHECKSUM_LEN);
 | |
|                 }
 | |
| 
 | |
|                 size = MIN(size, router->checksum_size);
 | |
| 
 | |
|                 if (router->checksum_size > 0)
 | |
|                 {
 | |
|                     router->stored_checksum = crc32(router->stored_checksum,
 | |
|                                                     ptr + offset, size);
 | |
|                     router->checksum_size -= size;
 | |
|                 }
 | |
| 
 | |
|                 if(router->checksum_size == 0 && size < (len - offset - semisync_bytes))
 | |
|                 {
 | |
|                     extract_checksum(router, ptr + offset + size,
 | |
|                                      len - offset - size - semisync_bytes);
 | |
|                 }
 | |
| 
 | |
|                 if (router->partial_checksum_bytes == MYSQL_CHECKSUM_LEN)
 | |
|                 {
 | |
|                     uint32_t pktsum = EXTRACT32(router->partial_checksum);
 | |
|                     if (pktsum != router->stored_checksum)
 | |
|                     {
 | |
|                         router->stats.n_badcrc++;
 | |
|                         MXS_FREE(msg);
 | |
|                         /* msg = NULL; Not needed unless msg will be referred to again */
 | |
|                         MXS_ERROR("%s: Checksum error in event from master, "
 | |
|                                   "binlog %s @ %lu. Closing master connection.",
 | |
|                                   router->service->name, router->binlog_name,
 | |
|                                   router->current_pos);
 | |
|                         blr_master_close(router);
 | |
|                         blr_master_delayed_connect(router);
 | |
|                         return;
 | |
|                     }
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     pkt = gwbuf_consume(pkt, len);
 | |
|                     pkt_length -= len;
 | |
|                     continue;
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             if (hdr.ok == 0)
 | |
|             {
 | |
|                 router->lastEventReceived = hdr.event_type;
 | |
|                 router->lastEventTimestamp = hdr.timestamp;
 | |
| 
 | |
|                 /**
 | |
|                  * Check for an open transaction, if the option is set
 | |
|                  * Only complete transactions should be sent to sleves
 | |
|                  *
 | |
|                  * If a trasaction is pending router->binlog_position
 | |
|                  * won't be updated to router->current_pos
 | |
|                  */
 | |
| 
 | |
|                 spinlock_acquire(&router->binlog_lock);
 | |
|                 if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == BLRM_NO_TRANSACTION))
 | |
|                 {
 | |
|                     /* no pending transaction: set current_pos to binlog_position */
 | |
|                     router->binlog_position = router->current_pos;
 | |
|                     router->current_safe_event = router->current_pos;
 | |
|                 }
 | |
|                 spinlock_release(&router->binlog_lock);
 | |
| 
 | |
|                 /**
 | |
|                  * Detect transactions in events
 | |
|                  * Only complete transactions should be sent to sleves
 | |
|                 */
 | |
| 
 | |
|                 /* If MariaDB 10 compatibility:
 | |
|                 * check for MARIADB10_GTID_EVENT with flags = 0
 | |
|                   * This marks the transaction starts instead of
 | |
|                   * QUERY_EVENT with "BEGIN"
 | |
|                  */
 | |
|                 if (router->trx_safe && router->master_event_state == BLR_EVENT_DONE)
 | |
|                 {
 | |
|                     if (router->mariadb10_compat)
 | |
|                     {
 | |
|                         if (hdr.event_type == MARIADB10_GTID_EVENT)
 | |
|                         {
 | |
|                             uint64_t n_sequence;
 | |
|                             uint32_t domainid;
 | |
|                             unsigned int flags;
 | |
|                             n_sequence = extract_field(ptr + 4 + 20, 64);
 | |
|                             domainid = extract_field(ptr + 4 + 20 + 8, 32);
 | |
|                             flags = *(ptr + 4 + 20 + 8 + 4);
 | |
| 
 | |
|                             if ((flags & (MARIADB_FL_DDL | MARIADB_FL_STANDALONE)) == 0)
 | |
|                             {
 | |
|                                 spinlock_acquire(&router->binlog_lock);
 | |
| 
 | |
|                                 if (router->pending_transaction > 0)
 | |
|                                 {
 | |
|                                     MXS_ERROR("A MariaDB 10 transaction "
 | |
|                                               "is already open "
 | |
|                                               "@ %lu (GTID %u-%u-%lu) and "
 | |
|                                               "a new one starts @ %lu",
 | |
|                                               router->binlog_position,
 | |
|                                               domainid, hdr.serverid,
 | |
|                                               n_sequence,
 | |
|                                               router->current_pos);
 | |
| 
 | |
|                                     // An action should be taken here
 | |
|                                 }
 | |
| 
 | |
|                                 router->pending_transaction = BLRM_TRANSACTION_START;
 | |
| 
 | |
|                                 spinlock_release(&router->binlog_lock);
 | |
|                             }
 | |
|                         }
 | |
|                     }
 | |
| 
 | |
|                     /**
 | |
|                      * look for QUERY_EVENT [BEGIN / COMMIT] and XID_EVENT
 | |
|                      */
 | |
| 
 | |
|                     if (hdr.event_type == QUERY_EVENT)
 | |
|                     {
 | |
|                         char *statement_sql;
 | |
|                         int db_name_len, var_block_len, statement_len;
 | |
|                         db_name_len = ptr[4 + 20 + 4 + 4];
 | |
|                         var_block_len = ptr[4 + 20 + 4 + 4 + 1 + 2];
 | |
| 
 | |
|                         statement_len = len - (4 + 20 + 4 + 4 + 1 + 2 + 2 + var_block_len + 1 + db_name_len) - semisync_bytes;
 | |
|                         statement_sql = MXS_CALLOC(1, statement_len + 1);
 | |
|                         MXS_ABORT_IF_NULL(statement_sql);
 | |
|                         memcpy(statement_sql,
 | |
|                                (char *)ptr + 4 + 20 + 4 + 4 + 1 + 2 + 2 + var_block_len + 1 + db_name_len,
 | |
|                                statement_len);
 | |
| 
 | |
|                         spinlock_acquire(&router->binlog_lock);
 | |
| 
 | |
|                         /* Check for BEGIN (it comes for START TRANSACTION too) */
 | |
|                         if (strncmp(statement_sql, "BEGIN", 5) == 0)
 | |
|                         {
 | |
|                             if (router->pending_transaction > BLRM_NO_TRANSACTION)
 | |
|                             {
 | |
|                                 MXS_ERROR("A transaction is already open "
 | |
|                                           "@ %lu and a new one starts @ %lu",
 | |
|                                           router->binlog_position,
 | |
|                                           router->current_pos);
 | |
| 
 | |
|                                 // An action should be taken here
 | |
|                             }
 | |
| 
 | |
|                             router->pending_transaction = BLRM_TRANSACTION_START;
 | |
|                         }
 | |
| 
 | |
|                         /* Check for COMMIT in non transactional store engines */
 | |
|                         if (strncmp(statement_sql, "COMMIT", 6) == 0)
 | |
|                         {
 | |
|                             router->pending_transaction = BLRM_COMMIT_SEEN;
 | |
|                         }
 | |
| 
 | |
|                         spinlock_release(&router->binlog_lock);
 | |
| 
 | |
|                         MXS_FREE(statement_sql);
 | |
|                     }
 | |
| 
 | |
|                     /* Check for COMMIT in Transactional engines, i.e InnoDB */
 | |
|                     if (hdr.event_type == XID_EVENT)
 | |
|                     {
 | |
|                         spinlock_acquire(&router->binlog_lock);
 | |
| 
 | |
|                         if (router->pending_transaction)
 | |
|                         {
 | |
|                             router->pending_transaction = BLRM_XID_EVENT_SEEN;
 | |
|                         }
 | |
|                         spinlock_release(&router->binlog_lock);
 | |
|                     }
 | |
|                 }
 | |
| 
 | |
|                 /** Gather statistics about the replication event types */
 | |
|                 int event_limit = router->mariadb10_compat ?
 | |
|                                   MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
 | |
| 
 | |
|                 if (hdr.event_type <= event_limit)
 | |
|                 {
 | |
|                     router->stats.events[hdr.event_type]++;
 | |
|                 }
 | |
| 
 | |
|                 if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
 | |
|                 {
 | |
|                     // Fake format description message
 | |
|                     MXS_DEBUG("Replication fake event. "
 | |
|                               "Binlog %s @ %lu.",
 | |
|                               router->binlog_name,
 | |
|                               router->current_pos);
 | |
|                     router->stats.n_fakeevents++;
 | |
| 
 | |
|                     if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
 | |
|                     {
 | |
|                         uint8_t *new_fde;
 | |
|                         unsigned int new_fde_len;
 | |
|                         /*
 | |
|                          * We need to save this to replay to new
 | |
|                          * slaves that attach later.
 | |
|                          */
 | |
|                         new_fde_len = hdr.event_size;
 | |
|                         new_fde = MXS_MALLOC(hdr.event_size);
 | |
| 
 | |
|                         if (new_fde)
 | |
|                         {
 | |
|                             memcpy(new_fde, ptr + 5, hdr.event_size);
 | |
|                             if (router->saved_master.fde_event)
 | |
|                             {
 | |
|                                 MXS_FREE(router->saved_master.fde_event);
 | |
|                             }
 | |
|                             router->saved_master.fde_event = new_fde;
 | |
|                             router->saved_master.fde_len = new_fde_len;
 | |
|                         }
 | |
|                     }
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     if (hdr.event_type == HEARTBEAT_EVENT)
 | |
|                     {
 | |
| #ifdef SHOW_EVENTS
 | |
|                         printf("Replication heartbeat\n");
 | |
| #endif
 | |
|                         MXS_DEBUG("Replication heartbeat. "
 | |
|                                   "Binlog %s @ %lu.",
 | |
|                                   router->binlog_name,
 | |
|                                   router->current_pos);
 | |
| 
 | |
|                         router->stats.n_heartbeats++;
 | |
| 
 | |
|                         if (router->pending_transaction)
 | |
|                         {
 | |
|                             router->stats.lastReply = time(0);
 | |
|                         }
 | |
|                     }
 | |
|                     else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
 | |
|                     {
 | |
|                         ptr = ptr + 4; // Skip header
 | |
|                         uint32_t offset = 4;
 | |
| 
 | |
|                         if (router->master_event_state == BLR_EVENT_STARTED ||
 | |
|                             router->master_event_state == BLR_EVENT_DONE)
 | |
|                         {
 | |
|                             ptr++;
 | |
|                             offset++;
 | |
|                         }
 | |
| 
 | |
|                         if (hdr.event_type == ROTATE_EVENT)
 | |
|                         {
 | |
|                             spinlock_acquire(&router->binlog_lock);
 | |
|                             router->rotating = 1;
 | |
|                             spinlock_release(&router->binlog_lock);
 | |
|                         }
 | |
| 
 | |
|                         /* Current event is being written to disk file.
 | |
|                          * It is possible for an empty packet to be sent if an
 | |
|                          * event is exactly 2^24 bytes long. In this case the
 | |
|                          * empty packet should be discarded. */
 | |
|                         if (len > MYSQL_HEADER_LEN &&
 | |
|                             blr_write_binlog_record(router, &hdr, len - offset - semisync_bytes, ptr) == 0)
 | |
|                         {
 | |
|                             /*
 | |
|                              * Failed to write to the
 | |
|                              * binlog file, destroy the
 | |
|                              * buffer chain and close the
 | |
|                              * connection with the master
 | |
|                              */
 | |
|                             while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
 | |
|                             {
 | |
|                                 ;
 | |
|                             }
 | |
|                             blr_master_close(router);
 | |
|                             blr_master_delayed_connect(router);
 | |
|                             return;
 | |
|                         }
 | |
| 
 | |
|                         /* Check for rotete event */
 | |
|                         if (hdr.event_type == ROTATE_EVENT)
 | |
|                         {
 | |
|                             if (!blr_rotate_event(router, ptr, &hdr))
 | |
|                             {
 | |
|                                 /*
 | |
|                                  * Failed to write to the
 | |
|                                  * binlog file, destroy the
 | |
|                                  * buffer chain and close the
 | |
|                                  * connection with the master
 | |
|                                  */
 | |
|                                 while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
 | |
|                                 {
 | |
|                                     ;
 | |
|                                 }
 | |
|                                 blr_master_close(router);
 | |
|                                 blr_master_delayed_connect(router);
 | |
|                                 return;
 | |
|                             }
 | |
|                         }
 | |
| 
 | |
|                         /* Handle semi-sync request fom master */
 | |
|                         if (router->master_semi_sync != MASTER_SEMISYNC_NOT_AVAILABLE &&
 | |
|                             semi_sync_send_ack == BLR_MASTER_SEMI_SYNC_ACK_REQ &&
 | |
|                             (router->master_event_state == BLR_EVENT_COMPLETE ||
 | |
|                             router->master_event_state == BLR_EVENT_DONE))
 | |
|                         {
 | |
| 
 | |
|                             MXS_DEBUG("%s: binlog record in file %s, pos %lu has "
 | |
|                                       "SEMI_SYNC_ACK_REQ and needs a Semi-Sync ACK packet to "
 | |
|                                       "be sent to the master server %s:%d",
 | |
|                                       router->service->name, router->binlog_name,
 | |
|                                       router->current_pos,
 | |
|                                       router->service->dbref->server->name,
 | |
|                                       router->service->dbref->server->port);
 | |
| 
 | |
|                             /* Send Semi-Sync ACK packet to master server */
 | |
|                             blr_send_semisync_ack(router, hdr.next_pos);
 | |
| 
 | |
|                             /* Reset ACK sending */
 | |
|                             semi_sync_send_ack = 0;
 | |
|                         }
 | |
| 
 | |
|                         /**
 | |
|                          * Distributing binlog events to slaves
 | |
|                          * may depend on pending transaction
 | |
|                          */
 | |
| 
 | |
|                         spinlock_acquire(&router->binlog_lock);
 | |
| 
 | |
|                         if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == BLRM_NO_TRANSACTION))
 | |
|                         {
 | |
|                             router->binlog_position = router->current_pos;
 | |
|                             router->current_safe_event = router->last_event_pos;
 | |
| 
 | |
|                             spinlock_release(&router->binlog_lock);
 | |
| 
 | |
|                             if (router->master_event_state == BLR_EVENT_COMPLETE)
 | |
|                             {
 | |
|                                 /** Read the complete event from the disk */
 | |
|                                 GWBUF *record = blr_read_events_from_pos(router,
 | |
|                                                                          router->last_event_pos,
 | |
|                                                                          &hdr, hdr.next_pos);
 | |
|                                 if (record)
 | |
|                                 {
 | |
|                                     uint8_t *data = GWBUF_DATA(record);
 | |
|                                     blr_distribute_binlog_record(router, &hdr, data,
 | |
|                                                                  BLR_THREAD_ROLE_MASTER_LARGE_NOTRX);
 | |
|                                     gwbuf_free(record);
 | |
|                                 }
 | |
|                                 else
 | |
|                                 {
 | |
|                                     MXS_ERROR("Failed to read event at position"
 | |
|                                               "%lu with a size of %u bytes.",
 | |
|                                               router->last_event_pos, hdr.event_size);
 | |
|                                 }
 | |
|                             }
 | |
|                             else
 | |
|                             {
 | |
|                                 /* Now distribute events */
 | |
|                                 blr_distribute_binlog_record(router, &hdr, ptr,
 | |
|                                                              BLR_THREAD_ROLE_MASTER_NOTRX);
 | |
|                             }
 | |
|                         }
 | |
|                         else
 | |
|                         {
 | |
|                             /**
 | |
|                              * If transaction is closed:
 | |
|                              *
 | |
|                              * 1) read current binlog starting
 | |
|                              *  from router->binlog_position
 | |
|                              *
 | |
|                              * 2) distribute read event
 | |
|                              *
 | |
|                              * 3) set router->binlog_position to
 | |
|                              *    router->current_pos
 | |
|                              *
 | |
|                              */
 | |
| 
 | |
|                             if (router->pending_transaction > BLRM_TRANSACTION_START)
 | |
|                             {
 | |
|                                 unsigned long long pos;
 | |
|                                 unsigned long long end_pos;
 | |
|                                 GWBUF *record;
 | |
|                                 uint8_t *raw_data;
 | |
|                                 REP_HEADER new_hdr;
 | |
| 
 | |
|                                 pos = router->binlog_position;
 | |
|                                 end_pos = router->current_pos;
 | |
| 
 | |
|                                 spinlock_release(&router->binlog_lock);
 | |
| 
 | |
|                                 while ((record = blr_read_events_from_pos(router,
 | |
|                                                                           pos,
 | |
|                                                                           &new_hdr,
 | |
|                                                                           end_pos)) != NULL)
 | |
|                                 {
 | |
|                                     raw_data = GWBUF_DATA(record);
 | |
| 
 | |
|                                     /* distribute event */
 | |
|                                     blr_distribute_binlog_record(router, &new_hdr, raw_data,
 | |
|                                                                  BLR_THREAD_ROLE_MASTER_TRX);
 | |
| 
 | |
|                                     spinlock_acquire(&router->binlog_lock);
 | |
| 
 | |
|                                     /** The current safe position is only updated
 | |
|                                     * if it points to the event we just distributed. */
 | |
|                                     if (router->current_safe_event == pos)
 | |
|                                     {
 | |
|                                         router->current_safe_event = new_hdr.next_pos;
 | |
|                                     }
 | |
|                                     else
 | |
|                                     {
 | |
|                                         MXS_ERROR("Current safe event (%lu) does"
 | |
|                                                   " not point at the event we "
 | |
|                                                   "just sent (%llu) from binlog file %s. "
 | |
|                                                   "Last commit at %lu, last write at %lu.",
 | |
|                                                   router->current_safe_event, pos,
 | |
|                                                   router->binlog_name, router->last_safe_pos,
 | |
|                                                   router->last_written);
 | |
|                                     }
 | |
| 
 | |
|                                     pos = new_hdr.next_pos;
 | |
| 
 | |
|                                     spinlock_release(&router->binlog_lock);
 | |
| 
 | |
|                                     gwbuf_free(record);
 | |
|                                 }
 | |
| 
 | |
|                                 /* Check whether binlog records has been read in previous loop */
 | |
|                                 if (pos < router->current_pos)
 | |
|                                 {
 | |
|                                     char err_message[BINLOG_ERROR_MSG_LEN + 1];
 | |
| 
 | |
|                                     err_message[BINLOG_ERROR_MSG_LEN] = '\0';
 | |
| 
 | |
|                                     /* No event has been sent */
 | |
|                                     if (pos == router->binlog_position)
 | |
|                                     {
 | |
|                                         MXS_ERROR("No events distributed to slaves for a pending "
 | |
|                                                   "transaction in %s at %lu. "
 | |
|                                                   "Last event from master at %lu",
 | |
|                                                   router->binlog_name,
 | |
|                                                   router->binlog_position,
 | |
|                                                   router->current_pos);
 | |
| 
 | |
|                                         static const char MESSAGE[] = "No transaction events sent";
 | |
|                                         ss_dassert(sizeof(MESSAGE) <= BINLOG_ERROR_MSG_LEN);
 | |
|                                         strcpy(err_message, MESSAGE);
 | |
|                                     }
 | |
|                                     else
 | |
|                                     {
 | |
|                                         /* Some events have been sent */
 | |
|                                         MXS_ERROR("Some events were not distributed to slaves for a "
 | |
|                                                   "pending transaction in %s at %lu. Last distributed "
 | |
|                                                   "even at %llu, last event from master at %lu",
 | |
|                                                   router->binlog_name,
 | |
|                                                   router->binlog_position,
 | |
|                                                   pos,
 | |
|                                                   router->current_pos);
 | |
| 
 | |
|                                         static const char MESSAGE[] = "Incomplete transaction events sent";
 | |
|                                         ss_dassert(sizeof(MESSAGE) <= BINLOG_ERROR_MSG_LEN);
 | |
|                                         strcpy(err_message, MESSAGE);
 | |
|                                     }
 | |
| 
 | |
|                                     /* distribute error message to registered slaves */
 | |
|                                     blr_distribute_error_message(router, err_message, "HY000", 1236);
 | |
|                                 }
 | |
| 
 | |
|                                 /* update binlog_position and set pending to 0 */
 | |
|                                 spinlock_acquire(&router->binlog_lock);
 | |
| 
 | |
|                                 router->binlog_position = router->current_pos;
 | |
|                                 router->pending_transaction = BLRM_NO_TRANSACTION;
 | |
| 
 | |
|                                 spinlock_release(&router->binlog_lock);
 | |
|                             }
 | |
|                             else
 | |
|                             {
 | |
|                                 spinlock_release(&router->binlog_lock);
 | |
|                             }
 | |
|                         }
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         router->stats.n_artificial++;
 | |
|                         MXS_DEBUG("Artificial event not written "
 | |
|                                   "to disk or distributed. "
 | |
|                                   "Type 0x%x, Length %d, Binlog "
 | |
|                                   "%s @ %lu.",
 | |
|                                   hdr.event_type,
 | |
|                                   hdr.event_size,
 | |
|                                   router->binlog_name,
 | |
|                                   router->current_pos);
 | |
|                         ptr += 5;
 | |
|                         if (hdr.event_type == ROTATE_EVENT)
 | |
|                         {
 | |
|                             spinlock_acquire(&router->binlog_lock);
 | |
|                             router->rotating = 1;
 | |
|                             spinlock_release(&router->binlog_lock);
 | |
|                             if (!blr_rotate_event(router, ptr, &hdr))
 | |
|                             {
 | |
|                                 /*
 | |
|                                  * Failed to write to the
 | |
|                                  * binlog file, destroy the
 | |
|                                  * buffer chain and close the
 | |
|                                  * connection with the master
 | |
|                                  */
 | |
|                                 while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
 | |
|                                 {
 | |
|                                     ;
 | |
|                                 }
 | |
|                                 blr_master_close(router);
 | |
|                                 blr_master_delayed_connect(router);
 | |
|                                 return;
 | |
|                             }
 | |
|                         }
 | |
|                     }
 | |
|                 }
 | |
| 
 | |
|                 /** A large event is now fully received and processed */
 | |
|                 if (router->master_event_state == BLR_EVENT_COMPLETE)
 | |
|                 {
 | |
|                     router->master_event_state = BLR_EVENT_DONE;
 | |
|                 }
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 blr_terminate_master_replication(router, ptr, len);
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         if (msg)
 | |
|         {
 | |
|             MXS_FREE(msg);
 | |
|             msg = NULL;
 | |
|         }
 | |
|         prev_length = len;
 | |
|         while (len > 0)
 | |
|         {
 | |
|             unsigned int n, plen;
 | |
|             plen = GWBUF_LENGTH(pkt);
 | |
|             n = (plen < len ? plen : len);
 | |
|             pkt = gwbuf_consume(pkt, n);
 | |
|             len -= n;
 | |
|             pkt_length -= n;
 | |
|         }
 | |
|         preslen = reslen;
 | |
|         pn_bufs = n_bufs;
 | |
|     }
 | |
| 
 | |
|     /*
 | |
|      * Check if we have a residual, part binlog message to deal with.
 | |
|      * Just simply store the GWBUF for next time
 | |
|      */
 | |
|     if (pkt)
 | |
|     {
 | |
|         router->residual = pkt;
 | |
|         ss_dassert(pkt_length != 0);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         ss_dassert(pkt_length == 0);
 | |
|     }
 | |
|     blr_file_flush(router);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Populate a header structure for a replication message from a GWBUF structure.
 | |
|  *
 | |
|  * @param pkt   The incoming packet in a GWBUF chain
 | |
|  * @param hdr   The packet header to populate
 | |
|  */
 | |
| void
 | |
| blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr)
 | |
| {
 | |
| 
 | |
|     hdr->payload_len = EXTRACT24(ptr);
 | |
|     hdr->seqno = ptr[3];
 | |
|     hdr->ok = ptr[4];
 | |
|     hdr->timestamp = EXTRACT32(&ptr[5]);
 | |
|     hdr->event_type = ptr[9];
 | |
|     hdr->serverid = EXTRACT32(&ptr[10]);
 | |
|     hdr->event_size = EXTRACT32(&ptr[14]);
 | |
|     hdr->next_pos = EXTRACT32(&ptr[18]);
 | |
|     hdr->flags = EXTRACT16(&ptr[22]);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Process a binlog rotate event.
 | |
|  *
 | |
|  * @param router    The instance of the router
 | |
|  * @param ptr       The packet containing the rotate event
 | |
|  * @param hdr       The replication message header
 | |
|  * @return          1 if the file could be rotated, 0 otherwise.
 | |
|  */
 | |
| static int
 | |
| blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr)
 | |
| {
 | |
|     int len, slen;
 | |
|     uint64_t pos;
 | |
|     char file[BINLOG_FNAMELEN + 1];
 | |
| 
 | |
|     ptr += 19;      // Skip event header
 | |
|     len = hdr->event_size - 19; // Event size minus header
 | |
|     pos = extract_field(ptr + 4, 32);
 | |
|     pos <<= 32;
 | |
|     pos |= extract_field(ptr, 32);
 | |
|     slen = len - (8 + 4);       // Allow for position and CRC
 | |
|     if (router->master_chksum == 0)
 | |
|     {
 | |
|         slen += 4;
 | |
|     }
 | |
|     if (slen > BINLOG_FNAMELEN)
 | |
|     {
 | |
|         slen = BINLOG_FNAMELEN;
 | |
|     }
 | |
|     memcpy(file, ptr + 8, slen);
 | |
|     file[slen] = 0;
 | |
| 
 | |
| #ifdef VERBOSE_ROTATE
 | |
|     printf("binlog rotate: ");
 | |
|     while (len--)
 | |
|     {
 | |
|         printf("0x%02x ", *ptr++);
 | |
|     }
 | |
|     printf("\n");
 | |
|     printf("New file: %s @ %ld\n", file, pos);
 | |
| #endif
 | |
| 
 | |
|     strcpy(router->prevbinlog, router->binlog_name);
 | |
| 
 | |
|     int rotated = 1;
 | |
| 
 | |
|     if (strncmp(router->binlog_name, file, slen) != 0)
 | |
|     {
 | |
|         router->stats.n_rotates++;
 | |
|         if (blr_file_rotate(router, file, pos) == 0)
 | |
|         {
 | |
|             rotated = 0;
 | |
|         }
 | |
|     }
 | |
|     spinlock_acquire(&router->binlog_lock);
 | |
|     router->rotating = 0;
 | |
|     spinlock_release(&router->binlog_lock);
 | |
|     return rotated;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Create the auth data needed to be able to call dcb_connect.
 | |
|  *
 | |
|  * This doesn't really belong here and should be moved at some stage.
 | |
|  */
 | |
| static void *
 | |
| CreateMySQLAuthData(char *username, char *password, char *database)
 | |
| {
 | |
|     MYSQL_session *auth_info;
 | |
| 
 | |
|     if (username == NULL || password == NULL)
 | |
|     {
 | |
|         MXS_ERROR("You must specify both username and password for the binlog router.");
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     if (strlen(username) > MYSQL_USER_MAXLEN)
 | |
|     {
 | |
|         MXS_ERROR("Provided user name %s is longer than maximum length %d.",
 | |
|                   username, MYSQL_USER_MAXLEN);
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     if (strlen(database) > MYSQL_DATABASE_MAXLEN)
 | |
|     {
 | |
|         MXS_ERROR("Provided database %s is longer than maximum length %d.",
 | |
|                   database, MYSQL_DATABASE_MAXLEN);
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     if ((auth_info = MXS_CALLOC(1, sizeof(MYSQL_session))) == NULL)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     strcpy(auth_info->user, username);
 | |
|     strcpy(auth_info->db, database);
 | |
|     gw_sha1_str((const uint8_t *)password, strlen(password), auth_info->client_sha1);
 | |
| 
 | |
|     return auth_info;
 | |
| }
 | |
| 
 | |
| /** Actions that can be taken when an event is being distributed to the slaves*/
 | |
| typedef enum
 | |
| {
 | |
|     SLAVE_SEND_EVENT, /*< Send the event to the slave */
 | |
|     SLAVE_FORCE_CATCHUP, /*< Force the slave into catchup mode */
 | |
|     SLAVE_EVENT_ALREADY_SENT /*< The slave already has the event, don't send it */
 | |
| } slave_event_action_t;
 | |
| 
 | |
| /**
 | |
|  * Distribute the binlog record we have just received to all the registered slaves.
 | |
|  *
 | |
|  * @param   router      The router instance
 | |
|  * @param   hdr     The replication event header
 | |
|  * @param   ptr     The raw replication event data
 | |
|  */
 | |
| void
 | |
| blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr,
 | |
|                              blr_thread_role_t role)
 | |
| {
 | |
|     ROUTER_SLAVE *slave;
 | |
|     int action;
 | |
|     unsigned int cstate;
 | |
| 
 | |
|     spinlock_acquire(&router->lock);
 | |
|     slave = router->slaves;
 | |
|     while (slave)
 | |
|     {
 | |
|         if (slave->state != BLRS_DUMPING)
 | |
|         {
 | |
|             slave = slave->next;
 | |
|             continue;
 | |
|         }
 | |
|         spinlock_acquire(&slave->catch_lock);
 | |
|         if ((slave->cstate & (CS_UPTODATE | CS_BUSY)) == CS_UPTODATE)
 | |
|         {
 | |
|             /*
 | |
|              * This slave is reporting it is to date with the binlog of the
 | |
|              * master running on this slave.
 | |
|              * It has no thread running currently that is sending binlog
 | |
|              * events.
 | |
|              */
 | |
|             action = 1;
 | |
|             slave->cstate |= CS_BUSY;
 | |
|         }
 | |
|         else if ((slave->cstate & (CS_UPTODATE | CS_BUSY)) == (CS_UPTODATE | CS_BUSY))
 | |
|         {
 | |
|             /*
 | |
|              * The slave is up to date with the binlog and a process is
 | |
|              * running on this slave to send binlog events.
 | |
|              */
 | |
|             slave->overrun = 1;
 | |
|             action = 2;
 | |
|         }
 | |
|         else if ((slave->cstate & CS_UPTODATE) == 0)
 | |
|         {
 | |
|             /* Slave is in catchup mode */
 | |
|             action = 3;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             MXS_ERROR("slave->cstate does not contain a meaningful state %d", slave->cstate);
 | |
|             action = 0;
 | |
|         }
 | |
|         slave->stats.n_actions[action - 1]++;
 | |
|         spinlock_release(&slave->catch_lock);
 | |
| 
 | |
|         if (action == 1)
 | |
|         {
 | |
|             spinlock_acquire(&router->binlog_lock);
 | |
| 
 | |
|             slave_event_action_t slave_action = SLAVE_FORCE_CATCHUP;
 | |
|             const bool same_file = strcmp(slave->binlogfile, router->binlog_name) == 0;
 | |
|             const bool rotate = hdr->event_type == ROTATE_EVENT &&
 | |
|                 strcmp(slave->binlogfile, router->prevbinlog) == 0;
 | |
| 
 | |
|             if (router->trx_safe && (same_file || rotate) &&
 | |
|                 slave->binlog_pos == router->current_safe_event)
 | |
|             {
 | |
|                 /** Slave needs the current event being distributed */
 | |
|                 slave_action = SLAVE_SEND_EVENT;
 | |
|             }
 | |
|             else if (!router->trx_safe && (same_file || rotate) &&
 | |
|                      slave->binlog_pos == router->last_event_pos)
 | |
|             {
 | |
|                 /** Transaction safety is off */
 | |
|                 slave_action = SLAVE_SEND_EVENT;
 | |
|             }
 | |
|             else if (same_file)
 | |
|             {
 | |
|                 if (slave->binlog_pos == hdr->next_pos)
 | |
|                 {
 | |
|                     /*
 | |
|                      * Slave has already read record from file, no
 | |
|                      * need to distrbute this event
 | |
|                      */
 | |
|                     slave_action = SLAVE_EVENT_ALREADY_SENT;
 | |
|                 }
 | |
|                 else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size))
 | |
|                 {
 | |
|                     /*
 | |
|                      * The slave is ahead of the master, this should never
 | |
|                      * happen. Force the slave to catchup mode in order to
 | |
|                      * try to resolve the issue.
 | |
|                      */
 | |
|                     MXS_ERROR("Slave %s:%d server ID %d is ahead of expected position %s@%u. "
 | |
|                               "Expected position %d", slave->dcb->remote,
 | |
|                               ntohs((slave->dcb->ipv4).sin_port), slave->serverid,
 | |
|                               slave->binlogfile, slave->binlog_pos,
 | |
|                               hdr->next_pos - hdr->event_size);
 | |
|                 }
 | |
|                 /* If none of the above conditions were met, a slave in catchup
 | |
|                  * mode transitioned into up-to-date mode while we were
 | |
|                  * distributing events. The slave needs to be forced into
 | |
|                  * catchup mode since it isn't up to date anymore. */
 | |
|             }
 | |
|             else if (rotate)
 | |
|             {
 | |
|                 /** Slave is more than one binlog file behind */
 | |
|                 MXS_WARNING("Slave %s:%d server ID %d is behind more than one binlog file "
 | |
|                           "from the master. Slave is using '%s' with position %d "
 | |
|                           "when master binlog file is '%s'.", slave->dcb->remote,
 | |
|                           ntohs((slave->dcb->ipv4).sin_port), slave->serverid,
 | |
|                           slave->binlogfile, slave->binlog_pos, router->binlog_name);
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 /** Slave is lagging behind */
 | |
|                 MXS_WARNING("Slave %s:%d server ID %d is using binlog file '%s' with "
 | |
|                           "position %d. Master binlog file is '%s' at position %lu "
 | |
|                           "with last safe event at %lu.", slave->dcb->remote,
 | |
|                           ntohs((slave->dcb->ipv4).sin_port), slave->serverid,
 | |
|                           slave->binlogfile, slave->binlog_pos, router->binlog_name,
 | |
|                           router->current_pos, router->current_safe_event);
 | |
|             }
 | |
| 
 | |
|             spinlock_release(&router->binlog_lock);
 | |
| 
 | |
|             /*
 | |
|              * If slave_action is SLAVE_FORCE_CATCHUP then
 | |
|              * the slave is not at the position it should be. Force it into
 | |
|              * catchup mode rather than send this event.
 | |
|              */
 | |
| 
 | |
|             switch (slave_action)
 | |
|             {
 | |
|                 char binlog_name[BINLOG_FNAMELEN + 1];
 | |
|                 uint32_t binlog_pos;
 | |
| 
 | |
|             case SLAVE_SEND_EVENT:
 | |
|                 /*
 | |
|                  * The slave should be up to date, check that the binlog
 | |
|                  * position matches the event we have to distribute or
 | |
|                  * this is a rotate event. Send the event directly from
 | |
|                  * memory to the slave.
 | |
|                  */
 | |
|                 slave->lastEventTimestamp = hdr->timestamp;
 | |
|                 slave->lastEventReceived = hdr->event_type;
 | |
| 
 | |
|                 /* set lastReply */
 | |
|                 if (router->send_slave_heartbeat)
 | |
|                 {
 | |
|                     slave->lastReply = time(0);
 | |
|                 }
 | |
| 
 | |
|                 strcpy(binlog_name, slave->binlogfile);
 | |
|                 binlog_pos = slave->binlog_pos;
 | |
| 
 | |
|                 if (hdr->event_type == ROTATE_EVENT)
 | |
|                 {
 | |
|                     blr_slave_rotate(router, slave, ptr);
 | |
|                 }
 | |
| 
 | |
|                 if (blr_send_event(role, binlog_name, binlog_pos, slave, hdr, ptr))
 | |
|                 {
 | |
|                     spinlock_acquire(&slave->catch_lock);
 | |
|                     if (hdr->event_type != ROTATE_EVENT)
 | |
|                     {
 | |
|                         slave->binlog_pos = hdr->next_pos;
 | |
|                     }
 | |
|                     if (slave->overrun)
 | |
|                     {
 | |
|                         slave->stats.n_overrun++;
 | |
|                         slave->overrun = 0;
 | |
|                         poll_fake_write_event(slave->dcb);
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         slave->cstate &= ~CS_BUSY;
 | |
|                     }
 | |
|                     spinlock_release(&slave->catch_lock);
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     MXS_WARNING("Slave %s:%i, server-id %d, binlog '%s, position %u: "
 | |
|                                 "Master-thread could not send event to slave, closing connection.",
 | |
|                                 slave->dcb->remote,
 | |
|                                 ntohs((slave->dcb->ipv4).sin_port),
 | |
|                                 slave->serverid,
 | |
|                                 binlog_name,
 | |
|                                 binlog_pos);
 | |
|                     slave->state = BLRS_ERRORED;
 | |
|                     dcb_close(slave->dcb);
 | |
|                 }
 | |
|                 break;
 | |
| 
 | |
|             case SLAVE_EVENT_ALREADY_SENT:
 | |
|                 spinlock_acquire(&slave->catch_lock);
 | |
|                 slave->cstate &= ~CS_BUSY;
 | |
|                 spinlock_release(&slave->catch_lock);
 | |
|                 break;
 | |
| 
 | |
|             case SLAVE_FORCE_CATCHUP:
 | |
|                 spinlock_acquire(&slave->catch_lock);
 | |
|                 cstate = slave->cstate;
 | |
|                 slave->cstate &= ~(CS_UPTODATE | CS_BUSY);
 | |
|                 slave->cstate |= CS_EXPECTCB;
 | |
|                 spinlock_release(&slave->catch_lock);
 | |
|                 if ((cstate & CS_UPTODATE) == CS_UPTODATE)
 | |
|                 {
 | |
| #ifdef STATE_CHANGE_LOGGING_ENABLED
 | |
|                     MXS_NOTICE("%s: Slave %s:%d, server-id %d transition from "
 | |
|                                "up-to-date to catch-up in blr_distribute_binlog_record, "
 | |
|                                "binlog file '%s', position %lu.",
 | |
|                                router->service->name,
 | |
|                                slave->dcb->remote,
 | |
|                                ntohs((slave->dcb->ipv4).sin_port),
 | |
|                                slave->serverid,
 | |
|                                slave->binlogfile, (unsigned long)slave->binlog_pos);
 | |
| #endif
 | |
|                 }
 | |
|                 poll_fake_write_event(slave->dcb);
 | |
|                 break;
 | |
|             }
 | |
|         }
 | |
|         else if (action == 3)
 | |
|         {
 | |
|             /* Slave is not up to date
 | |
|              * Check if it is either expecting a callback or
 | |
|              * is busy processing a callback
 | |
|              */
 | |
|             spinlock_acquire(&slave->catch_lock);
 | |
|             if ((slave->cstate & (CS_EXPECTCB | CS_BUSY)) == 0)
 | |
|             {
 | |
|                 slave->cstate |= CS_EXPECTCB;
 | |
|                 spinlock_release(&slave->catch_lock);
 | |
|                 poll_fake_write_event(slave->dcb);
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 spinlock_release(&slave->catch_lock);
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         slave = slave->next;
 | |
|     }
 | |
|     spinlock_release(&router->lock);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Write a raw event (the first 40 bytes at most) to a log file
 | |
|  *
 | |
|  * @param priority The syslog priority of the message (LOG_ERR, LOG_WARNING, etc.)
 | |
|  * @param msg      A textual message to write before the packet
 | |
|  * @param ptr      Pointer to the message buffer
 | |
|  * @param len      Length of message packet
 | |
|  */
 | |
| static void
 | |
| blr_log_packet(int priority, char *msg, uint8_t *ptr, int len)
 | |
| {
 | |
|     char buf[400] = "";
 | |
|     char *bufp;
 | |
|     int i;
 | |
| 
 | |
|     bufp = buf;
 | |
|     bufp += sprintf(bufp, "%s length = %d: ", msg, len);
 | |
|     for (i = 0; i < len && i < 40; i++)
 | |
|     {
 | |
|         bufp += sprintf(bufp, "0x%02x ", ptr[i]);
 | |
|     }
 | |
|     if (i < len)
 | |
|     {
 | |
|         MXS_LOG_MESSAGE(priority, "%s...", buf);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_LOG_MESSAGE(priority, "%s", buf);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Check if the master connection is in place and we
 | |
|  * are downlaoding binlogs
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  * @return non-zero if we are recivign binlog records
 | |
|  */
 | |
| int
 | |
| blr_master_connected(ROUTER_INSTANCE *router)
 | |
| {
 | |
|     return router->master_state == BLRM_BINLOGDUMP;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Extract a result value from the set of messages that make up a
 | |
|  * MySQL response packet.
 | |
|  *
 | |
|  * @param buf   The GWBUF containing the response
 | |
|  * @param col   The column number to return
 | |
|  * @return  The result form the column or NULL. The caller must free the result
 | |
|  */
 | |
| char *
 | |
| blr_extract_column(GWBUF *buf, int col)
 | |
| {
 | |
|     uint8_t *ptr;
 | |
|     int len, ncol, collen;
 | |
|     char    *rval;
 | |
| 
 | |
|     if (buf == NULL)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     ptr = (uint8_t *)GWBUF_DATA(buf);
 | |
|     /* First packet should be the column count */
 | |
|     len = EXTRACT24(ptr);
 | |
|     ptr += 3;
 | |
|     if (*ptr != 1)      // Check sequence number is 1
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
|     ptr++;
 | |
|     ncol = *ptr++;
 | |
|     if (ncol < col)     // Not that many column in result
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
|     // Now ptr points at the column definition
 | |
|     while (ncol-- > 0)
 | |
|     {
 | |
|         len = EXTRACT24(ptr);
 | |
|         ptr += 4;   // Skip to payload
 | |
|         ptr += len; // Skip over payload
 | |
|     }
 | |
|     // Now we should have an EOF packet
 | |
|     len = EXTRACT24(ptr);
 | |
|     ptr += 4;       // Skip to payload
 | |
|     if (*ptr != 0xfe)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
|     ptr += len;
 | |
| 
 | |
|     // Finally we have reached the row
 | |
|     len = EXTRACT24(ptr);
 | |
|     ptr += 4;
 | |
| 
 | |
|     /** The first EOF packet signals the start of the resultset rows and the second
 | |
|      EOF packet signals the end of the result set. If the resultset
 | |
|      contains a second EOF packet right after the first one, the result set is empty and
 | |
|      contains no rows. */
 | |
|     if (len == 5 && *ptr == 0xfe)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     while (--col > 0)
 | |
|     {
 | |
|         collen = *ptr++;
 | |
|         ptr += collen;
 | |
|     }
 | |
|     collen = *ptr++;
 | |
|     if ((rval = MXS_MALLOC(collen + 1)) == NULL)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
|     memcpy(rval, ptr, collen);
 | |
|     rval[collen] = 0;       // NULL terminate
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Read a replication event form current opened binlog into a GWBUF structure.
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  * @param pos       Position of binlog record to read
 | |
|  * @param hdr       Binlog header to populate
 | |
|  * @return      The binlog record wrapped in a GWBUF structure
 | |
|  */
 | |
| GWBUF
 | |
| *blr_read_events_from_pos(ROUTER_INSTANCE *router,
 | |
|                           unsigned long long pos,
 | |
|                           REP_HEADER *hdr,
 | |
|                           unsigned long long pos_end)
 | |
| {
 | |
|     unsigned long long end_pos = 0;
 | |
|     uint8_t hdbuf[19];
 | |
|     uint8_t *data;
 | |
|     GWBUF *result;
 | |
|     int n;
 | |
|     int event_limit;
 | |
| 
 | |
|     /* Get current binnlog position */
 | |
|     end_pos = pos_end;
 | |
| 
 | |
|     /* end of file reached, we're done */
 | |
|     if (pos == end_pos)
 | |
|     {
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     /* error */
 | |
|     if (pos > end_pos)
 | |
|     {
 | |
|         MXS_ERROR("Reading saved events, the specified pos %llu "
 | |
|                   "is ahead of current pos %lu for file %s",
 | |
|                   pos, router->current_pos, router->binlog_name);
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     /* Read the event header information from the file */
 | |
|     if ((n = pread(router->binlog_fd, hdbuf, 19, pos)) != 19)
 | |
|     {
 | |
|         switch (n)
 | |
|         {
 | |
|         case 0:
 | |
|             MXS_DEBUG("Reading saved events: reached end of binlog file at %llu.", pos);
 | |
|             break;
 | |
|         case -1:
 | |
|             {
 | |
|                 char err_msg[STRERROR_BUFLEN];
 | |
|                 MXS_ERROR("Reading saved events: failed to read binlog "
 | |
|                           "file %s at position %llu"
 | |
|                           " (%s).", router->binlog_name,
 | |
|                           pos, strerror_r(errno, err_msg, sizeof(err_msg)));
 | |
| 
 | |
|                 if (errno == EBADF)
 | |
|                 {
 | |
|                     MXS_ERROR("Reading saved events: bad file descriptor for file %s"
 | |
|                               ", descriptor %d.",
 | |
|                               router->binlog_name, router->binlog_fd);
 | |
|                 }
 | |
|             }
 | |
|             break;
 | |
|         default:
 | |
|             MXS_ERROR("Reading saved events: short read when reading the header. "
 | |
|                       "Expected 19 bytes but got %d bytes. "
 | |
|                       "Binlog file is %s, position %llu",
 | |
|                       n, router->binlog_name, pos);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     hdr->timestamp = EXTRACT32(hdbuf);
 | |
|     hdr->event_type = hdbuf[4];
 | |
|     hdr->serverid = EXTRACT32(&hdbuf[5]);
 | |
|     hdr->event_size = extract_field(&hdbuf[9], 32);
 | |
|     hdr->next_pos = EXTRACT32(&hdbuf[13]);
 | |
|     hdr->flags = EXTRACT16(&hdbuf[17]);
 | |
| 
 | |
|     event_limit = router->mariadb10_compat ? MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
 | |
| 
 | |
|     if (hdr->event_type > event_limit)
 | |
|     {
 | |
|         MXS_ERROR("Reading saved events: invalid event type 0x%x. "
 | |
|                   "Binlog file is %s, position %llu",
 | |
|                   hdr->event_type,
 | |
|                   router->binlog_name, pos);
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     if ((result = gwbuf_alloc(hdr->event_size)) == NULL)
 | |
|     {
 | |
|         MXS_ERROR("Reading saved events: failed to allocate memory for binlog entry, "
 | |
|                   "size %d at %llu.",
 | |
|                   hdr->event_size, pos);
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     /* Copy event header*/
 | |
|     data = GWBUF_DATA(result);
 | |
|     memcpy(data, hdbuf, 19);
 | |
| 
 | |
|     /* Read event data and put int into buffer after header */
 | |
|     if ((n = pread(router->binlog_fd, &data[19], hdr->event_size - 19, pos + 19)) != hdr->event_size - 19)
 | |
|     {
 | |
|         if (n == -1)
 | |
|         {
 | |
|             char err_msg[STRERROR_BUFLEN];
 | |
|             MXS_ERROR("Reading saved events: the event at %llu in %s. "
 | |
|                       "%s, expected %d bytes.",
 | |
|                       pos, router->binlog_name,
 | |
|                       strerror_r(errno, err_msg, sizeof(err_msg)), hdr->event_size - 19);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             MXS_ERROR("Reading saved events: short read when reading "
 | |
|                       "the event at %llu in %s. "
 | |
|                       "Expected %d bytes got %d bytes.",
 | |
|                       pos, router->binlog_name, hdr->event_size - 19, n);
 | |
| 
 | |
|             if (end_pos - pos < hdr->event_size)
 | |
|             {
 | |
|                 MXS_ERROR("Reading saved events: binlog event "
 | |
|                           "is close to the end of the binlog file, "
 | |
|                           "current file size is %llu.", end_pos);
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         /* free buffer */
 | |
|         gwbuf_free(result);
 | |
| 
 | |
|         return NULL;
 | |
|     }
 | |
| 
 | |
|     return result;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Stop and start the master connection
 | |
|  *
 | |
|  * @param router    The router instance
 | |
|  */
 | |
| void
 | |
| blr_stop_start_master(ROUTER_INSTANCE *router)
 | |
| {
 | |
|     if (router->master)
 | |
|     {
 | |
|         if (router->master->fd != -1 && router->master->state == DCB_STATE_POLLING)
 | |
|         {
 | |
|             blr_master_close(router);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     spinlock_acquire(&router->lock);
 | |
| 
 | |
|     router->master_state = BLRM_SLAVE_STOPPED;
 | |
| 
 | |
|     /* set last_safe_pos */
 | |
|     router->last_safe_pos = router->binlog_position;
 | |
| 
 | |
|     /**
 | |
|      * Set router->prevbinlog to router->binlog_name
 | |
|      * The FDE event with current filename may arrive after STOP SLAVE is received
 | |
|      */
 | |
| 
 | |
|     if (strcmp(router->binlog_name, router->prevbinlog) != 0)
 | |
|     {
 | |
|         strcpy(router->prevbinlog, router->binlog_name); // Same size
 | |
|     }
 | |
| 
 | |
|     if (router->client)
 | |
|     {
 | |
|         if (router->client->fd != -1 && router->client->state == DCB_STATE_POLLING)
 | |
|         {
 | |
|             dcb_close(router->client);
 | |
|             router->client = NULL;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /* Discard the queued residual data */
 | |
|     while (router->residual)
 | |
|     {
 | |
|         router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
 | |
|     }
 | |
|     router->residual = NULL;
 | |
| 
 | |
|     router->master_state = BLRM_UNCONNECTED;
 | |
|     spinlock_release(&router->lock);
 | |
| 
 | |
|     blr_master_reconnect(router);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * The heartbeat check function called from the housekeeper.
 | |
|  * We can try a new master connection if current one is seen out of date
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  */
 | |
| 
 | |
| static void
 | |
| blr_check_last_master_event(void *inst)
 | |
| {
 | |
|     ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)inst;
 | |
|     int master_check = 1;
 | |
|     int master_state =  BLRM_UNCONNECTED;
 | |
|     char task_name[BLRM_TASK_NAME_LEN + 1] = "";
 | |
| 
 | |
|     spinlock_acquire(&router->lock);
 | |
| 
 | |
|     master_check = blr_check_heartbeat(router);
 | |
| 
 | |
|     master_state = router->master_state;
 | |
| 
 | |
|     spinlock_release(&router->lock);
 | |
| 
 | |
|     if (!master_check)
 | |
|     {
 | |
|         /*
 | |
|          * stop current master connection
 | |
|          * and try a new connection
 | |
|          */
 | |
|         blr_stop_start_master(router);
 | |
|     }
 | |
| 
 | |
|     if ( (!master_check) || (master_state != BLRM_BINLOGDUMP) )
 | |
|     {
 | |
|         /*
 | |
|          * Remove the task, it will be added again
 | |
|          * when master state is back to BLRM_BINLOGDUMP
 | |
|          * by blr_master_response()
 | |
|          */
 | |
|         snprintf(task_name, BLRM_TASK_NAME_LEN, "%s heartbeat", router->service->name);
 | |
| 
 | |
|         hktask_remove(task_name);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Check last heartbeat or last received event against router->heartbeat time interval
 | |
|  *
 | |
|  * checked interval is againts (router->heartbeat + BLR_NET_LATENCY_WAIT_TIME)
 | |
|  * that is currently set to 1
 | |
|  *
 | |
|  * @param router    Current router instance
 | |
|  * @return      0 if master connection must be closed and opened again, 1 otherwise
 | |
|  */
 | |
| 
 | |
| int
 | |
| blr_check_heartbeat(ROUTER_INSTANCE *router)
 | |
| {
 | |
|     time_t t_now = time(0);
 | |
|     char *event_desc = NULL;
 | |
| 
 | |
|     if (router->master_state != BLRM_BINLOGDUMP)
 | |
|     {
 | |
|         return 1;
 | |
|     }
 | |
| 
 | |
|     event_desc = blr_last_event_description(router);
 | |
| 
 | |
|     if (router->master_state == BLRM_BINLOGDUMP && router->lastEventReceived > 0)
 | |
|     {
 | |
|         if ((t_now - router->stats.lastReply) > (router->heartbeat + BLR_NET_LATENCY_WAIT_TIME))
 | |
|         {
 | |
|             MXS_ERROR("No event received from master %s:%d in heartbeat period (%lu seconds), "
 | |
|                       "last event (%s %d) received %lu seconds ago. Assuming connection is dead "
 | |
|                       "and reconnecting.",
 | |
|                       router->service->dbref->server->name,
 | |
|                       router->service->dbref->server->port,
 | |
|                       router->heartbeat,
 | |
|                       event_desc != NULL ? event_desc : "unknown",
 | |
|                       router->lastEventReceived,
 | |
|                       t_now - router->stats.lastReply);
 | |
| 
 | |
|             return 0;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return 1;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Log binlog router identy after master registration, state is BLRM_BINLOG_DUMP
 | |
|  *
 | |
|  * @param   router  The router instance
 | |
|  */
 | |
| 
 | |
| static void blr_log_identity(ROUTER_INSTANCE *router)
 | |
| {
 | |
|     char *master_uuid;
 | |
|     char *master_hostname;
 | |
|     char *master_version;
 | |
| 
 | |
|     if (router->set_master_version)
 | |
|     {
 | |
|         master_version  = router->set_master_version;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         master_version = blr_extract_column(router->saved_master.selectver, 1);
 | |
|     }
 | |
| 
 | |
|     if (router->set_master_hostname)
 | |
|     {
 | |
|         master_hostname  = router->set_master_hostname;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         master_hostname = blr_extract_column(router->saved_master.selecthostname, 1);
 | |
|     }
 | |
| 
 | |
|     if (router->set_master_uuid)
 | |
|     {
 | |
|         master_uuid = router->master_uuid;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         master_uuid = blr_extract_column(router->saved_master.uuid, 2);
 | |
|     }
 | |
| 
 | |
|     /* Seen by the master */
 | |
|     MXS_NOTICE("%s: identity seen by the master: "
 | |
|                "server_id: %d, uuid: %s",
 | |
|                router->service->name,
 | |
|                router->serverid, (router->uuid == NULL ? "not available" : router->uuid));
 | |
| 
 | |
|     /* Seen by the slaves */
 | |
| 
 | |
|     /* MariaDB 5.5 and MariaDB don't have the MASTER_UUID var */
 | |
|     if (master_uuid == NULL)
 | |
|     {
 | |
|         MXS_NOTICE("%s: identity seen by the slaves: "
 | |
|                    "server_id: %d, hostname: %s, MySQL version: %s",
 | |
|                    router->service->name,
 | |
|                    router->masterid, (master_hostname == NULL ? "not available" : master_hostname),
 | |
|                    (master_version == NULL ? "not available" : master_version));
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_NOTICE("%s: identity seen by the slaves: "
 | |
|                    "server_id: %d, uuid: %s, hostname: %s, MySQL version: %s",
 | |
|                    router->service->name,
 | |
|                    router->masterid, master_uuid,
 | |
|                    (master_hostname == NULL ? "not available" : master_hostname),
 | |
|                    (master_version == NULL ? "not available" : master_version));
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Distribute an error message to all the registered slaves.
 | |
|  *
 | |
|  * @param   router      The router instance
 | |
|  * @param   message     The message to send
 | |
|  * @param   state       The MySQL State for message
 | |
|  * @param   err_code    The MySQL error code for message
 | |
|  */
 | |
| static void
 | |
| blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code)
 | |
| {
 | |
|     ROUTER_SLAVE *slave;
 | |
| 
 | |
|     spinlock_acquire(&router->lock);
 | |
| 
 | |
|     slave = router->slaves;
 | |
|     while (slave)
 | |
|     {
 | |
|         if (slave->state != BLRS_DUMPING)
 | |
|         {
 | |
|             slave = slave->next;
 | |
|             continue;
 | |
|         }
 | |
| 
 | |
|         /* send the error that stops slave replication */
 | |
|         blr_send_custom_error(slave->dcb, slave->seqno++, 0, message, state, err_code);
 | |
| 
 | |
|         slave = slave->next;
 | |
|     }
 | |
| 
 | |
|     spinlock_release(&router->lock);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Write data into binlogs (incomplete event)
 | |
|  *
 | |
|  * Writes @c data_len bytes of data from @c buf into the current binlog being processed.
 | |
|  *
 | |
|  * @param router Router instance
 | |
|  * @param data_len Number of bytes to write
 | |
|  * @param buf Pointer where the data is read
 | |
|  * @return Number of bytes written or 0 on error
 | |
|  */
 | |
| int
 | |
| blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf)
 | |
| {
 | |
|     int n;
 | |
| 
 | |
|     if ((n = pwrite(router->binlog_fd, buf, data_len,
 | |
|                     router->last_written)) != data_len)
 | |
|     {
 | |
|         char err_msg[STRERROR_BUFLEN];
 | |
|         MXS_ERROR("%s: Failed to write binlog record at %lu of %s, %s. "
 | |
|                   "Truncating to previous record.",
 | |
|                   router->service->name, router->last_written,
 | |
|                   router->binlog_name,
 | |
|                   strerror_r(errno, err_msg, sizeof(err_msg)));
 | |
| 
 | |
|         /* Remove any partial event that was written */
 | |
|         if (ftruncate(router->binlog_fd, router->last_written))
 | |
|         {
 | |
|             MXS_ERROR("%s: Failed to truncate binlog record at %lu of %s, %s. ",
 | |
|                       router->service->name, router->last_written,
 | |
|                       router->binlog_name,
 | |
|                       strerror_r(errno, err_msg, sizeof(err_msg)));
 | |
|         }
 | |
|         return 0;
 | |
|     }
 | |
|     router->last_written += data_len;
 | |
|     return n;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Send a replication event packet to a slave
 | |
|  *
 | |
|  * The first replication event packet contains one byte set to either
 | |
|  * 0x0, 0xfe or 0xff which signals what the state of the replication stream is.
 | |
|  * If the data pointed by @c buf is not the start of the replication header
 | |
|  * and part of the replication event is already sent, @c first must be set to
 | |
|  * false so that the first status byte is not sent again.
 | |
|  *
 | |
|  * @param slave Slave where the packet is sent to
 | |
|  * @param buf Buffer containing the data
 | |
|  * @param len Length of the data
 | |
|  * @param first If this is the first packet of a multi-packet event
 | |
|  * @return True on success, false when memory allocation fails
 | |
|  */
 | |
| bool blr_send_packet(ROUTER_SLAVE *slave, uint8_t *buf, uint32_t len, bool first)
 | |
| {
 | |
|     bool rval = true;
 | |
|     unsigned int datalen = len + (first ? 1 : 0);
 | |
|     GWBUF *buffer = gwbuf_alloc(datalen + MYSQL_HEADER_LEN);
 | |
|     if (buffer)
 | |
|     {
 | |
|         uint8_t *data = GWBUF_DATA(buffer);
 | |
|         encode_value(data, datalen, 24);
 | |
|         data += 3;
 | |
|         *data++ = slave->seqno++;
 | |
| 
 | |
|         if (first)
 | |
|         {
 | |
|             *data++ = 0; // OK byte
 | |
|         }
 | |
| 
 | |
|         if (len > 0)
 | |
|         {
 | |
|             memcpy(data, buf, len);
 | |
|         }
 | |
| 
 | |
|         slave->stats.n_bytes += GWBUF_LENGTH(buffer);
 | |
|         slave->dcb->func.write(slave->dcb, buffer);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_ERROR("failed to allocate %ld bytes of memory when writing an"
 | |
|                   " event.", datalen + MYSQL_HEADER_LEN);
 | |
|         rval = false;
 | |
|     }
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Send a single replication event to a slave
 | |
|  *
 | |
|  * This sends the complete replication event to a slave. If the event size exceeds
 | |
|  * the maximum size of a MySQL packet, it will be sent in multiple packets.
 | |
|  *
 | |
|  * @param role  What is the role of the caller, slave or master.
 | |
|  * @param binlog_name The name of the binlogfile.
 | |
|  * @param binlog_pos The position in the binlogfile.
 | |
|  * @param slave Slave where the event is sent to
 | |
|  * @param hdr   Replication header
 | |
|  * @param buf   Pointer to the replication event as it was read from the disk
 | |
|  * @return True on success, false if memory allocation failed
 | |
|  */
 | |
| bool blr_send_event(blr_thread_role_t role,
 | |
|                     const char* binlog_name,
 | |
|                     uint32_t binlog_pos,
 | |
|                     ROUTER_SLAVE *slave,
 | |
|                     REP_HEADER *hdr,
 | |
|                     uint8_t *buf)
 | |
| {
 | |
|     bool rval = true;
 | |
| 
 | |
|     if ((strcmp(slave->lsi_binlog_name, binlog_name) == 0) &&
 | |
|         (slave->lsi_binlog_pos == binlog_pos))
 | |
|     {
 | |
|         MXS_ERROR("Slave %s:%i, server-id %d, binlog '%s', position %u: "
 | |
|                   "thread %lu in the role of %s could not send the event, "
 | |
|                   "the event has already been sent by thread %lu in the role of %s. "
 | |
|                   "%u bytes buffered for writing in DCB %p. %lu events received from master.",
 | |
|                   slave->dcb->remote,
 | |
|                   ntohs((slave->dcb->ipv4).sin_port),
 | |
|                   slave->serverid,
 | |
|                   binlog_name,
 | |
|                   binlog_pos,
 | |
|                   thread_self(),
 | |
|                   ROLETOSTR(role),
 | |
|                   slave->lsi_sender_tid,
 | |
|                   ROLETOSTR(slave->lsi_sender_role),
 | |
|                   gwbuf_length(slave->dcb->writeq), slave->dcb,
 | |
|                   slave->router->stats.n_binlogs);
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     /** Check if the event and the OK byte fit into a single packet  */
 | |
|     if (hdr->event_size + 1 < MYSQL_PACKET_LENGTH_MAX)
 | |
|     {
 | |
|         rval = blr_send_packet(slave, buf, hdr->event_size, true);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         /** Total size of all the payloads in all the packets */
 | |
|         int64_t len = hdr->event_size + 1;
 | |
|         bool first = true;
 | |
| 
 | |
|         while (rval && len > 0)
 | |
|         {
 | |
|             uint64_t payload_len = first ? MYSQL_PACKET_LENGTH_MAX - 1 :
 | |
|                                    MIN(MYSQL_PACKET_LENGTH_MAX, len);
 | |
| 
 | |
|             if (blr_send_packet(slave, buf, payload_len, first))
 | |
|             {
 | |
|                 /** The check for exactly 0x00ffffff bytes needs to be done
 | |
|                  * here as well */
 | |
|                 if (len == MYSQL_PACKET_LENGTH_MAX)
 | |
|                 {
 | |
|                     blr_send_packet(slave, buf, 0, false);
 | |
|                 }
 | |
| 
 | |
|                 /** Add the extra byte written by blr_send_packet */
 | |
|                 len -= first ? payload_len + 1 : payload_len;
 | |
|                 buf += payload_len;
 | |
|                 first = false;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 rval = false;
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     slave->stats.n_events++;
 | |
| 
 | |
|     if (rval)
 | |
|     {
 | |
|         strcpy(slave->lsi_binlog_name, binlog_name);
 | |
|         slave->lsi_binlog_pos = binlog_pos;
 | |
|         slave->lsi_sender_role = role;
 | |
|         slave->lsi_sender_tid = thread_self();
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_ERROR("Failed to send an event of %u bytes to slave at %s:%d.",
 | |
|                   hdr->event_size, slave->dcb->remote,
 | |
|                   ntohs(slave->dcb->ipv4.sin_port));
 | |
|     }
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Extract the checksum from the binlogs
 | |
|  *
 | |
|  * This updates the internal state of the router and will allow us to detect
 | |
|  * if the checksum is split across two packets.
 | |
|  * @param router Router instance
 | |
|  * @param cksumptr Pointer to the checksum
 | |
|  * @param len How much of the data is readable
 | |
|  */
 | |
| void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len)
 | |
| {
 | |
|     uint8_t *ptr = cksumptr;
 | |
|     while (ptr - cksumptr < len && router->partial_checksum_bytes < MYSQL_CHECKSUM_LEN)
 | |
|     {
 | |
|         router->partial_checksum[router->partial_checksum_bytes] = *ptr;
 | |
|         ptr++;
 | |
|         router->partial_checksum_bytes++;
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Stop the slave connection and log errors
 | |
|  *
 | |
|  * @param router Router instance
 | |
|  * @param ptr Pointer to the start of the packet
 | |
|  * @param len Length of the packet
 | |
|  */
 | |
| static void blr_terminate_master_replication(ROUTER_INSTANCE* router, uint8_t* ptr, int len)
 | |
| {
 | |
|     unsigned long mysql_errno = extract_field(ptr + 5, 16);
 | |
|     int msg_len = len - 7 - 6; // msg len is decreased by 7 and 6
 | |
|     char *msg_err = (char *)MXS_MALLOC(msg_len + 1);
 | |
|     MXS_ABORT_IF_NULL(msg_err);
 | |
| 
 | |
|     memcpy(msg_err, (char *)ptr + 7 + 6, msg_len);
 | |
|     *(msg_err + msg_len) = '\0';
 | |
| 
 | |
|     spinlock_acquire(&router->lock);
 | |
| 
 | |
|     char* old_errmsg = router->m_errmsg;
 | |
|     router->m_errmsg = msg_err;
 | |
|     router->m_errno = mysql_errno;
 | |
|     router->master_state = BLRM_SLAVE_STOPPED;
 | |
|     router->stats.n_binlog_errors++;
 | |
| 
 | |
|     spinlock_release(&router->lock);
 | |
| 
 | |
|     MXS_FREE(old_errmsg);
 | |
|     MXS_ERROR("Error packet in binlog stream.%s @ %lu.",
 | |
|               router->binlog_name, router->current_pos);
 | |
| 
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Populate a header structure for a replication message from a GWBUF structure with semi-sync enabled.
 | |
|  *
 | |
|  * @param pkt   The incoming packet in a GWBUF chain
 | |
|  * @param hdr   The packet header to populate
 | |
|  */
 | |
| static void
 | |
| blr_extract_header_semisync(register uint8_t *ptr, register REP_HEADER *hdr)
 | |
| {
 | |
| 
 | |
|     hdr->payload_len = EXTRACT24(ptr);
 | |
|     hdr->seqno = ptr[3];
 | |
|     hdr->ok = ptr[4];
 | |
|     /* Data available after 2 bytes (the 2 semisync bytes) */
 | |
|     hdr->timestamp = EXTRACT32(&ptr[5 + 2]);
 | |
|     hdr->event_type = ptr[9 + 2];
 | |
|     hdr->serverid = EXTRACT32(&ptr[10 + 2]);
 | |
|     hdr->event_size = EXTRACT32(&ptr[14 + 2]);
 | |
|     hdr->next_pos = EXTRACT32(&ptr[18 + 2]);
 | |
|     hdr->flags = EXTRACT16(&ptr[22 + 2]);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Send a MySQL Replication Semi-Sync ACK to the master server.
 | |
|  *
 | |
|  * @param router The router instance.
 | |
|  * @param pos The binlog position for the ACK reply.
 | |
|  * @return 1 if the packect is sent, 0 on errors
 | |
|  */
 | |
| 
 | |
| static int
 | |
| blr_send_semisync_ack(ROUTER_INSTANCE *router, uint64_t pos)
 | |
| {
 | |
|     int seqno = 0;
 | |
|     int semi_sync_flag = BLR_MASTER_SEMI_SYNC_INDICATOR;
 | |
|     GWBUF   *buf;
 | |
|     int     len;
 | |
|     uint8_t *data;
 | |
|     int     binlog_file_len = strlen(router->binlog_name);
 | |
| 
 | |
|     /* payload is: 1 byte semi-sync indicator + 8 bytes position + binlog name len */
 | |
|     len = 1 + 8 + binlog_file_len;
 | |
| 
 | |
|     /* add network header to size */
 | |
|     if ((buf = gwbuf_alloc(len + 4)) == NULL)
 | |
|     {
 | |
|         return 0;
 | |
|     }
 | |
| 
 | |
|     data = GWBUF_DATA(buf);
 | |
| 
 | |
|     encode_value(&data[0], len, 24);  // Payload length
 | |
|     data[3] = 0;                      // Sequence ID
 | |
|     data[4] = semi_sync_flag;         // Semi-sync indicator
 | |
| 
 | |
|     /**
 | |
|      * Next Bytes are: 8 bytes log position + len bin_log filename
 | |
|      */
 | |
| 
 | |
|     /* Position */
 | |
|     encode_value(&data[5], pos, 64);
 | |
| 
 | |
|     /* Binlog filename */
 | |
|     memcpy((char *)&data[13], router->binlog_name, binlog_file_len);
 | |
| 
 | |
|     router->master->func.write(router->master, buf);
 | |
| 
 | |
|     return 1;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Check the master semisync capability.
 | |
|  *
 | |
|  * @param buf The GWBUF data with master reply.
 | |
|  * @return Semisync value: non available, enabled, disabled
 | |
|  */
 | |
| static int
 | |
| blr_get_master_semisync(GWBUF *buf)
 | |
| {
 | |
|     char *key;
 | |
|     char *val = NULL;
 | |
|     int  master_semisync = MASTER_SEMISYNC_NOT_AVAILABLE;
 | |
| 
 | |
|     key = blr_extract_column(buf, 1);
 | |
| 
 | |
|     if (key && strlen(key))
 | |
|     {
 | |
|         val = blr_extract_column(buf, 2);
 | |
|     }
 | |
|     free(key);
 | |
| 
 | |
|     if (val) {
 | |
|         if (strncasecmp(val, "ON", 4) == 0)
 | |
|             master_semisync = MASTER_SEMISYNC_ENABLED;
 | |
|         else
 | |
|             master_semisync = MASTER_SEMISYNC_DISABLED;
 | |
|     }
 | |
|     free(val);
 | |
| 
 | |
|     return master_semisync;
 | |
| }
 | 
