1075 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			1075 lines
		
	
	
		
			29 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /*
 | |
|  * This file is distributed as part of MaxScale.  It is free
 | |
|  * software: you can redistribute it and/or modify it under the terms of the
 | |
|  * GNU General Public License as published by the Free Software Foundation,
 | |
|  * version 2.
 | |
|  *
 | |
|  * This program is distributed in the hope that it will be useful, but WITHOUT
 | |
|  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 | |
|  * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 | |
|  * details.
 | |
|  *
 | |
|  * You should have received a copy of the GNU General Public License along with
 | |
|  * this program; if not, write to the Free Software Foundation, Inc., 51
 | |
|  * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 | |
|  *
 | |
|  * Copyright SkySQL Ab 2014
 | |
|  */
 | |
| 
 | |
| /**
 | |
|  * @file blr_master.c - contains code for the router to master communication
 | |
|  *
 | |
 * The binlog router is designed to be used in replication environments to
 * increase the replication fanout of a master server. It provides a transparent
 * mechanism to read the binlog entries for multiple slaves while requiring
 * only a single connection to the actual master to support the slaves.
 *
 * The current prototype implementation is designed to support MySQL 5.6 and has
 * a number of limitations. This prototype is merely a proof of concept and
 * should not be considered production ready.
 | |
|  *
 | |
|  * @verbatim
 | |
|  * Revision History
 | |
|  *
 | |
|  * Date		Who		Description
 | |
|  * 02/04/2014	Mark Riddoch		Initial implementation
 | |
|  *
 | |
|  * @endverbatim
 | |
|  */
 | |
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>

#include <service.h>
#include <server.h>
#include <router.h>
#include <atomic.h>
#include <session.h>
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>

#include <skygw_types.h>
#include <skygw_utils.h>
#include <log_manager.h>

#include <rdtsc.h>

/* Temporary requirement for auth data */
#include <mysql_client_server_protocol.h>
 | |
| 
 | |
/*
 * Ad-hoc cycle-count profiling. Each timed section owns one row of
 * samples[] plus a fill counter in sample_index[]; log_duration() stores
 * each sample and appends a full row to the file "binlog_profile.<section>".
 */
#define	SAMPLE_COUNT	10000			/* samples buffered per section before writing */
CYCLES	samples[10][SAMPLE_COUNT];		/* one row per section, up to 10 sections */
int	sample_index[10] = { 0, 0, 0 };		/* next free slot per section; unlisted entries are zero */

/* Section numbers used as the first argument of log_duration() */
#define	LOGD_SLAVE_CATCHUP1	0
#define	LOGD_SLAVE_CATCHUP2	1
#define	LOGD_DISTRIBUTE		2
#define	LOGD_FILE_FLUSH		3

/* Held by log_duration() while it updates samples[] and sample_index[] */
SPINLOCK logspin = SPINLOCK_INIT;
 | |
| 
 | |
| void
 | |
| log_duration(int sample, CYCLES duration)
 | |
| {
 | |
| char	fname[100];
 | |
| int	i;
 | |
| FILE	*fp;
 | |
| 
 | |
| 	spinlock_acquire(&logspin);
 | |
| 	samples[sample][sample_index[sample]++] = duration;
 | |
| 	if (sample_index[sample] == SAMPLE_COUNT)
 | |
| 	{
 | |
| 		sprintf(fname, "binlog_profile.%d", sample);
 | |
| 		if ((fp = fopen(fname, "a")) != NULL)
 | |
| 		{
 | |
| 			for (i = 0; i < SAMPLE_COUNT; i++)
 | |
| 				fprintf(fp, "%ld\n", samples[sample][i]);
 | |
| 			fclose(fp);
 | |
| 		}
 | |
| 		sample_index[sample] = 0;
 | |
| 	}
 | |
| 	spinlock_release(&logspin);
 | |
| }
 | |
| 
 | |
/* Defined elsewhere; presumably consulted by the LOGIF() macro — TODO confirm */
extern int lm_enabled_logfiles_bitmask;

/* Prototypes for functions of this file that are used before their definitions */
static GWBUF *blr_make_query(char *statement);
static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
void encode_value(unsigned char *data, unsigned int value, int len);
void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
static void *CreateMySQLAuthData(char *username, char *password, char *database);
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
/*
 * NOTE(review): under C99/C11 rules, if every file-scope declaration of a
 * non-static function (including its definition) carries 'inline' without
 * 'extern', no external definition is emitted. Keep at least one declaration
 * or the definition of extract_field without 'inline' to avoid link errors.
 */
inline uint32_t extract_field(uint8_t *src, int bits);
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);

/* Option value handed to setsockopt(SO_KEEPALIVE) for the master connection */
static int keepalive = 1;
 | |
| 
 | |
| /**
 | |
|  * blr_start_master - controls the connection of the binlog router to the
 | |
|  * master MySQL server and triggers the slave registration process for
 | |
|  * the router.
 | |
|  *
 | |
|  * @param	router		The router instance
 | |
|  */
 | |
| void
 | |
| blr_start_master(ROUTER_INSTANCE *router)
 | |
| {
 | |
| DCB	*client;
 | |
| GWBUF	*buf;
 | |
| 
 | |
| 	if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL)
 | |
| 	{
 | |
| 		LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
 | |
| 			"Binlog router: failed to create DCB for dummy client\n")));
 | |
| 		return;
 | |
| 	}
 | |
| 	router->client = client;
 | |
| 	client->data = CreateMySQLAuthData(router->user, router->password, "");
 | |
| 	if ((router->session = session_alloc(router->service, client)) == NULL)
 | |
| 	{
 | |
| 		LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
 | |
| 			"Binlog router: failed to create session for connection to master\n")));
 | |
| 		return;
 | |
| 	}
 | |
| 	client->session = router->session;
 | |
| 	if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL)
 | |
| 	{
 | |
| 		LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
 | |
| 		   "Binlog router: failed to connect to master server '%s'\n",
 | |
| 			router->service->databases->unique_name)));
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive )))
 | |
| perror("setsockopt");
 | |
| 
 | |
| 	router->master_state = BLRM_AUTHENTICATED;
 | |
| 	buf = blr_make_query("SELECT UNIX_TIMESTAMP()");
 | |
| 	router->master->func.write(router->master, buf);
 | |
| 	router->master_state = BLRM_TIMESTAMP;
 | |
| 
 | |
| 	router->stats.n_masterstarts++;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Reconnect to the master server.
 | |
|  *
 | |
|  * IMPORTANT - must be called with router->active_logs set by the
 | |
|  * thread that set active_logs.
 | |
|  *
 | |
|  * @param	router		The router instance
 | |
|  */
 | |
| static void
 | |
| blr_restart_master(ROUTER_INSTANCE *router)
 | |
| {
 | |
| GWBUF	*ptr;
 | |
| 
 | |
| 	dcb_close(router->master);
 | |
| 	dcb_free(router->master);
 | |
| 	dcb_free(router->client);
 | |
| 
 | |
| 	/* Discard the queued residual data */
 | |
| 	ptr = router->residual;
 | |
| 	while (ptr)
 | |
| 	{
 | |
| 		ptr = gwbuf_consume(ptr, GWBUF_LENGTH(ptr));
 | |
| 	}
 | |
| 	router->residual = NULL;
 | |
| 
 | |
| 	/* Now it is safe to unleash other threads on this router instance */
 | |
| 	spinlock_acquire(&router->lock);
 | |
| 	router->reconnect_pending = 0;
 | |
| 	router->active_logs = 0;
 | |
| 	spinlock_release(&router->lock);
 | |
| 	blr_start_master(router);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Request a reconnect to the master.
 | |
|  *
 | |
|  * If another thread is active processing messages from the master
 | |
|  * then merely set a flag for that thread to do the restart. If no
 | |
|  * threads are active then directly call the restart routine to
 | |
|  * reconnect to the master.
 | |
|  *
 | |
|  * @param	router		The router instance
 | |
|  */
 | |
| void
 | |
| blr_master_reconnect(ROUTER_INSTANCE *router)
 | |
| {
 | |
| int	do_reconnect = 0;
 | |
| 
 | |
| 	spinlock_acquire(&router->lock);
 | |
| 	if (router->active_logs)
 | |
| 	{
 | |
| 		/* Currently processing a response, set a flag
 | |
| 		 * and get the thread that is process a response
 | |
| 		 * to deal with the reconnect.
 | |
| 		 */
 | |
| 		router->reconnect_pending = 1;
 | |
| 		router->stats.n_delayedreconnects++;
 | |
| 	}
 | |
| 	else
 | |
| 	{
 | |
| 		router->active_logs = 1;
 | |
| 		do_reconnect = 1;
 | |
| 	}
 | |
| 	spinlock_release(&router->lock);
 | |
| 	if (do_reconnect)
 | |
| 	{
 | |
| 		blr_restart_master(router);
 | |
| 		spinlock_acquire(&router->lock);
 | |
| 		router->active_logs = 0;
 | |
| 		spinlock_release(&router->lock);
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Binlog router master side state machine event handler.
 | |
|  *
 | |
|  * Handles an incoming response from the master server to the binlog
 | |
|  * router.
 | |
|  *
 | |
|  * @param router	The router instance
 | |
|  * @param buf		The incoming packet
 | |
|  */
 | |
| void
 | |
| blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
 | |
| {
 | |
| char	query[128];
 | |
| 
 | |
| 	atomic_add(&router->handling_threads, 1);
 | |
| 	ss_dassert(router->handling_threads == 1);
 | |
| 	spinlock_acquire(&router->lock);
 | |
| 	router->active_logs = 1;
 | |
| 	spinlock_release(&router->lock);
 | |
| 	if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE)
 | |
| 	{
 | |
|         	LOGIF(LE, (skygw_log_write(
 | |
|                            LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n",
 | |
| 					router->master_state)));
 | |
| 		gwbuf_consume(buf, gwbuf_length(buf));
 | |
| 		spinlock_acquire(&router->lock);
 | |
| 		if (router->reconnect_pending)
 | |
| 		{
 | |
| 			router->active_logs = 0;
 | |
| 			spinlock_release(&router->lock);
 | |
| 			atomic_add(&router->handling_threads, -1);
 | |
| 			blr_restart_master(router);
 | |
| 			return;
 | |
| 		}
 | |
| 		router->active_logs = 0;
 | |
| 		spinlock_release(&router->lock);
 | |
| 		atomic_add(&router->handling_threads, -1);
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	if (router->master_state != BLRM_BINLOGDUMP && MYSQL_RESPONSE_ERR(buf))
 | |
| 	{
 | |
|         	LOGIF(LE, (skygw_log_write(
 | |
|                            LOGFILE_ERROR,
 | |
| 			"Received error: %d, %s from master during %s phase of the master state machine.\n",
 | |
| 			MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state]
 | |
| 			)));
 | |
| 		gwbuf_consume(buf, gwbuf_length(buf));
 | |
| 		spinlock_acquire(&router->lock);
 | |
| 		router->active_logs = 0;
 | |
| 		if (router->reconnect_pending)
 | |
| 		{
 | |
| 			spinlock_release(&router->lock);
 | |
| 			atomic_add(&router->handling_threads, -1);
 | |
| 			blr_restart_master(router);
 | |
| 			return;
 | |
| 		}
 | |
| 		spinlock_release(&router->lock);
 | |
| 		atomic_add(&router->handling_threads, -1);
 | |
| 		return;
 | |
| 	}
 | |
| 	switch (router->master_state)
 | |
| 	{
 | |
| 	case BLRM_TIMESTAMP:
 | |
| 		// Response to a timestamp message, no need to save this.
 | |
| 		gwbuf_consume(buf, GWBUF_LENGTH(buf));
 | |
| 		buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'");
 | |
| 		router->master_state = BLRM_SERVERID;
 | |
| 		router->master->func.write(router->master, buf);
 | |
| 		break;
 | |
| 	case BLRM_SERVERID:
 | |
| 		// Response to fetch of master's server-id
 | |
| 		router->saved_master.server_id = buf;
 | |
| 		// TODO: Extract the value of server-id and place in router->master_id
 | |
| 		buf = blr_make_query("SET @master_heartbeat_period = 1799999979520");
 | |
| 		router->master_state = BLRM_HBPERIOD;
 | |
| 		router->master->func.write(router->master, buf);
 | |
| 		break;
 | |
| 	case BLRM_HBPERIOD:
 | |
| 		// Response to set the heartbeat period
 | |
| 		router->saved_master.heartbeat = buf;
 | |
| 		buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum");
 | |
| 		router->master_state = BLRM_CHKSUM1;
 | |
| 		router->master->func.write(router->master, buf);
 | |
| 		break;
 | |
| 	case BLRM_CHKSUM1:
 | |
| 		// Response to set the master binlog checksum
 | |
| 		router->saved_master.chksum1 = buf;
 | |
| 		buf = blr_make_query("SELECT @master_binlog_checksum");
 | |
| 		router->master_state = BLRM_CHKSUM2;
 | |
| 		router->master->func.write(router->master, buf);
 | |
| 		break;
 | |
| 	case BLRM_CHKSUM2:
 | |
| 		// Response to the master_binlog_checksum, should be stored
 | |
| 		router->saved_master.chksum2 = buf;
 | |
| 		buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE");
 | |
| 		router->master_state = BLRM_GTIDMODE;
 | |
| 		router->master->func.write(router->master, buf);
 | |
| 		break;
 | |
| 	case BLRM_GTIDMODE:
 | |
| 		// Response to the GTID_MODE, should be stored
 | |
| 		router->saved_master.gtid_mode = buf;
 | |
| 		buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
 | |
| 		router->master_state = BLRM_MUUID;
 | |
| 		router->master->func.write(router->master, buf);
 | |
| 		break;
 | |
| 	case BLRM_MUUID:
 | |
| 		// Response to the SERVER_UUID, should be stored
 | |
| 		router->saved_master.uuid = buf;
 | |
| 		sprintf(query, "SET @slave_uuid='%s'", router->uuid);
 | |
| 		buf = blr_make_query(query);
 | |
| 		router->master_state = BLRM_SUUID;
 | |
| 		router->master->func.write(router->master, buf);
 | |
| 		break;
 | |
| 	case BLRM_SUUID:
 | |
| 		// Response to the SET @server_uuid, should be stored
 | |
| 		router->saved_master.setslaveuuid = buf;
 | |
| 		buf = blr_make_query("SET NAMES latin1");
 | |
| 		router->master_state = BLRM_LATIN1;
 | |
| 		router->master->func.write(router->master, buf);
 | |
| 		break;
 | |
| 	case BLRM_LATIN1:
 | |
| 		// Response to the SET NAMES latin1, should be stored
 | |
| 		router->saved_master.setnames = buf;
 | |
| 		buf = blr_make_query("SET NAMES utf8");
 | |
| 		router->master_state = BLRM_UTF8;
 | |
| 		router->master->func.write(router->master, buf);
 | |
| 		break;
 | |
| 	case BLRM_UTF8:
 | |
| 		// Response to the SET NAMES utf8, should be stored
 | |
| 		router->saved_master.utf8 = buf;
 | |
| 		buf = blr_make_query("SELECT 1");
 | |
| 		router->master_state = BLRM_SELECT1;
 | |
| 		router->master->func.write(router->master, buf);
 | |
| 		break;
 | |
| 	case BLRM_SELECT1:
 | |
| 		// Response to the SELECT 1, should be stored
 | |
| 		router->saved_master.select1 = buf;
 | |
| 		buf = blr_make_query("SELECT VERSION();");
 | |
| 		router->master_state = BLRM_SELECTVER;
 | |
| 		router->master->func.write(router->master, buf);
 | |
| 		break;
 | |
| 	case BLRM_SELECTVER:
 | |
| 		// Response to SELECT VERSION should be stored
 | |
| 		router->saved_master.selectver = buf;
 | |
| 		buf = blr_make_registration(router);
 | |
| 		router->master_state = BLRM_REGISTER;
 | |
| 		router->master->func.write(router->master, buf);
 | |
| 		break;
 | |
| 	case BLRM_REGISTER:
 | |
| 		// Request a dump of the binlog file
 | |
| 		buf = blr_make_binlog_dump(router);
 | |
| 		router->master_state = BLRM_BINLOGDUMP;
 | |
| 		router->master->func.write(router->master, buf);
 | |
| 		break;
 | |
| 	case BLRM_BINLOGDUMP:
 | |
| 		// Main body, we have received a binlog record from the master
 | |
| 		blr_handle_binlog_record(router, buf);
 | |
| 		break;
 | |
| 	}
 | |
| 
 | |
| 	if (router->reconnect_pending)
 | |
| 		blr_restart_master(router);
 | |
| 	spinlock_acquire(&router->lock);
 | |
| 	router->active_logs = 0;
 | |
| 	spinlock_release(&router->lock);
 | |
| 	atomic_add(&router->handling_threads, -1);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Build a MySQL query into a GWBUF that we can send to the master database
 | |
|  *
 | |
|  * @param	query		The text of the query to send
 | |
|  */
 | |
| static GWBUF *
 | |
| blr_make_query(char *query)
 | |
| {
 | |
| GWBUF		*buf;
 | |
| unsigned char	*data;
 | |
| int		len;
 | |
| 
 | |
| 	if ((buf = gwbuf_alloc(strlen(query) + 5)) == NULL)
 | |
| 		return NULL;
 | |
| 	data = GWBUF_DATA(buf);
 | |
| 	len = strlen(query) + 1;
 | |
| 	encode_value(&data[0], len, 24);	// Payload length
 | |
| 	data[3] = 0;				// Sequence id
 | |
| 						// Payload
 | |
| 	data[4] = COM_QUERY;			// Command
 | |
| 	memcpy(&data[5], query, strlen(query));
 | |
| 
 | |
| 	return buf;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Build a MySQL slave registration into a GWBUF that we can send to the
 | |
|  * master database
 | |
|  *
 | |
|  * @param	router 	The router instance
 | |
|  * @return	A MySQL Replication registration message in a GWBUF structure
 | |
|  */
 | |
| static GWBUF *
 | |
| blr_make_registration(ROUTER_INSTANCE *router)
 | |
| {
 | |
| GWBUF		*buf;
 | |
| unsigned char	*data;
 | |
| int		len = 18;
 | |
| 
 | |
| 	if ((buf = gwbuf_alloc(len + 4)) == NULL)
 | |
| 		return NULL;
 | |
| 	data = GWBUF_DATA(buf);
 | |
| 	encode_value(&data[0], len, 24);		// Payload length
 | |
| 	data[3] = 0;					// Sequence ID
 | |
| 	data[4] = COM_REGISTER_SLAVE;			// Command
 | |
| 	encode_value(&data[5], router->serverid, 32);	// Slave Server ID
 | |
| 	data[9] = 0;					// Slave hostname length
 | |
| 	data[10] = 0;					// Slave username length
 | |
| 	data[11] = 0;					// Slave password length
 | |
| 	encode_value(&data[12],
 | |
| 		router->service->ports->port, 16);	// Slave master port
 | |
| 	encode_value(&data[14], 0, 32);			// Replication rank
 | |
| 	encode_value(&data[18], router->masterid, 32);	// Master server-id
 | |
| 
 | |
| 	return buf;
 | |
| }
 | |
| 
 | |
| 
 | |
| /**
 | |
|  * Build a Binlog dump command into a GWBUF that we can send to the
 | |
|  * master database
 | |
|  *
 | |
|  * @param	router 	The router instance
 | |
|  * @return	A MySQL Replication COM_BINLOG_DUMP message in a GWBUF structure
 | |
|  */
 | |
| static GWBUF *
 | |
| blr_make_binlog_dump(ROUTER_INSTANCE *router)
 | |
| {
 | |
| GWBUF		*buf;
 | |
| unsigned char	*data;
 | |
| int		len = 0x1b;
 | |
| 
 | |
| 	if ((buf = gwbuf_alloc(len + 4)) == NULL)
 | |
| 		return NULL;
 | |
| 	data = GWBUF_DATA(buf);
 | |
| 
 | |
| 	encode_value(&data[0], len,24);			// Payload length
 | |
| 	data[3] = 0;					// Sequence ID
 | |
| 	data[4] = COM_BINLOG_DUMP;			// Command
 | |
| 	encode_value(&data[5],
 | |
| 		router->binlog_position, 32);		// binlog position
 | |
| 	encode_value(&data[9], 0, 16);			// Flags
 | |
| 	encode_value(&data[11],
 | |
| 		router->serverid, 32);			// Server-id of MaxScale
 | |
| 	strncpy((char *)&data[15], router->binlog_name,
 | |
| 				BINLOG_FNAMELEN);	// binlog filename
 | |
| 	return buf;
 | |
| }
 | |
| 
 | |
| 
 | |
/**
 * Store a value little-endian (least significant byte first) into a
 * MySQL packet.
 *
 * One byte is written for every started group of 8 bits in len; bytes
 * beyond the width of value are written as zero. Nothing is written when
 * len <= 0.
 *
 * @param	data	Point to location in target packet
 * @param	value	The value to pack
 * @param	len	Number of bits to encode value into
 */
void
encode_value(unsigned char *data, unsigned int value, int len)
{
unsigned char	*out = data;
unsigned int	rest = value;
int		bits_left;

	for (bits_left = len; bits_left > 0; bits_left -= 8)
	{
		*out++ = (unsigned char)(rest & 0xff);
		rest >>= 8;
	}
}
 | |
| 
 | |
| /**
 | |
|  * blr_handle_binlog_record - we have received binlog records from
 | |
|  * the master and we must now work out what to do with them.
 | |
|  *
 | |
|  * @param router	The router instance
 | |
|  * @param pkt		The binlog records
 | |
|  */
 | |
| void
 | |
| blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
 | |
| {
 | |
| uint8_t			*msg = NULL, *ptr, *pdata;
 | |
| REP_HEADER		hdr;
 | |
| unsigned int		len, reslen;
 | |
| unsigned int		pkt_length;
 | |
| int			no_residual = 1;
 | |
| int			preslen = -1;
 | |
| int			prev_length = -1;
 | |
| int			n_bufs = -1, pn_bufs = -1;
 | |
| static REP_HEADER	phdr;
 | |
| 
 | |
| 	/*
 | |
| 	 * Prepend any residual buffer to the buffer chain we have
 | |
| 	 * been called with.
 | |
| 	 */
 | |
| 	if (router->residual)
 | |
| 	{
 | |
| 		pkt = gwbuf_append(router->residual, pkt);
 | |
| 		router->residual = NULL;
 | |
| 		no_residual = 0;
 | |
| 	}
 | |
| 
 | |
| 	pkt_length = gwbuf_length(pkt);
 | |
| 	while (pkt && pkt_length > 24)
 | |
| 	{
 | |
| 		reslen = GWBUF_LENGTH(pkt);
 | |
| 		pdata = GWBUF_DATA(pkt);
 | |
| 		if (reslen < 3)	// Payload length straddles buffers
 | |
| 		{
 | |
| 			/* Get the length of the packet from the residual and new packet */
 | |
| 			if (reslen >= 3)
 | |
| 			{
 | |
| 				len = extract_field(pdata, 24);
 | |
| 			}
 | |
| 			else if (reslen == 2)
 | |
| 			{
 | |
| 				len = extract_field(pdata, 16);
 | |
| 				len |= (extract_field(GWBUF_DATA(pkt->next), 8) << 16);
 | |
| 			}
 | |
| 			else if (reslen == 1)
 | |
| 			{
 | |
| 				len = extract_field(pdata, 8);
 | |
| 				len |= (extract_field(GWBUF_DATA(pkt->next), 16) << 8);
 | |
| 			}
 | |
| 			len += 4; 	// Allow space for the header
 | |
| 		}
 | |
| 		else
 | |
| 		{
 | |
| 			len = extract_field(pdata, 24) + 4;
 | |
| 		}
 | |
| 
 | |
| 		if (reslen < len && pkt_length >= len)
 | |
| 		{
 | |
| 			/*
 | |
| 			 * The message is contained in more than the current
 | |
| 			 * buffer, however we have the complete messasge in
 | |
| 			 * this buffer and the chain of remaining buffers.
 | |
| 			 *
 | |
| 			 * Allocate a contiguous buffer for the binlog message
 | |
| 			 * and copy the complete message into this buffer.
 | |
| 			 */
 | |
| 			int remainder = len;
 | |
| 			GWBUF *p = pkt;
 | |
| 
 | |
| 			if ((msg = malloc(len)) == NULL)
 | |
| 			{
 | |
|         			LOGIF(LE,(skygw_log_write(
 | |
| 		                           LOGFILE_ERROR,
 | |
| 					"Insufficient memory to buffer event "
 | |
| 					"of %d bytes. Binlog %s @ %d\n.",
 | |
| 					len, router->binlog_name,
 | |
| 					router->binlog_position)));
 | |
| 				break;
 | |
| 			}
 | |
| 
 | |
| 			n_bufs = 0;
 | |
| 			ptr = msg;
 | |
| 			while (p && remainder > 0)
 | |
| 			{
 | |
| 				int plen = GWBUF_LENGTH(p);
 | |
| 				int n = (remainder > plen ? plen : remainder);
 | |
| 				memcpy(ptr, GWBUF_DATA(p), n);
 | |
| 				remainder -= n;
 | |
| 				ptr += n;
 | |
| 				if (remainder > 0)
 | |
| 					p = p->next;
 | |
| 			n_bufs++;
 | |
| 			}
 | |
| 			if (remainder)
 | |
| 			{
 | |
|         			LOGIF(LE,(skygw_log_write(
 | |
| 		                           LOGFILE_ERROR,
 | |
| 					"Expected entire message in buffer "
 | |
| 					"chain, but failed to create complete "
 | |
| 					"message as expected. %s @ %d\n",
 | |
| 					router->binlog_name,
 | |
| 					router->binlog_position)));
 | |
| 				free(msg);
 | |
| 				msg = NULL;
 | |
| 				break;
 | |
| 			}
 | |
| 
 | |
| 			ptr = msg;
 | |
| 		}
 | |
| 		else if (reslen < len)
 | |
| 		{
 | |
| 			/*
 | |
| 			 * The message is not fully contained in the current
 | |
| 			 * and we do not have the complete message in the
 | |
| 			 * buffer chain. Therefore we must stop processing
 | |
| 			 * until we receive the next buffer.
 | |
| 			 */
 | |
| 			router->stats.n_residuals++;
 | |
| 	        	LOGIF(LD,(skygw_log_write(
 | |
|                            LOGFILE_DEBUG,
 | |
| 			   "Residual data left after %d records. %s @ %d\n",
 | |
| 					router->stats.n_binlogs,
 | |
| 			   router->binlog_name, router->binlog_position)));
 | |
| 			break;
 | |
| 		}
 | |
| 		else
 | |
| 		{
 | |
| 			/*
 | |
| 			 * The message is fully contained in the current buffer
 | |
| 			 */
 | |
| 			ptr = pdata;
 | |
| 			n_bufs = 1;
 | |
| 		}
 | |
| 
 | |
| 		blr_extract_header(ptr, &hdr);
 | |
| 
 | |
| 		if (hdr.event_size != len - 5)
 | |
| 		{
 | |
| 	        	LOGIF(LE,(skygw_log_write(
 | |
|                            LOGFILE_ERROR,
 | |
| 				"Packet length is %d, but event size is %d, "
 | |
| 				"binlog file %s position %d"
 | |
| 				"reslen is %d and preslen is %d, "
 | |
| 				"length of previous event %d. %s",
 | |
| 					len, hdr.event_size,
 | |
| 					router->binlog_name,
 | |
| 					router->binlog_position,
 | |
| 					reslen, preslen, prev_length,
 | |
| 				(prev_length == -1 ?
 | |
| 				(no_residual ? "No residual data from previous call" : "Residual data from previous call") : "")
 | |
| 				)));
 | |
| 			blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len);
 | |
| 	        	LOGIF(LE,(skygw_log_write(
 | |
|                            LOGFILE_ERROR,
 | |
| 				"This event (0x%x) was contained in %d GWBUFs, "
 | |
| 				"the previous events was contained in %d GWBUFs",
 | |
| 				router->lastEventReceived, n_bufs, pn_bufs)));
 | |
| 			if (msg)
 | |
| 			{
 | |
| 				free(msg);
 | |
| 				msg = NULL;
 | |
| 			}
 | |
| 			break;
 | |
| 		}
 | |
| 		phdr = hdr;
 | |
| 		if (hdr.ok == 0)
 | |
| 		{
 | |
| 			router->stats.n_binlogs++;
 | |
| 			router->lastEventReceived = hdr.event_type;
 | |
| 
 | |
| // #define SHOW_EVENTS
 | |
| #ifdef SHOW_EVENTS
 | |
| 			printf("blr: event type 0x%02x, flags 0x%04x, event size %d\n", hdr.event_type, hdr.flags, hdr.event_size);
 | |
| #endif
 | |
| 			if (hdr.event_type >= 0 && hdr.event_type < 0x24)
 | |
| 				router->stats.events[hdr.event_type]++;
 | |
| 			if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
 | |
| 			{
 | |
| 				// Fake format description message
 | |
|         			LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG,
 | |
| 					"Replication fake event. "
 | |
| 						"Binlog %s @ %d.\n",
 | |
| 					router->binlog_name,
 | |
| 					router->binlog_position)));
 | |
| 				router->stats.n_fakeevents++;
 | |
| 				if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
 | |
| 				{
 | |
| 					/*
 | |
| 					 * We need to save this to replay to new
 | |
| 					 * slaves that attach later.
 | |
| 					 */
 | |
| 					if (router->saved_master.fde_event)
 | |
| 						free(router->saved_master.fde_event);
 | |
| 					router->saved_master.fde_len = hdr.event_size;
 | |
| 					router->saved_master.fde_event = malloc(hdr.event_size);
 | |
| 					if (router->saved_master.fde_event)
 | |
| 						memcpy(router->saved_master.fde_event,
 | |
| 								ptr + 5, hdr.event_size);
 | |
| 				}
 | |
| 			}
 | |
| 			else
 | |
| 			{
 | |
| 				if (hdr.event_type == HEARTBEAT_EVENT)
 | |
| 				{
 | |
| #ifdef SHOW_EVENTS
 | |
| 					printf("Replication heartbeat\n");
 | |
| #endif
 | |
|         				LOGIF(LD,(skygw_log_write(
 | |
| 			                           LOGFILE_DEBUG,
 | |
| 						"Replication heartbeat. "
 | |
| 						"Binlog %s @ %d.\n",
 | |
| 						router->binlog_name,
 | |
| 						router->binlog_position)));
 | |
| 					router->stats.n_heartbeats++;
 | |
| 				}
 | |
| 				else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
 | |
| 				{
 | |
| 					ptr = ptr + 5;	// We don't put the first byte of the payload
 | |
| 							// into the binlog file
 | |
| 					blr_write_binlog_record(router, &hdr, ptr);
 | |
| 					if (hdr.event_type == ROTATE_EVENT)
 | |
| 					{
 | |
| 						blr_rotate_event(router, ptr, &hdr);
 | |
| 					}
 | |
| 					blr_distribute_binlog_record(router, &hdr, ptr);
 | |
| 				}
 | |
| 				else
 | |
| 				{
 | |
| 					router->stats.n_artificial++;
 | |
|         				LOGIF(LD,(skygw_log_write(
 | |
| 			                           LOGFILE_DEBUG,
 | |
| 					"Artificial event not written "
 | |
| 					"to disk or distributed. "
 | |
| 					"Type 0x%x, Length %d, Binlog "
 | |
| 					"%s @ %d\n.",
 | |
| 						hdr.event_type,
 | |
| 						hdr.event_size,
 | |
| 						router->binlog_name,
 | |
| 						router->binlog_position)));
 | |
| 					ptr += 5;
 | |
| 					if (hdr.event_type == ROTATE_EVENT)
 | |
| 					{
 | |
| 						blr_rotate_event(router, ptr, &hdr);
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 		else
 | |
| 		{
 | |
| 			printf("Binlog router error: %s\n", &ptr[7]);
 | |
| 			LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
 | |
| 				"Error packet in binlog stream.%s @ %d\n.",
 | |
| 						router->binlog_name,
 | |
| 						router->binlog_position)));
 | |
| 			blr_log_packet(LOGFILE_ERROR, "Error Packet:",
 | |
| 				ptr, len);
 | |
| 			router->stats.n_binlog_errors++;
 | |
| 		}
 | |
| 
 | |
| 		if (msg)
 | |
| 		{
 | |
| 			free(msg);
 | |
| 			msg = NULL;
 | |
| 		}
 | |
| 		prev_length = len;
 | |
| 		while (len > 0)
 | |
| 		{
 | |
| 			int n, plen;
 | |
| 			plen = GWBUF_LENGTH(pkt);
 | |
| 			n = (plen < len ? plen : len);
 | |
| 			pkt = gwbuf_consume(pkt, n);
 | |
| 			len -= n;
 | |
| 			pkt_length -= n;
 | |
| 		}
 | |
| 		preslen = reslen;
 | |
| 		pn_bufs = n_bufs;
 | |
| 	}
 | |
| 
 | |
| 	/*
 | |
| 	 * Check if we have a residual, part binlog message to deal with.
 | |
| 	 * Just simply store the GWBUF for next time
 | |
| 	 */
 | |
| 	if (pkt)
 | |
| 	{
 | |
| 		router->residual = pkt;
 | |
| 		ss_dassert(pkt_length != 0);
 | |
| 	}
 | |
| 	else
 | |
| 	{
 | |
| 		ss_dassert(pkt_length == 0);
 | |
| 	}
 | |
| { CYCLES start = rdtsc(); 
 | |
| 	blr_file_flush(router);
 | |
| log_duration(LOGD_FILE_FLUSH, rdtsc() - start); }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Populate a header structure for a replication message from a GWBUF structure.
 | |
|  *
 | |
|  * @param pkt	The incoming packet in a GWBUF chain
 | |
|  * @param hdr	The packet header to populate
 | |
|  */
 | |
| void
 | |
| blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
 | |
| {
 | |
| 
 | |
| 	hdr->payload_len = extract_field(ptr, 24);
 | |
| 	hdr->seqno = ptr[3];
 | |
| 	hdr->ok = ptr[4];
 | |
| 	hdr->timestamp = extract_field(&ptr[5], 32);
 | |
| 	hdr->event_type = ptr[9];
 | |
| 	hdr->serverid = extract_field(&ptr[10], 32);
 | |
| 	hdr->event_size = extract_field(&ptr[14], 32);
 | |
| 	hdr->next_pos = extract_field(&ptr[18], 32);
 | |
| 	hdr->flags = extract_field(&ptr[22], 16);
 | |
| }
 | |
| 
 | |
/**
 * Extract a little-endian unsigned numeric field from a packet.
 *
 * Defined without 'inline' so that an external definition is always
 * emitted for callers in this and other translation units.
 *
 * @param src	The raw packet source
 * @param bits	The number of bits to extract (multiple of 8); at most the
 *		first 32 bits are used since the result is 32 bits wide
 * @return	The assembled field value
 */
uint32_t
extract_field(uint8_t *src, int bits)
{
uint32_t	rval = 0;
unsigned int	shift = 0;

	while (bits > 0 && shift < 32)
	{
		/*
		 * Widen before shifting: a uint8_t operand promotes to int,
		 * and shifting a byte >= 0x80 left by 24 overflows int.
		 */
		rval |= (uint32_t)(*src++) << shift;
		shift += 8;
		bits -= 8;
	}
	return rval;
}
 | |
| 
 | |
| /**
 | |
|  * Process a binlog rotate event.
 | |
|  *
 | |
|  * @param router	The instance of the router
 | |
|  * @param ptr		The packet containing the rotate event
 | |
|  * @param hdr		The replication message header
 | |
|  */
 | |
| static void
 | |
| blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr)
 | |
| {
 | |
| int		len, slen;
 | |
| uint64_t	pos;
 | |
| char		file[BINLOG_FNAMELEN+1];
 | |
| 
 | |
| 	ptr += 19;		// Skip event header
 | |
| 	len = hdr->event_size - 19;	// Event size minus header
 | |
| 	pos = extract_field(ptr+4, 32);
 | |
| 	pos <<= 32;
 | |
| 	pos |= extract_field(ptr, 32);
 | |
| 	slen = len - (8 + 4);		// Allow for position and CRC
 | |
| 	if (slen > BINLOG_FNAMELEN)
 | |
| 		slen = BINLOG_FNAMELEN;
 | |
| 	memcpy(file, ptr + 8, slen);
 | |
| 	file[slen] = 0;
 | |
| 
 | |
| #ifdef VERBOSE_ROTATE
 | |
| 	printf("binlog rotate: ");
 | |
| 	while (len--)
 | |
| 		printf("0x%02x ", *ptr++);
 | |
| 	printf("\n");
 | |
| 	printf("New file: %s @ %ld\n", file, pos);
 | |
| #endif
 | |
| 
 | |
| 	if (strncmp(router->binlog_name, file, slen) != 0)
 | |
| 	{
 | |
| 		router->stats.n_rotates++;
 | |
| 		blr_file_rotate(router, file, pos);
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Create the auth data needed to be able to call dcb_connect.
 | |
|  * 
 | |
|  * This doesn't really belong here and should be moved at some stage.
 | |
|  */
 | |
| static void *
 | |
| CreateMySQLAuthData(char *username, char *password, char *database)
 | |
| {
 | |
| MYSQL_session	*auth_info;
 | |
| 
 | |
| 	if (username == NULL || password == NULL)
 | |
| 	{
 | |
|         	LOGIF(LE,(skygw_log_write(
 | |
|                            LOGFILE_ERROR,
 | |
| 				"You must specify both username and password for the binlog router.\n")));
 | |
| 		return NULL;
 | |
| 	}
 | |
| 
 | |
| 	if ((auth_info = calloc(1, sizeof(MYSQL_session))) == NULL)
 | |
| 		return NULL;
 | |
| 	strcpy(auth_info->user, username);
 | |
| 	strcpy(auth_info->db, database);
 | |
| 	gw_sha1_str((const uint8_t *)password, strlen(password), auth_info->client_sha1);
 | |
| 
 | |
| 	return auth_info;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Distribute the binlog record we have just received to all the registered slaves.
 | |
|  *
 | |
|  * @param	router		The router instance
 | |
|  * @param	hdr		The replication event header
 | |
|  * @param	ptr		The raw replication event data
 | |
|  */
 | |
| void
 | |
| blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
 | |
| {
 | |
| GWBUF		*pkt;
 | |
| uint8_t		*buf;
 | |
| ROUTER_SLAVE	*slave;
 | |
| int		action;
 | |
| CYCLES		entry;
 | |
| 
 | |
| 	entry = rdtsc();
 | |
| 	spinlock_acquire(&router->lock);
 | |
| 	slave = router->slaves;
 | |
| 	while (slave)
 | |
| 	{
 | |
| 		spinlock_acquire(&slave->catch_lock);
 | |
| 		if ((slave->cstate & (CS_UPTODATE|CS_DIST)) == CS_UPTODATE)
 | |
| 		{
 | |
| 			/* Slave is up to date with the binlog and no distribute is
 | |
| 			 * running on this slave.
 | |
| 			 */
 | |
| 			action = 1;
 | |
| 			slave->cstate |= CS_DIST;
 | |
| 		}
 | |
| 		else if ((slave->cstate & (CS_UPTODATE|CS_DIST)) == (CS_UPTODATE|CS_DIST))
 | |
| 		{
 | |
| 			/* Slave is up to date with the binlog and a distribute is
 | |
| 			 * running on this slave.
 | |
| 			 */
 | |
| 			slave->overrun = 1;
 | |
| 			action = 2;
 | |
| 		}
 | |
| 		else if ((slave->cstate & CS_UPTODATE) == 0)
 | |
| 		{
 | |
| 			/* Slave is in catchup mode */
 | |
| 			action = 3;
 | |
| 		}
 | |
| 		slave->stats.n_actions[action-1]++;
 | |
| 		spinlock_release(&slave->catch_lock);
 | |
| 		if (action == 1)
 | |
| 		{
 | |
| 			if ((slave->binlog_pos == hdr->next_pos - hdr->event_size)
 | |
| 				&& (strcmp(slave->binlogfile, router->binlog_name) == 0 ||
 | |
| 					hdr->event_type == ROTATE_EVENT))
 | |
| 			{
 | |
| 				pkt = gwbuf_alloc(hdr->event_size + 5);
 | |
| 				buf = GWBUF_DATA(pkt);
 | |
| 				encode_value(buf, hdr->event_size + 1, 24);
 | |
| 				buf += 3;
 | |
| 				*buf++ = slave->seqno++;
 | |
| 				*buf++ = 0;	// OK
 | |
| 				memcpy(buf, ptr, hdr->event_size);
 | |
| 				if (hdr->event_type == ROTATE_EVENT)
 | |
| 				{
 | |
| 					blr_slave_rotate(slave, ptr);
 | |
| 				}
 | |
| 				slave->dcb->func.write(slave->dcb, pkt);
 | |
| 				if (hdr->event_type != ROTATE_EVENT)
 | |
| 				{
 | |
| 					slave->binlog_pos = hdr->next_pos;
 | |
| 				}
 | |
| 				spinlock_acquire(&slave->catch_lock);
 | |
| 				if (slave->overrun)
 | |
| 				{
 | |
| CYCLES	cycle_start, cycles;
 | |
| 					slave->stats.n_overrun++;
 | |
| 					slave->overrun = 0;
 | |
| 					spinlock_release(&router->lock);
 | |
| 					slave->cstate &= ~(CS_UPTODATE|CS_DIST);
 | |
| 					spinlock_release(&slave->catch_lock);
 | |
| cycle_start = rdtsc();
 | |
| 					blr_slave_catchup(router, slave);
 | |
| cycles = rdtsc() - cycle_start;
 | |
| log_duration(LOGD_SLAVE_CATCHUP2, cycles);
 | |
| 					spinlock_acquire(&router->lock);
 | |
| 					slave = router->slaves;
 | |
| 					if (slave)
 | |
| 						continue;
 | |
| 					else
 | |
| 						break;
 | |
| 				}
 | |
| 				else
 | |
| 				{
 | |
| 					slave->cstate &= ~CS_DIST;
 | |
| 				}
 | |
| 				spinlock_release(&slave->catch_lock);
 | |
| 			}
 | |
| 			else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size)
 | |
| 				&& strcmp(slave->binlogfile, router->binlog_name) == 0)
 | |
| 			{
 | |
| 				LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
 | |
| 					"Slave %d is ahead of expected position %s@%d. "
 | |
| 					"Expected position %d",
 | |
| 						slave->serverid, slave->binlogfile,
 | |
| 						slave->binlog_pos,
 | |
| 						hdr->next_pos - hdr->event_size)));
 | |
| 			}
 | |
| 			else if ((hdr->event_type != ROTATE_EVENT)
 | |
| 				&& (slave->binlog_pos != hdr->next_pos - hdr->event_size ||
 | |
| 					strcmp(slave->binlogfile, router->binlog_name) != 0))
 | |
| 			{
 | |
| 				/* Check slave is in catchup mode and if not
 | |
| 				 * force it to go into catchup mode.
 | |
| 				 */
 | |
| 				if (slave->cstate & CS_UPTODATE)
 | |
| 				{
 | |
| CYCLES	cycle_start, cycles;
 | |
| 					spinlock_release(&router->lock);
 | |
| 					LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
 | |
| 						"Force slave %d into catchup mode %s@%d\n",
 | |
| 						slave->serverid, slave->binlogfile,
 | |
| 						slave->binlog_pos)));
 | |
| 					spinlock_acquire(&slave->catch_lock);
 | |
| 					slave->cstate &= ~(CS_UPTODATE|CS_DIST);
 | |
| 					spinlock_release(&slave->catch_lock);
 | |
| cycle_start = rdtsc();
 | |
| 					blr_slave_catchup(router, slave);
 | |
| cycles = rdtsc() - cycle_start;
 | |
| log_duration(LOGD_SLAVE_CATCHUP1, cycles);
 | |
| 					spinlock_acquire(&router->lock);
 | |
| 					slave = router->slaves;
 | |
| 					if (slave)
 | |
| 						continue;
 | |
| 					else
 | |
| 						break;
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		slave = slave->next;
 | |
| 	}
 | |
| 	spinlock_release(&router->lock);
 | |
| 	log_duration(LOGD_DISTRIBUTE, rdtsc() - entry);
 | |
| }
 | |
| 
 | |
| static void
 | |
| blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len)
 | |
| {
 | |
| char	buf[400], *bufp;
 | |
| int	i;
 | |
| 
 | |
| 	bufp = buf;
 | |
| 	bufp += sprintf(bufp, "%s length = %d: ", msg, len);
 | |
| 	for (i = 0; i < len && i < 40; i++)
 | |
| 		bufp += sprintf(bufp, "0x%02x ", ptr[i]);
 | |
| 	if (i < len)
 | |
| 		skygw_log_write_flush(file, "%s...\n", buf);
 | |
| 	else
 | |
| 		skygw_log_write_flush(file, "%s\n", buf);
 | |
| 	
 | |
| }
 | 
