Initial binlog prototype

Supports pulling binlogs from the master, caching to a local file and relaying to a slave.

Only tested with a single slave and a single MaxScale thread.
This commit is contained in:
Mark Riddoch 2014-05-13 16:06:05 +01:00
parent 5e3ec5b3c8
commit 1d08b0100f
15 changed files with 3136 additions and 22 deletions

View File

@ -183,6 +183,8 @@ getUsers(SERVICE *service, struct users *users)
}
serviceGetUser(service, &service_user, &service_passwd);
if (service_user == NULL || service_passwd == NULL)
return -1;
/** multi-thread environment requires that thread init succeeds. */
if (mysql_thread_init()) {

View File

@ -47,6 +47,7 @@
* error and 0 bytes to read.
* This fixes a bug with many reads from
* backend
* 07/05/2014 Mark Riddoch Addition of callback mechanism
*
* @endverbatim
*/
@ -80,6 +81,7 @@ static bool dcb_set_state_nomutex(
DCB* dcb,
const dcb_state_t new_state,
dcb_state_t* old_state);
static void dcb_call_callback(DCB *dcb, DCB_REASON reason);
DCB* dcb_get_zombies(void)
{
@ -94,8 +96,8 @@ DCB* dcb_get_zombies(void)
*
* @return A newly allocated DCB or NULL if non could be allocated.
*/
DCB * dcb_alloc(
dcb_role_t role)
DCB *
dcb_alloc(dcb_role_t role)
{
DCB *rval;
@ -118,11 +120,16 @@ DCB *rval;
spinlock_init(&rval->writeqlock);
spinlock_init(&rval->delayqlock);
spinlock_init(&rval->authlock);
spinlock_init(&rval->cb_lock);
rval->fd = -1;
memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics
rval->state = DCB_STATE_ALLOC;
bitmask_init(&rval->memdata.bitmask);
rval->writeqlen = 0;
rval->high_water = 0;
rval->low_water = 0;
rval->next = NULL;
rval->callbacks = NULL;
spinlock_acquire(&dcbspin);
if (allDCBs == NULL)
@ -676,9 +683,11 @@ return_n:
int
dcb_write(DCB *dcb, GWBUF *queue)
{
int w;
int saved_errno = 0;
int w, qlen;
int saved_errno = 0;
int below_water;
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0;
ss_dassert(queue != NULL);
if (queue == NULL ||
@ -712,6 +721,8 @@ dcb_write(DCB *dcb, GWBUF *queue)
* the routine that drains the queue data, so we should
* not have a race condition on the event.
*/
qlen = gwbuf_length(queue);
atomic_add(&dcb->writeqlen, qlen);
dcb->writeq = gwbuf_append(dcb->writeq, queue);
dcb->stats.n_buffered++;
LOGIF(LD, (skygw_log_write(
@ -824,6 +835,8 @@ dcb_write(DCB *dcb, GWBUF *queue)
* for suspended write.
*/
dcb->writeq = queue;
qlen = gwbuf_length(queue);
atomic_add(&dcb->writeqlen, qlen);
if (queue != NULL)
{
@ -847,6 +860,13 @@ dcb_write(DCB *dcb, GWBUF *queue)
return 0;
}
spinlock_release(&dcb->writeqlock);
if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water)
{
atomic_add(&dcb->stats.n_high_water, 1);
dcb_call_callback(dcb, DCB_REASON_HIGH_WATER);
}
return 1;
}
@ -861,9 +881,12 @@ dcb_write(DCB *dcb, GWBUF *queue)
int
dcb_drain_writeq(DCB *dcb)
{
int n = 0;
int w;
int saved_errno = 0;
int n = 0;
int w;
int saved_errno = 0;
int above_water;
above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0;
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
@ -924,6 +947,17 @@ int saved_errno = 0;
}
}
spinlock_release(&dcb->writeqlock);
atomic_add(&dcb->writeqlen, -n);
/* The write queue has drained, potentially need to call a callback function */
if (dcb->writeq == NULL)
dcb_call_callback(dcb, DCB_REASON_DRAINED);
if (above_water && dcb->writeqlen < dcb->low_water)
{
atomic_add(&dcb->stats.n_low_water, 1);
dcb_call_callback(dcb, DCB_REASON_LOW_WATER);
}
return n;
}
@ -966,6 +1000,8 @@ dcb_close(DCB *dcb)
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
dcb_call_callback(dcb, DCB_REASON_CLOSE);
if (rc == 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -1008,6 +1044,8 @@ printDCB(DCB *dcb)
printf("\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
printf("\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
printf("\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
printf("\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
printf("\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
}
/**
@ -1054,6 +1092,8 @@ DCB *dcb;
dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
dcb = dcb->next;
}
spinlock_release(&dcbspin);
@ -1079,6 +1119,8 @@ dprintDCB(DCB *pdcb, DCB *dcb)
dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
}
/**
@ -1415,4 +1457,135 @@ int gw_write(
return w;
}
/**
* Add a callback
*
* Duplicate registrations are not allowed, therefore an error will be returned if
* the specific function, reason and userdata triple are already registered.
* An error will also be returned if the is insufficient memeory available to
* create the registration.
*
* @param dcb The DCB to add the callback to
* @param reason The callback reason
* @param cb The callback function to call
* @param userdata User data to send in the call
* @return Non-zero (true) if the callback was added
*/
int
dcb_add_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata)
{
DCB_CALLBACK *cb, *ptr;
int rval = 1;
if ((ptr = (DCB_CALLBACK *)malloc(sizeof(DCB_CALLBACK))) == NULL)
{
return 0;
}
ptr->reason = reason;
ptr->cb = callback;
ptr->userdata = userdata;
ptr->next = NULL;
spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks;
if (cb == NULL)
{
dcb->callbacks = ptr;
spinlock_release(&dcb->cb_lock);
}
else
{
while (cb)
{
if (cb->reason == reason && cb->cb == callback &&
cb->userdata == userdata)
{
free(ptr);
spinlock_release(&dcb->cb_lock);
return 0;
}
if (cb->next == NULL)
cb->next = ptr;
cb = cb->next;
}
spinlock_release(&dcb->cb_lock);
}
return rval;
}
/**
* Remove a callback from the callback list for the DCB
*
* Searches down the linked list to find he callback with a matching reason, function
* and userdata.
*
* @param dcb The DCB to add the callback to
* @param reason The callback reason
* @param cb The callback function to call
* @param userdata User data to send in the call
* @return Non-zero (true) if the callback was removed
*/
int
dcb_remove_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON), void *userdata)
{
DCB_CALLBACK *cb, *pcb = NULL;
int rval = 0;
spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks;
if (cb == NULL)
{
rval = 0;
}
else
{
while (cb)
{
if (cb->reason == reason && cb->cb == callback
&& cb->userdata == userdata)
{
if (pcb == NULL)
pcb->next = cb->next;
else
dcb->callbacks = cb->next;
spinlock_release(&dcb->cb_lock);
free(cb);
rval = 1;
break;
}
pcb = cb;
cb = cb->next;
}
}
if (!rval)
spinlock_release(&dcb->cb_lock);
return rval;
}
/**
* Call the set of callbacks registered for a particular reason.
*
* @param dcb The DCB to call the callbacks regarding
* @param reason The reason that has triggered the call
*/
static void
dcb_call_callback(DCB *dcb, DCB_REASON reason)
{
DCB_CALLBACK *cb, *nextcb;
spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks;
while (cb)
{
if (cb->reason == reason)
{
nextcb = cb->next;
spinlock_release(&dcb->cb_lock);
cb->cb(dcb, reason, cb->userdata);
spinlock_acquire(&dcb->cb_lock);
cb = nextcb;
}
else
cb = cb->next;
}
spinlock_release(&dcb->cb_lock);
}

View File

@ -114,7 +114,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
/*
* Only create a router session if we are not the listening
* DCB. Creating a router session may create a connection to a
* DCB or an internal DCB. Creating a router session may create a connection to a
* backend server, depending upon the router module implementation
* and should be avoided for the listener session
*
@ -122,7 +122,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
* session, therefore it is important that the session lock is
* relinquished beforethe router call.
*/
if (client_dcb->state != DCB_STATE_LISTENING)
if (client_dcb->state != DCB_STATE_LISTENING && client_dcb->dcb_role != DCB_ROLE_INTERNAL)
{
session->router_session =
service->router->newSession(service->router_instance,

View File

@ -48,6 +48,8 @@ struct service;
* 15/07/2013 Massimiliano Pinto Added session entry point
* 16/07/2013 Massimiliano Pinto Added command type for dcb
* 07/02/2014 Massimiliano Pinto Added ipv4 data struct into for dcb
* 07/05/2014 Mark Riddoch Addition of callback mechanism
* 08/05/2014 Mark Riddoch Addition of writeq high and low watermarks
*
* @endverbatim
*/
@ -99,6 +101,8 @@ typedef struct dcbstats {
int n_writes; /*< Number of writes on this descriptor */
int n_accepts; /*< Number of accepts on this descriptor */
int n_buffered; /*< Number of buffered writes */
int n_high_water; /*< Number of crosses of high water mark */
int n_low_water; /*< Number of crosses of low water mark */
} DCBSTATS;
/**
@ -137,10 +141,35 @@ typedef enum {
} dcb_state_t;
typedef enum {
DCB_ROLE_SERVICE_LISTENER, /*< Receives initial connect requests from clients */
DCB_ROLE_REQUEST_HANDLER /*< Serves dedicated client */
DCB_ROLE_SERVICE_LISTENER, /*< Receives initial connect requests from clients */
DCB_ROLE_REQUEST_HANDLER, /*< Serves dedicated client */
DCB_ROLE_INTERNAL /*< Internal DCB not connected to the outside */
} dcb_role_t;
/**
* Callback reasons for the DCB callback mechanism.
*/
typedef enum {
DCB_REASON_CLOSE, /*< The DCB is closing */
DCB_REASON_DRAINED, /*< The write delay queue has drained */
DCB_REASON_HIGH_WATER, /*< Cross high water mark */
DCB_REASON_LOW_WATER, /*< Cross low water mark */
DCB_REASON_ERROR, /*< An error was flagged on the connection */
DCB_REASON_HUP /*< A hangup was detected */
} DCB_REASON;
/**
* Callback structure - used to track callbacks registered on a DCB
*/
typedef struct dcb_callback {
DCB_REASON reason; /*< The reason for the callback */
int (*cb)(struct dcb *dcb, DCB_REASON reason, void *userdata);
void *userdata; /*< User data to be sent in the callback */
struct dcb_callback
*next; /*< Next callback for this DCB */
} DCB_CALLBACK;
/**
* Descriptor Control Block
*
@ -172,6 +201,7 @@ typedef struct dcb {
struct session *session; /**< The owning session */
GWPROTOCOL func; /**< The functions for this descriptor */
unsigned int writeqlen; /**< Current number of byes in the write queue */
SPINLOCK writeqlock; /**< Write Queue spinlock */
GWBUF *writeq; /**< Write Data Queue */
SPINLOCK delayqlock; /**< Delay Backend Write Queue spinlock */
@ -185,6 +215,11 @@ typedef struct dcb {
void *data; /**< Specific client data */
DCBMM memdata; /**< The data related to DCB memory management */
int command; /**< Specific client command type */
SPINLOCK cb_lock; /**< The lock for the callbacks linked list */
DCB_CALLBACK *callbacks; /**< The list of callbacks for the DCB */
unsigned int high_water; /**< High water mark */
unsigned int low_water; /**< Low water mark */
#if defined(SS_DEBUG)
skygw_chk_t dcb_chk_tail;
#endif
@ -203,6 +238,11 @@ int fail_accept_errno;
#define DCB_SESSION(x) (x)->session
#define DCB_PROTOCOL(x, type) (type *)((x)->protocol)
#define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE)
#define DCB_WRITEQLEN(x) (x)->writeqlen
#define DCB_SET_LOW_WATER(x, lo) (x)->low_water = (lo);
#define DCB_SET_HIGH_WATER(x, hi) (x)->low_water = (hi);
#define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water)
#define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water)
DCB *dcb_get_zombies(void);
int gw_write(
@ -229,6 +269,10 @@ void dcb_printf(DCB *, const char *, ...); /* DCB version of printf */
int dcb_isclient(DCB *); /* the DCB is the client of the session */
void dcb_hashtable_stats(DCB *, void *); /**< Print statisitics */
void dcb_add_to_zombieslist(DCB* dcb);
int dcb_add_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *),
void *);
int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON),
void *);
bool dcb_set_state(
DCB* dcb,

View File

@ -0,0 +1,325 @@
#ifndef _BLR_H
#define _BLR_H
/*
* This file is distributed as part of MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file blr.h - The binlog router header file
*
* @verbatim
* Revision History
*
* Date Who Description
* 02/04/14 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <dcb.h>
#include <buffer.h>
#define BINLOG_FNAMELEN 16
#define BLR_PROTOCOL "MySQLBackend"
#define BINLOG_MAGIC { 0xfe, 0x62, 0x69, 0x6e }
#define BINLOG_NAMEFMT "mysql-bin.%06d"
/**
* High and Low water marks for the slave dcb. These values can be overriden
* by the router options highwater and lowwater.
*/
#define DEF_LOW_WATER 20000
#define DEF_HIGH_WATER 100000
/**
* Some useful macros for examining the MySQL Response packets
*/
#define MYSQL_RESPONSE_OK(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4) == 0x00)
#define MYSQL_RESPONSE_EOF(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4) == 0xfe)
#define MYSQL_RESPONSE_ERR(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4) == 0xff)
#define MYSQL_ERROR_CODE(buf) (*((uint8_t *)GWBUF_DATA(buf) + 5))
#define MYSQL_ERROR_MSG(buf) ((uint8_t *)GWBUF_DATA(buf) + 6)
#define MYSQL_COMMAND(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4))
/**
* Slave statistics
*/
typedef struct {
unsigned int n_events; /*< Number of events sent */
unsigned int n_bursts; /*< Number of bursts sent */
unsigned int n_requests; /*< Number of requests received */
unsigned int n_flows; /*< Number of flow control restarts */
} SLAVE_STATS;
/**
* The client session structure used within this router. This represents
* the slaves that are replicating binlogs from MaxScale.
*/
typedef struct router_slave {
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_top;
#endif
DCB *dcb; /*< The slave server DCB */
int state; /*< The state of this slave */
int binlog_pos; /*< Binlog position for this slave */
char binlogfile[BINLOG_FNAMELEN+1];
/*< Current binlog file for this slave */
int serverid; /*< Server-id of the slave */
char *hostname; /*< Hostname of the slave, if known */
char *user; /*< Username if given */
char *passwd; /*< Password if given */
short port; /*< MySQL port */
uint32_t rank; /*< Replication rank */
uint8_t seqno; /*< Replication dump sequence no */
SPINLOCK catch_lock; /*< Event catchup lock */
unsigned int cstate; /*< Catch up state */
SPINLOCK rses_lock; /*< Protects rses_deleted */
struct router_instance
*router; /*< Pointer to the owning router */
struct router_slave *next;
SLAVE_STATS stats; /*< Slave statistics */
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;
#endif
} ROUTER_SLAVE;
/**
* The statistics for this router instance
*/
typedef struct {
unsigned int n_slaves; /*< Number slave sessions created */
unsigned int n_reads; /*< Number of record reads */
uint64_t n_binlogs; /*< Number of binlog records from master */
uint64_t n_binlog_errors;/*< Number of binlog records from master */
uint64_t n_rotates; /*< Number of binlog rotate events */
uint64_t n_cachehits; /*< Number of hits on the binlog cache */
uint64_t n_cachemisses; /*< Number of misses on the binlog cache */
unsigned int n_registered; /*< Number of registered slaves */
uint64_t n_fakeevents; /*< Fake events not written to disk */
uint64_t events[0x24]; /*< Per event counters */
} ROUTER_STATS;
/**
* Saved responses from the master that will be forwarded to slaves
*/
typedef struct {
GWBUF *server_id; /*< Master server id */
GWBUF *heartbeat; /*< Heartbeat period */
GWBUF *chksum1; /*< Binlog checksum 1st response */
GWBUF *chksum2; /*< Binlog checksum 2nd response */
GWBUF *gtid_mode; /*< GTID Mode response */
GWBUF *uuid; /*< Master UUID */
GWBUF *setslaveuuid; /*< Set Slave UUID */
GWBUF *setnames; /*< Set NAMES latin1 */
uint8_t *fde_event; /*< Format Description Event */
int fde_len; /*< Length of fde_event */
} MASTER_RESPONSES;
/**
* The binlog record structure. This contains the actual packet received from the
* master, the binlog position of the data in the packet, a point to the data and
* the length of the binlog record.
*
* This allows requests for binlog records in the cache to be serviced by simply
* sending the exact same packet as was received by MaxScale from the master.
* Items are written to the backing file as soon as they are received. The binlog
* cache is flushed of old records periodically, releasing the GWBUF's back to the
* free memory pool.
*/
typedef struct {
unsigned long position; /*< binlog record position for this cache entry */
GWBUF *pkt; /*< The packet received from the master */
unsigned char *data; /*< Pointer to the data within the packet */
unsigned int record_len; /*< Binlog record length */
} BLCACHE_RECORD;
/**
* The binlog cache. A cache exists for each file that hold cached bin log records.
* Typically the router will hold two binlog caches, one for the current file and one
* for the previous file.
*/
typedef struct {
char filename[BINLOG_FNAMELEN+1];
BLCACHE_RECORD *first;
BLCACHE_RECORD *current;
int cnt;
} BLCACHE;
/**
* The per instance data for the router.
*/
typedef struct router_instance {
SERVICE *service; /*< Pointer to the service using this router */
ROUTER_SLAVE *slaves; /*< Link list of all the slave connections */
SPINLOCK lock; /*< Spinlock for the instance data */
char *uuid; /*< UUID for the router to use w/master */
int masterid; /*< Server ID of the master */
int serverid; /*< Server ID to use with master */
char *user; /*< User name to use with master */
char *password; /*< Password to use with master */
DCB *master; /*< DCB for master connection */
SESSION *session; /*< Fake session for master connection */
unsigned int master_state; /*< State of the master FSM */
GWBUF *residual; /*< Any residual binlog event */
MASTER_RESPONSES saved_master; /*< Saved master responses */
char binlog_name[BINLOG_FNAMELEN+1];
/*< Name of the current binlog file */
uint64_t binlog_position;
/*< Current binlog position */
int binlog_fd; /*< File descriptor of the binlog
* file being written
*/
unsigned int low_water; /*< Low water mark for client DCB */
unsigned int high_water; /*< High water mark for client DCB */
BLCACHE *cache[2];
ROUTER_STATS stats; /*< Statistics for this router */
int active_logs;
struct router_instance
*next;
} ROUTER_INSTANCE;
/**
* Packet header for replication messages
*/
typedef struct rep_header {
int payload_len; /*< Payload length (24 bits) */
uint8_t seqno; /*< Response sequence number */
uint8_t ok; /*< OK Byte from packet */
uint32_t timestamp; /*< Timestamp - start of binlog record */
uint8_t event_type; /*< Binlog event type */
uint32_t serverid; /*< Server id of master */
uint32_t event_size; /*< Size of header, post-header and body */
uint32_t next_pos; /*< Position of next event */
uint16_t flags; /*< Event flags */
} REP_HEADER;
/**
* State machine for the master to MaxScale replication
*/
#define BLRM_UNCONNECTED 0x0000
#define BLRM_AUTHENTICATED 0x0001
#define BLRM_TIMESTAMP 0x0002
#define BLRM_SERVERID 0x0003
#define BLRM_HBPERIOD 0x0004
#define BLRM_CHKSUM1 0x0005
#define BLRM_CHKSUM2 0x0006
#define BLRM_GTIDMODE 0x0007
#define BLRM_MUUID 0x0008
#define BLRM_SUUID 0x0009
#define BLRM_LATIN1 0x000A
#define BLRM_REGISTER 0x000B
#define BLRM_BINLOGDUMP 0x000C
#define BLRM_MAXSTATE 0x000C
static char *blrm_states[] = { "Unconnected", "Authenticated", "Timestamp retrieval",
"Server ID retrieval", "HeartBeat Period setup", "binlog checksum config",
"binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval",
"Set Slave UUID", "Set Names", "Register slave", "Binlog Dump" };
#define BLRS_CREATED 0x0000
#define BLRS_UNREGISTERED 0x0001
#define BLRS_REGISTERED 0x0002
#define BLRS_DUMPING 0x0003
#define BLRS_MAXSTATE 0x0003
static char *blrs_states[] = { "Created", "Unregistered", "Registered",
"Sending binlogs" };
/**
* Slave catch-up status
*/
#define CS_READING 0x0001
#define CS_INNERLOOP 0x0002
/**
* MySQL protocol OpCodes needed for replication
*/
#define COM_QUERY 0x03
#define COM_REGISTER_SLAVE 0x15
#define COM_BINLOG_DUMP 0x12
/**
* Binlog event types
*/
#define START_EVENT_V3 0x01
#define QUERY_EVENT 0x02
#define STOP_EVENT 0x03
#define ROTATE_EVENT 0x04
#define INTVAR_EVENT 0x05
#define LOAD_EVENT 0x06
#define SLAVE_EVENT 0x07
#define CREATE_FILE_EVENT 0x08
#define APPEND_BLOCK_EVENT 0x09
#define EXEC_LOAD_EVENT 0x0A
#define DELETE_FILE_EVENT 0x0B
#define NEW_LOAD_EVENT 0x0C
#define RAND_EVENT 0x0D
#define USER_VAR_EVENT 0x0E
#define FORMAT_DESCRIPTION_EVENT 0x0F
#define XID_EVENT 0x10
#define BEGIN_LOAD_QUERY_EVENT 0x11
#define EXECUTE_LOAD_QUERY_EVENT 0x12
#define TABLE_MAP_EVENT 0x13
#define WRITE_ROWS_EVENTv0 0x14
#define UPDATE_ROWS_EVENTv0 0x15
#define DELETE_ROWS_EVENTv0 0x16
#define WRITE_ROWS_EVENTv1 0x17
#define UPDATE_ROWS_EVENTv1 0x18
#define DELETE_ROWS_EVENTv1 0x19
#define INCIDENT_EVENT 0x1A
#define HEARTBEAT_EVENT 0x1B
#define IGNORABLE_EVENT 0x1C
#define ROWS_QUERY_EVENT 0x1D
#define WRITE_ROWS_EVENTv2 0x1E
#define UPDATE_ROWS_EVENTv2 0x1F
#define DELETE_ROWS_EVENTv2 0x20
#define GTID_EVENT 0x21
#define ANONYMOUS_GTID_EVENT 0x22
#define PREVIOUS_GTIDS_EVENT 0x23
/**
* Binlog event flags
*/
#define LOG_EVENT_BINLOG_IN_USE_F 0x0001
#define LOG_EVENT_FORCED_ROTATE_F 0x0002
#define LOG_EVENT_THREAD_SPECIFIC_F 0x0004
#define LOG_EVENT_SUPPRESS_USE_F 0x0008
#define LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F 0x0010
#define LOG_EVENT_ARTIFICIAL_F 0x0020
#define LOG_EVENT_RELAY_LOG_F 0x0040
#define LOG_EVENT_IGNORABLE_F 0x0080
#define LOG_EVENT_NO_FILTER_F 0x0100
#define LOG_EVENT_MTS_ISOLATE_F 0x0200
/*
* Externals within the router
*/
extern void blr_start_master(ROUTER_INSTANCE *);
extern void blr_master_response(ROUTER_INSTANCE *, GWBUF *);
extern int blr_slave_request(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *);
extern void blr_init_cache(ROUTER_INSTANCE *);
extern void blr_file_init(ROUTER_INSTANCE *);
extern void blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *);
extern void blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t);
extern GWBUF *blr_read_binlog(int, unsigned int, REP_HEADER *);
#endif

View File

@ -414,21 +414,23 @@ static int gw_read_backend_event(DCB *dcb) {
if (dcb->session->client != NULL) {
client_protocol = SESSION_PROTOCOL(dcb->session,
MySQLProtocol);
}
if (client_protocol != NULL) {
CHK_PROTOCOL(client_protocol);
if (client_protocol != NULL) {
CHK_PROTOCOL(client_protocol);
if (client_protocol->state == MYSQL_IDLE)
{
router->clientReply(router_instance,
if (client_protocol->state == MYSQL_IDLE)
{
router->clientReply(router_instance,
rsession,
writebuf,
dcb);
rc = 1;
}
goto return_rc;
}
rc = 1;
}
goto return_rc;
} else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) {
router->clientReply(router_instance, rsession, writebuf, dcb);
rc = 1;
}
}
}
return_rc:

View File

@ -49,6 +49,8 @@ MODULES= libdebugcli.so libreadconnroute.so libtestroute.so
all: $(MODULES)
(cd readwritesplit; make )
(cd binlog; make )
libtestroute.so: $(TESTOBJ)
$(CC) $(LDFLAGS) $(TESTOBJ) $(LIBS) -o $@
@ -68,18 +70,22 @@ libreadwritesplit.so:
clean:
rm -f $(OBJ) $(MODULES)
(cd readwritesplit; touch depend.mk; make clean)
(cd binlog; touch depend.mk; make clean)
tags:
ctags $(SRCS) $(HDRS)
(cd readwritesplit; make tags)
(cd binlog; make tags)
depend:
@rm -f depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
(cd readwritesplit; touch depend.mk ; make depend)
(cd binlog; touch depend.mk ; make depend)
install: $(MODULES)
install -D $(MODULES) $(DEST)/MaxScale/modules
(cd readwritesplit; make DEST=$(DEST) install)
(cd binlog; make DEST=$(DEST) install)
include depend.mk

View File

@ -0,0 +1,65 @@
# This file is distributed as part of the SkySQL Gateway. It is free
# software: you can redistribute it and/or modify it under the terms of the
# GNU General Public License as published by the Free Software Foundation,
# version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright SkySQL Ab 2013
#
# Revision History
# Date Who Description
# 2/04/14 Mark Riddoch Initial framework put in place
include ../../../../build_gateway.inc
LOGPATH := $(ROOT_PATH)/log_manager
UTILSPATH := $(ROOT_PATH)/utils
QCLASSPATH := $(ROOT_PATH)/query_classifier
CC=cc
CFLAGS=-c -fPIC -I/usr/include -I../../include -I../../../include \
-I$(LOGPATH) -I$(UTILSPATH) -I$(QCLASSPATH) \
$(MYSQL_HEADERS) -Wall -g
include ../../../../makefile.inc
LDFLAGS=-shared -L$(LOGPATH) -L$(QCLASSPATH) -L$(EMBEDDED_LIB) \
-Wl,-rpath,$(DEST)/lib \
-Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) -Wl,-rpath,$(QCLASSPATH) \
-Wl,-rpath,$(EMBEDDED_LIB)
SRCS=blr.c blr_master.c blr_cache.c blr_slave.c blr_file.c
OBJ=$(SRCS:.c=.o)
LIBS=-lssl -pthread -llog_manager -lmysqld
MODULES=libbinlogrouter.so
all: $(MODULES)
$(MODULES): $(OBJ)
$(CC) $(LDFLAGS) $(OBJ) $(UTILSPATH)/skygw_utils.o $(LIBS) -o $@
.c.o:
$(CC) $(CFLAGS) $< -o $@
clean:
rm -f $(OBJ) $(MODULES)
tags:
ctags $(SRCS) $(HDRS)
depend:
@rm -f depend.mk
cc -M $(CFLAGS) $(SRCS) > depend.mk
install: $(MODULES)
install -D $(MODULES) $(DEST)/MaxScale/modules
include depend.mk

View File

@ -0,0 +1,53 @@
The binlog router is not a "normal" MaxScale router, it is not
designed to be used to route client requests to a database in the
usual proxy fashion. Rather it is designed to allow MaxScale to be
used as a relay server in a MySQL replication environment.
In this environment MaxScale sits between a master MySQL server and
a set of slave servers. The slaves servers execute a change master
to the MaxScale server, otehrwise they are configured in exactly
the same way as a normal MySQL slave server.
The master server configuration is unaltered, it simply sees a
single slave server.
MaxScale is configured as usual, with a service definition that
references the binlog router. The major configuration option to
consider is the router_options paramter, in the binlog router this
provides the binlog specific configuration parameters.
uuid=
This is the UUID that MaxScale uses when it connects
to the real master. It will report the master's
UUID to slaves that connect to it.
server-id=
The server-id that MaxScale uses when it connects
to the real master server. Again it will reports
the master's server-id to the slaves that connect
to it.
user=
The user that MaxScale uses to login to the real
master
password=
The password that MaxScale uses to login to the
real master
master-id=
The server-id of the real master. MaxScale should
get this by sending a query, but at the moment it
is in the configuration file for ease of implementation
An example binlog service configuration is shown below:
[Binlog Service]
type=service
router=binlogrouter
servers=master
router_options=uuid=f12fcb7f-b97b-11e3-bc5e-0401152c4c22,server-id=3,user=repl,password=slavepass,master-id=1
user=maxscale
passwd=Mhu87p2D
The servers list for a binlog router service should contain just
the master server. In future a list will be given and the monitor
used to determine which server is the current master server.

View File

@ -0,0 +1,18 @@
The binlog router contained here is a prototype implementation and
should not be consider as production ready.
The router has been written and tested with MySQL 5.6 as a reference
for the replication behaviour, more investigation and implementation
is likely to be needed in order to use other versions of MySQL,
MariaDB or Percona Server.
To Do List:
1. Thread safety needs to be examine, currently MaxScale has been
run with a single thread when testing this router.
2. Binlog rotate events have yet to be tested.
3. The router does not implement the replication heartbeat mechanism.
4. Performance measurements have yet to be made.

View File

@ -0,0 +1,654 @@
/*
* This file is distributed as part of MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file blr.c - binlog router, allows MaxScale to act as an intermediatory for replication
*
* The binlog router is designed to be used in replication environments to
* increase the replication fanout of a master server. It provides a transparant
* mechanism to read the binlog entries for multiple slaves while requiring
* only a single connection to the actual master to support the slaves.
*
* The current prototype implement is designed to support MySQL 5.6 and has
* a number of limitations. This prototype is merely a proof of concept and
* should not be considered production ready.
*
* @verbatim
* Revision History
*
* Date Who Description
* 02/04/2014 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <service.h>
#include <server.h>
#include <router.h>
#include <atomic.h>
#include <spinlock.h>
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <skygw_types.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <mysql_client_server_protocol.h>
extern int lm_enabled_logfiles_bitmask;
static char *version_str = "V1.0.0";
/* The router entry points */
static ROUTER *createInstance(SERVICE *service, char **options);
static void *newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *router_session);
static void freeSession(ROUTER *instance, void *router_session);
static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue);
static void diagnostics(ROUTER *instance, DCB *dcb);
static void clientReply(
ROUTER *instance,
void *router_session,
GWBUF *queue,
DCB *backend_dcb);
static void errorReply(
ROUTER *instance,
void *router_session,
char *message,
DCB *backend_dcb,
int action);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
/** The module object definition */
static ROUTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
routeQuery,
diagnostics,
clientReply,
errorReply,
getCapabilities
};
static bool rses_begin_locked_router_action(ROUTER_SLAVE *);
static void rses_end_locked_router_action(ROUTER_SLAVE *);
static SPINLOCK instlock;
static ROUTER_INSTANCE *instances;
/**
* Implementation of the mandatory version entry point
*
* @return version string of the module
*/
char *
version()
{
return version_str;
}
/**
* The module initialisation routine, called when the module
* is first loaded.
*/
void
ModuleInit()
{
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"Initialise binlog router module %s.\n", version_str)));
spinlock_init(&instlock);
instances = NULL;
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
* external entry points for this module.
*
* @return The module object
*/
ROUTER_OBJECT *
GetModuleObject()
{
return &MyObject;
}
/**
* Create an instance of the router for a particular service
* within MaxScale.
*
* The process of creating the instance causes the router to register
* with the master server and begin replication of the binlogs from
* the master server to MaxScale.
*
* @param service The service this router is being create for
* @param options An array of options for this query router
*
* @return The instance data for this new instance
*/
static ROUTER *
createInstance(SERVICE *service, char **options)
{
ROUTER_INSTANCE *inst;
char *value;
int i;
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
return NULL;
}
memset(&inst->stats, 0, sizeof(ROUTER_STATS));
memset(&inst->saved_master, 0, sizeof(MASTER_RESPONSES));
inst->service = service;
spinlock_init(&inst->lock);
inst->low_water = DEF_LOW_WATER;
inst->high_water = DEF_HIGH_WATER;
/*
* We only support one server behind this router, since the server is
* the master from which we replicate binlog records. Therefore check
* that only one server has been defined.
*
* A later improvement will be to define multiple servers and have the
* router use the information that is supplied by the monitor to find
* which of these servers is currently the master and replicate from
* that server.
*/
if (service->databases == NULL || service->databases->nextdb != NULL)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Error : Exactly one database server may be "
"for use with the binlog router.")));
}
/*
* Process the options.
* We have an array of attrbute values passed to us that we must
* examine. Supported attributes are:
* uuid=
* server-id=
* user=
* password=
* master-id=
*/
if (options)
{
for (i = 0; options[i]; i++)
{
if ((value = strchr(options[i], '=')) == NULL)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Warning : Unsupported router "
"option %s for binlog router.",
options[i])));
}
else
{
*value = 0;
value++;
if (strcmp(options[i], "uuid") == 0)
{
inst->uuid = strdup(value);
}
else if (strcmp(options[i], "server-id") == 0)
{
inst->serverid = atoi(value);
}
else if (strcmp(options[i], "user") == 0)
{
inst->user = strdup(value);
}
else if (strcmp(options[i], "password") == 0)
{
inst->password = strdup(value);
}
else if (strcmp(options[i], "master-id") == 0)
{
inst->masterid = atoi(value);
}
else if (strcmp(options[i], "lowwater") == 0)
{
inst->low_water = atoi(value);
}
else if (strcmp(options[i], "highwater") == 0)
{
inst->high_water = atoi(value);
}
else
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Warning : Unsupported router "
"option %s for binlog router.",
options[i])));
}
}
}
}
/*
* We have completed the creation of the instance data, so now
* insert this router instance into the linked list of routers
* that have been created with this module.
*/
spinlock_acquire(&instlock);
inst->next = instances;
instances = inst;
spinlock_release(&instlock);
inst->active_logs = 0;
/*
* Initialise the binlog file and position
*/
blr_file_init(inst);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Binlog router: current binlog file is: %s, current position %u\n",
inst->binlog_name, inst->binlog_position)));
/*
* Initialise the binlog cache for this router instance
*/
blr_init_cache(inst);
/*
* Now start the replication from the master to MaxScale
*/
blr_start_master(inst);
return (ROUTER *)inst;
}
/**
* Associate a new session with this instance of the router.
*
* In the case of the binlog router a new session equates to a new slave
* connecting to MaxScale and requesting binlog records. We need to go
* through the slave registration process for this new slave.
*
* @param instance The router instance data
* @param session The session itself
* @return Session specific data for this session
*/
static void *
newSession(ROUTER *instance, SESSION *session)
{
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
ROUTER_SLAVE *slave;
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"binlog router: %lu [newSession] new router session with "
"session %p, and inst %p.",
pthread_self(),
session,
inst)));
if ((slave = (ROUTER_SLAVE *)calloc(1, sizeof(ROUTER_SLAVE))) == NULL)
{
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_ERROR,
"Insufficient memory to create new slave session for binlog router")));
return NULL;
}
#if defined(SS_DEBUG)
slave->rses_chk_top = CHK_NUM_ROUTER_SES;
slave->rses_chk_tail = CHK_NUM_ROUTER_SES;
#endif
memset(&slave->stats, 0, sizeof(SLAVE_STATS));
atomic_add(&inst->stats.n_slaves, 1);
slave->state = BLRS_CREATED; /* Set initial state of the slave */
slave->dcb = session->client;
slave->router = instance;
/**
* Add this session to the list of active sessions.
*/
spinlock_acquire(&inst->lock);
slave->next = inst->slaves;
inst->slaves = slave;
spinlock_release(&inst->lock);
CHK_CLIENT_RSES(slave);
return (void *)slave;
}
/**
* The session is no longer required. Shutdown all operation and free memory
* associated with this session. In this case a single session is associated
* to a slave of MaxScale. Therefore this is called when that slave is no
* longer active and should remove of reference to that slave, free memory
* and prevent any further forwarding of binlog records to that slave.
*
* Parameters:
* @param router_instance The instance of the router
* @param router_cli_ses The particular session to free
*
*/
static void freeSession(
ROUTER* router_instance,
void* router_client_ses)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)router_instance;
ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_client_ses;
int prev_val;
prev_val = atomic_add(&router->stats.n_slaves, -1);
ss_dassert(prev_val > 0);
/*
* Remove the slave session form the list of slaves that are using the
* router currently.
*/
spinlock_acquire(&router->lock);
if (router->slaves == slave) {
router->slaves = slave->next;
} else {
ROUTER_SLAVE *ptr = router->slaves;
while (ptr != NULL && ptr->next != slave) {
ptr = ptr->next;
}
if (ptr != NULL) {
ptr->next = slave->next;
}
}
spinlock_release(&router->lock);
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [freeSession] Unlinked router_client_session %p from "
"router %p. Connections : %d. ",
pthread_self(),
slave,
router,
prev_val-1)));
free(slave);
}
/**
* Close a session with the router, this is the mechanism
* by which a router may cleanup data structure etc.
*
* @param instance The router instance data
* @param router_session The session being closed
*/
static void
closeSession(ROUTER *instance, void *router_session)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
if (slave == NULL)
{
/*
* We must be closing the master session.
*
* TODO: Handle closure of master session
*/
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_ERROR, "Binlog router close session with master")));
return;
}
CHK_CLIENT_RSES(slave);
/**
* Lock router client session for secure read and update.
*/
if (rses_begin_locked_router_action(slave))
{
/* decrease server registered slaves counter */
atomic_add(&router->stats.n_registered, -1);
/*
* Mark the slave as unregistered to prevent the forwarding
* of any more binlog records to this slave.
*/
slave->state = BLRS_UNREGISTERED;
/* Unlock */
rses_end_locked_router_action(slave);
}
}
/**
* We have data from the client, this is likely to be packets related to
* the registration of the slave to receive binlog records. Unlike most
* MaxScale routers there is no forwarding to the backend database, merely
* the return of either predefined server responses that have been cached
* or binlog records.
*
* @param instance The router instance
* @param router_session The router session returned from the newSession call
* @param queue The queue of data buffers to route
* @return The number of bytes sent
*/
static int
routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
return blr_slave_request(router, slave, queue);
}
static char *event_names[] = {
"Invalid", "Start Event V3", "Query Event", "Stop Event", "Rotate Event",
"Integer Session Variable", "Load Event", "Slave Event", "Create File Event",
"Append Block Event", "Exec Load Event", "Delete File Event",
"New Load Event", "Rand Event", "User Variable Event", "Format Description Event",
"Transaction ID Event (2 Phase Commit)", "Begin Load Query Event",
"Execute Load Query Event", "Table Map Event", "Write Rows Event (v0)",
"Update Rows Event (v0)", "Delete Rows Event (v0)", "Write Rows Event (v1)",
"Update Rows Event (v1)", "Delete Rows Event (v1)", "Incident Event",
"Heartbeat Event", "Ignorable Event", "Rows Query Event", "Write Rows Event (v2)",
"Update Rows Event (v2)", "Delete Rows Event (v2)", "GTID Event",
"Anonymous GTID Event", "Previous GTIDS Event"
};
/**
* Display router diagnostics
*
* @param instance Instance of the router
* @param dcb DCB to send diagnostics to
*/
static void
diagnostics(ROUTER *router, DCB *dcb)
{
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router;
ROUTER_SLAVE *session;
int i = 0;
spinlock_acquire(&router_inst->lock);
session = router_inst->slaves;
while (session)
{
i++;
session = session->next;
}
spinlock_release(&router_inst->lock);
dcb_printf(dcb, "\tCurrent binlog file: %s\n",
router_inst->binlog_name);
dcb_printf(dcb, "\tCurrent binlog position: %u\n",
router_inst->binlog_position);
dcb_printf(dcb, "\tNumber of slave servers: %u\n",
router_inst->stats.n_slaves);
dcb_printf(dcb, "\tNumber of binlog events received: %u\n",
router_inst->stats.n_binlogs);
dcb_printf(dcb, "\tNumber of fake binlog events: %u\n",
router_inst->stats.n_fakeevents);
dcb_printf(dcb, "\tNumber of binlog events in error: %u\n",
router_inst->stats.n_binlog_errors);
dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n",
router_inst->stats.n_rotates);
dcb_printf(dcb, "\tNumber of binlog cache hits: %u\n",
router_inst->stats.n_cachehits);
dcb_printf(dcb, "\tNumber of binlog cache misses: %u\n",
router_inst->stats.n_cachemisses);
dcb_printf(dcb, "\tNumber of packets received: %u\n",
router_inst->stats.n_reads);
dcb_printf(dcb, "\tAverage events per packet %.1f\n",
(double)router_inst->stats.n_binlogs / router_inst->stats.n_reads);
dcb_printf(dcb, "\tEvents received:\n");
for (i = 0; i < 0x24; i++)
{
dcb_printf(dcb, "\t\t%-38s: %u\n", event_names[i], router_inst->stats.events[i]);
}
if (router_inst->slaves)
{
dcb_printf(dcb, "\tSlaves:\n");
spinlock_acquire(&router_inst->lock);
session = router_inst->slaves;
while (session)
{
dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid);
if (session->hostname)
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
dcb_printf(dcb, "\t\tSlave DCB: %x\n", session->dcb);
dcb_printf(dcb, "\t\tNext Sequence No: %d\n", session->seqno);
dcb_printf(dcb, "\t\tState: %s\n", blrs_states[session->state]);
dcb_printf(dcb, "\t\tBinlog file: %s\n", session->binlogfile);
dcb_printf(dcb, "\t\tBinlog position: %u\n", session->binlog_pos);
dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests);
dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events);
dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
session = session->next;
}
spinlock_release(&router_inst->lock);
}
}
/**
* Client Reply routine - in this case this is a message from the
* master server, It should be sent to the state machine that manages
* master packets as it may be binlog records or part of the registration
* handshake that takes part during connection establishment.
*
*
* @param instance The router instance
* @param router_session The router session
* @param master_dcb The DCB for the connection to the master
* @param queue The GWBUF with reply data
*/
static void
clientReply(ROUTER *instance, void *router_session, GWBUF *queue, DCB *backend_dcb)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
atomic_add(&router->stats.n_reads, 1);
blr_master_response(router, queue);
}
/**
* Error Reply routine
*
* The routine will reply to client errors and/or closing the session
* or try to open a new backend connection.
*
* @param instance The router instance
* @param router_session The router session
* @param message The error message to reply
* @param backend_dcb The backend DCB
* @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION
*
*/
static void
errorReply(
ROUTER *instance,
void *router_session,
char *message,
DCB *backend_dcb,
int action)
{
}
/** to be inline'd */
/**
* @node Acquires lock to router client session if it is not closed.
*
* Parameters:
* @param rses - in, use
*
*
* @return true if router session was not closed. If return value is true
* it means that router is locked, and must be unlocked later. False, if
* router was closed before lock was acquired.
*
*
* @details (write detailed description here)
*
*/
static bool rses_begin_locked_router_action(ROUTER_SLAVE *rses)
{
bool succp = false;
CHK_CLIENT_RSES(rses);
spinlock_acquire(&rses->rses_lock);
succp = true;
return succp;
}
/** to be inline'd */
/**
* @node Releases router client session lock.
*
* Parameters:
* @param rses - <usage>
* <description>
*
* @return void
*
*
* @details (write detailed description here)
*
*/
static void rses_end_locked_router_action(ROUTER_SLAVE * rses)
{
CHK_CLIENT_RSES(rses);
spinlock_release(&rses->rses_lock);
}
static uint8_t getCapabilities(ROUTER *inst, void *router_session)
{
return 0;
}

View File

@ -0,0 +1,69 @@
/*
* This file is distributed as part of MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file blr_cache.c - binlog router cache, manage the binlog cache
*
* The binlog router is designed to be used in replication environments to
* increase the replication fanout of a master server. It provides a transparant
* mechanism to read the binlog entries for multiple slaves while requiring
* only a single connection to the actual master to support the slaves.
*
* The current prototype implement is designed to support MySQL 5.6 and has
* a number of limitations. This prototype is merely a proof of concept and
* should not be considered production ready.
*
* @verbatim
* Revision History
*
* Date Who Description
* 07/04/2014 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <service.h>
#include <server.h>
#include <router.h>
#include <atomic.h>
#include <spinlock.h>
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <skygw_types.h>
#include <skygw_utils.h>
#include <log_manager.h>
extern int lm_enabled_logfiles_bitmask;
/**
* Initialise the cache for this instanceof the binlog router. As a side
* effect also determine the binlog file to read and the position to read
* from.
*
* @param router The router instance
*/
void
blr_init_cache(ROUTER_INSTANCE *router)
{
}

View File

@ -0,0 +1,282 @@
/*
* This file is distributed as part of MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file blr_file.c - contains code for the router binlog file management
*
*
* @verbatim
* Revision History
*
* Date Who Description
* 14/04/2014 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <service.h>
#include <server.h>
#include <router.h>
#include <atomic.h>
#include <spinlock.h>
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <skygw_types.h>
#include <skygw_utils.h>
#include <log_manager.h>
extern int lm_enabled_logfiles_bitmask;
static void blr_file_create(ROUTER_INSTANCE *router, char *file);
static void blr_file_append(ROUTER_INSTANCE *router, char *file);
static uint32_t extract_field(uint8_t *src, int bits);
/**
* Initialise the binlog file for this instance. MaxScale will look
* for all the binlogs that it has on local disk, determien the next
* binlog to use and initialise it for writing, determining the
* next record to be fetched from the real master.
*
* @param router The router instance this defines the master for this replication chain
*/
void
blr_file_init(ROUTER_INSTANCE *router)
{
char *ptr, path[1024], filename[1050];
int file_found, n = 1;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
{
strcpy(path, ptr);
}
strcat(path, "/");
strcat(path, router->service->name);
if (access(path, R_OK) == -1)
mkdir(path, 0777);
file_found = 0;
do {
sprintf(filename, "%s/" BINLOG_NAMEFMT, path, n);
if (access(filename, R_OK) != -1)
{
file_found = 1;
n++;
}
else
file_found = 0;
} while (file_found);
n--;
if (n == 0) // No binlog files found
{
sprintf(filename, BINLOG_NAMEFMT, 1);
blr_file_create(router, filename);
}
else
{
sprintf(filename, BINLOG_NAMEFMT, n);
blr_file_append(router, filename);
}
}
void
blr_file_rotate(ROUTER_INSTANCE *router, char *file, uint64_t pos)
{
blr_file_create(router, file);
}
/**
* Create a new binlog file for the router to use.
*
* @param router The router instance
* @param file The binlog file name
*/
static void
blr_file_create(ROUTER_INSTANCE *router, char *file)
{
char *ptr, path[1024];
int fd;
unsigned char magic[] = BINLOG_MAGIC;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
{
strcpy(path, ptr);
}
strcat(path, "/");
strcat(path, router->service->name);
strcat(path, "/");
strcat(path, file);
if ((fd = open(path, O_RDWR|O_CREAT, 0666)) != -1)
{
write(fd, magic, 4);
}
fsync(fd);
close(router->binlog_fd);
strcpy(router->binlog_name, file);
router->binlog_position = 4; /* Initial position after the magic number */
router->binlog_fd = fd;
}
/**
* Prepare an existing binlog file to be appened to.
*
* @param router The router instance
* @param file The binlog file name
*/
static void
blr_file_append(ROUTER_INSTANCE *router, char *file)
{
char *ptr, path[1024];
int fd;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
{
strcpy(path, ptr);
}
strcat(path, "/");
strcat(path, router->service->name);
strcat(path, "/");
strcat(path, file);
fd = open(path, O_RDWR|O_APPEND, 0666);
fsync(fd);
close(router->binlog_fd);
strcpy(router->binlog_name, file);
router->binlog_position = lseek(fd, 0L, SEEK_END);
router->binlog_fd = fd;
}
/**
* Write a binlog entry to disk.
*
* @param router The router instance
* @param buf The binlog record
* @param len The length of the binlog record
*/
void
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf)
{
pwrite(router->binlog_fd, buf, hdr->event_size, hdr->next_pos - hdr->event_size);
router->binlog_position = hdr->next_pos;
}
/**
* Flush the content of the binlog file to disk.
*
* @param router The binlog router
*/
void
blr_file_flush(ROUTER_INSTANCE *router)
{
fsync(router->binlog_fd);
}
int
blr_open_binlog(ROUTER_INSTANCE *router, char *binlog)
{
char *ptr, path[1024];
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
{
strcpy(path, ptr);
}
strcat(path, "/");
strcat(path, router->service->name);
strcat(path, "/");
strcat(path, binlog);
return open(path, O_RDONLY, 0666);
}
/**
* Read a replication event into a GWBUF structure.
*
* @param fd File descriptor of the binlog file
* @param pos Position of binlog record to read
* @param hdr Binlog header to populate
* @return The binlog record wrapped in a GWBUF structure
*/
GWBUF *
blr_read_binlog(int fd, unsigned int pos, REP_HEADER *hdr)
{
uint8_t hdbuf[19];
GWBUF *result;
unsigned char *data;
if (lseek(fd, pos, SEEK_SET) != pos)
{
return NULL;
}
/* Read the header information from the file */
if (read(fd, hdbuf, 19) != 19)
return NULL;
hdr->timestamp = extract_field(hdbuf, 32);
hdr->event_type = hdbuf[4];
hdr->serverid = extract_field(&hdbuf[5], 32);
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = extract_field(&hdbuf[13], 32);
hdr->flags = extract_field(&hdbuf[17], 16);
if ((result = gwbuf_alloc(hdr->event_size)) == NULL)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to allocate memory for binlog entry.\n")));
return NULL;
}
data = GWBUF_DATA(result);
memcpy(data, hdbuf, 19); // Copy the header in
read(fd, &data[19], hdr->event_size - 19); // Read the balance
return result;
}
/**
* Extract a numeric field from a packet of the specified number of bits
*
* @param src The raw packet source
* @param birs The number of bits to extract (multiple of 8)
*/
static uint32_t
extract_field(uint8_t *src, int bits)
{
uint32_t rval = 0, shift = 0;
while (bits > 0)
{
rval |= (*src++) << shift;
shift += 8;
bits -= 8;
}
return rval;
}

View File

@ -0,0 +1,654 @@
/*
* This file is distributed as part of MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file blr_master.c - contains code for the router to master communication
*
* The binlog router is designed to be used in replication environments to
* increase the replication fanout of a master server. It provides a transparant
* mechanism to read the binlog entries for multiple slaves while requiring
* only a single connection to the actual master to support the slaves.
*
* The current prototype implement is designed to support MySQL 5.6 and has
* a number of limitations. This prototype is merely a proof of concept and
* should not be considered production ready.
*
* @verbatim
* Revision History
*
* Date Who Description
* 02/04/2014 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <service.h>
#include <server.h>
#include <router.h>
#include <atomic.h>
#include <session.h>
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <skygw_types.h>
#include <skygw_utils.h>
#include <log_manager.h>
/* Temporary requirement for auth data */
#include <mysql_client_server_protocol.h>
extern int lm_enabled_logfiles_bitmask;
static GWBUF *blr_make_query(char *statement);
static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
static void encode_value(unsigned char *data, unsigned int value, int len);
static void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
static void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
static void *CreateMySQLAuthData(char *username, char *password, char *database);
static void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
static uint32_t extract_field(uint8_t *src, int bits);
/**
* blr_start_master - controls the connection of the binlog router to the
* master MySQL server and triggers the slave registration process for
* the router.
*
* @param router The router instance
*/
void
blr_start_master(ROUTER_INSTANCE *router)
{
DCB *client;
GWBUF *buf;
client = dcb_alloc(DCB_ROLE_INTERNAL);
client->data = CreateMySQLAuthData(router->user, router->password, "");
router->session = session_alloc(router->service, client);
client->session = router->session;
router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL);
router->master_state = BLRM_AUTHENTICATED;
buf = blr_make_query("SELECT UNIX_TIMESTAMP()");
router->master->func.write(router->master, buf);
router->master_state = BLRM_TIMESTAMP;
}
/**
* Binlog router master side state machine event handler.
*
* Handles an incoming response from the master server to the binlog
* router.
*
* @param router The router instance
* @param buf The incoming packet
*/
void
blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
{
char query[128];
/*
* We need to make sure that incoming packets (gwbufs) are
* strictly processed in order and that we do not have packets
* from the same master being processed on multiple threads.
* to do this we create a queue of the GWBUF structures and have
* a flag that indicates if this routine is processing a packet
* on another thread. Items will be added to the queue if the
* routine is running in another thread. That thread will read
* the queue before returning.
* The action of adding items to the queue is protected by a
* spinlock and a flag that inidicates if the routine running
* in the other thread has reached the point at which it will
* no longer check the queue before returning. In order to
* manipulate the queue or the flag then router spinlock must
* be held.
*/
spinlock_acquire(&router->lock);
if (router->active_logs)
{
/*
* Thread already processing a packet and has not got
* to the point that it will not look at new packets
* added to the queue.
*/
router->queue = gwbuf_append(router->queue, buf);
spinlock_release(&router->lock);
return;
}
else
{
router->active_logs = 1;
}
spinlock_release(&router->lock);
if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE)
{
LOGIF(LM, (skygw_log_write(
LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n",
router->master_state)));
gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock);
router->active_logs = 0;
spinlock_release(&router->lock);
return;
}
if (MYSQL_RESPONSE_ERR(buf))
{
LOGIF(LM, (skygw_log_write(
LOGFILE_ERROR,
"Received error: %d, %s from master during %s phase of the master state machine.\n",
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state]
)));
gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock);
router->active_logs = 0;
spinlock_release(&router->lock);
return;
}
do {
switch (router->master_state)
{
case BLRM_TIMESTAMP:
// Response to a timestamp message, no need to save this.
gwbuf_consume(buf, GWBUF_LENGTH(buf));
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'");
router->master_state = BLRM_SERVERID;
router->master->func.write(router->master, buf);
break;
case BLRM_SERVERID:
// Response to fetch of master's server-id
router->saved_master.server_id = buf;
// TODO: Extract the value of server-id and place in router->master_id
buf = blr_make_query("SET @master_heartbeat_period = 1799999979520");
router->master_state = BLRM_HBPERIOD;
router->master->func.write(router->master, buf);
break;
case BLRM_HBPERIOD:
// Response to set the heartbeat period
router->saved_master.heartbeat = buf;
gwbuf_consume(buf, GWBUF_LENGTH(buf));
buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum");
router->master_state = BLRM_CHKSUM1;
router->master->func.write(router->master, buf);
break;
case BLRM_CHKSUM1:
// Response to set the master binlog checksum
router->saved_master.chksum1 = buf;
gwbuf_consume(buf, GWBUF_LENGTH(buf));
buf = blr_make_query("SELECT @master_binlog_checksum");
router->master_state = BLRM_CHKSUM2;
router->master->func.write(router->master, buf);
break;
case BLRM_CHKSUM2:
// Response to the master_binlog_checksum, should be stored
router->saved_master.chksum2 = buf;
buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE");
router->master_state = BLRM_GTIDMODE;
router->master->func.write(router->master, buf);
break;
case BLRM_GTIDMODE:
// Response to the GTID_MODE, should be stored
router->saved_master.gtid_mode = buf;
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
router->master_state = BLRM_MUUID;
router->master->func.write(router->master, buf);
break;
case BLRM_MUUID:
// Response to the SERVER_UUID, should be stored
router->saved_master.uuid = buf;
sprintf(query, "SET @slave_uuid='%s'", router->uuid);
buf = blr_make_query(query);
router->master_state = BLRM_SUUID;
router->master->func.write(router->master, buf);
break;
case BLRM_SUUID:
// Response to the SET @server_uuid, should be stored
router->saved_master.setslaveuuid = buf;
gwbuf_consume(buf, GWBUF_LENGTH(buf));
buf = blr_make_query("SET NAMES latin1");
router->master_state = BLRM_LATIN1;
router->master->func.write(router->master, buf);
break;
case BLRM_LATIN1:
// Response to the SET NAMES latin1, should be stored
router->saved_master.setnames = buf;
gwbuf_consume(buf, GWBUF_LENGTH(buf));
buf = blr_make_registration(router);
router->master_state = BLRM_REGISTER;
router->master->func.write(router->master, buf);
break;
case BLRM_REGISTER:
// Request a dump of the binlog file
buf = blr_make_binlog_dump(router);
router->master_state = BLRM_BINLOGDUMP;
router->master->func.write(router->master, buf);
break;
case BLRM_BINLOGDUMP:
// Main body, we have received a binlog record from the master
blr_handle_binlog_record(router, buf);
break;
}
/*
* Check for messages queued by other threads.
*/
spinlock_acquire(&router->lock);
if ((buf = router->queue) != NULL)
{
router->queue = buf->next;
buf->next = NULL;
}
else
{
router->active_logs = 0;
}
spinlock_release(&router->lock);
} while (buf != NULL);
}
/**
* Build a MySQL query into a GWBUF that we can send to the master database
*
* @param query The text of the query to send
*/
static GWBUF *
blr_make_query(char *query)
{
GWBUF *buf;
unsigned char *data;
int len;
if ((buf = gwbuf_alloc(strlen(query) + 5)) == NULL)
return NULL;
data = GWBUF_DATA(buf);
len = strlen(query) + 1;
encode_value(&data[0], len, 24); // Payload length
data[3] = 0; // Sequence id
// Payload
data[4] = COM_QUERY; // Command
memcpy(&data[5], query, strlen(query));
return buf;
}
/**
* Build a MySQL slave registration into a GWBUF that we can send to the
* master database
*
* @param router The router instance
* @return A MySQL Replication registration message in a GWBUF structure
*/
static GWBUF *
blr_make_registration(ROUTER_INSTANCE *router)
{
GWBUF *buf;
unsigned char *data;
int len = 18;
if ((buf = gwbuf_alloc(len + 4)) == NULL)
return NULL;
data = GWBUF_DATA(buf);
encode_value(&data[0], len, 24); // Payload length
data[3] = 0; // Sequence ID
data[4] = COM_REGISTER_SLAVE; // Command
encode_value(&data[5], router->serverid, 32); // Slave Server ID
data[9] = 0; // Slave hostname length
data[10] = 0; // Slave username length
data[11] = 0; // Slave password length
encode_value(&data[12], router->service->ports->port, 16); // Slave master port
encode_value(&data[14], 0, 32); // Replication rank
encode_value(&data[18], router->masterid, 32); // Master server-id
return buf;
}
/**
* Build a Binlog dump command into a GWBUF that we can send to the
* master database
*
* @param router The router instance
* @return A MySQL Replication COM_BINLOG_DUMP message in a GWBUF structure
*/
static GWBUF *
blr_make_binlog_dump(ROUTER_INSTANCE *router)
{
GWBUF *buf;
unsigned char *data;
int len = 0x1b;
if ((buf = gwbuf_alloc(len + 4)) == NULL)
return NULL;
data = GWBUF_DATA(buf);
encode_value(&data[0], len,24); // Payload length
data[3] = 0; // Sequence ID
data[4] = COM_BINLOG_DUMP; // Command
encode_value(&data[5], router->binlog_position, 32); // binlog position
encode_value(&data[9], 0, 16); // Flags
encode_value(&data[11], router->serverid, 32); // Server-id of MaxScale
strncpy((char *)&data[15], router->binlog_name,
BINLOG_FNAMELEN); // binlog filename
return buf;
}
/**
* Encode a value into a number of bits in a MySQL packet
*
* @param data Point to location in target packet
* @param value The value to pack
* @param len Number of bits to encode value into
*/
static void
encode_value(unsigned char *data, unsigned int value, int len)
{
while (len > 0)
{
*data++ = value & 0xff;
value >>= 8;
len -= 8;
}
}
/**
* blr_handle_binlog_record - we have received binlog records from
* the master and we must now work out what to do with them.
*
* @param router The router instance
* @param pkt The binlog records
*/
static void
blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
{
uint8_t *msg, *ptr, *pdata;
REP_HEADER hdr;
int len, reslen;
/* Prepend any residual buffer to the buffer chain we have been called with. */
if (router->residual)
{
pkt = gwbuf_append(router->residual, pkt);
router->residual = NULL;
}
while (pkt && gwbuf_length(pkt) > 24)
{
reslen = GWBUF_LENGTH(pkt);
pdata = GWBUF_DATA(pkt);
if (reslen < 3) // Payload length straddles buffers
{
/* Get the length of the packet from the residual and new packet */
if (reslen >= 3)
{
len = extract_field(pdata, 24);
}
else if (reslen == 2)
{
len = extract_field(pdata, 16);
len |= (extract_field(GWBUF_DATA(pkt->next), 8) << 16);
}
else if (reslen == 1)
{
len = extract_field(pdata, 8);
len |= (extract_field(GWBUF_DATA(pkt->next), 16) << 8);
}
len += 4; // Allow space for the header
}
else
{
len = extract_field(pdata, 24) + 4;
}
if (reslen < len && pkt->next) // Message straddles buffers
{
/* Allocate a contiguous buffer for the binlog message */
msg = malloc(len);
memcpy(msg, pdata, reslen);
memcpy(&msg[reslen], GWBUF_DATA(pkt->next), len - reslen);
ptr = msg;
}
else if (reslen < len) // Message straddles buffers
{
break;
}
else
{
ptr = pdata;
msg = NULL;
}
blr_extract_header(ptr, &hdr);
if (hdr.ok == 0)
{
router->stats.n_binlogs++;
// #define SHOW_EVENTS
#ifdef SHOW_EVENTS
printf("blr: event type 0x%02x, flags 0x%04x, event size %d\n", hdr.event_type, hdr.flags, hdr.event_size);
#endif
if (hdr.event_type >= 0 && hdr.event_type < 0x24)
router->stats.events[hdr.event_type]++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
{
// Fake format description message
router->stats.n_fakeevents++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
{
/*
* We need to save this to replay to new
* slaves that attach later.
*/
if (router->saved_master.fde_event)
free(router->saved_master.fde_event);
router->saved_master.fde_len = hdr.event_size;
router->saved_master.fde_event = malloc(hdr.event_size);
if (router->saved_master.fde_event)
memcpy(router->saved_master.fde_event,
ptr + 5, hdr.event_size);
}
}
else
{
if (hdr.event_type == ROTATE_EVENT)
{
blr_rotate_event(router, ptr, &hdr);
}
if (hdr.event_type == HEARTBEAT_EVENT)
{
#ifdef SHOW_EVENTS
printf("Replication heartbeat\n");
#endif
;
}
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
{
ptr = ptr + 5; // We don't put the first byte of the payload
// into the binlog file
blr_write_binlog_record(router, &hdr, ptr);
blr_distribute_binlog_record(router, &hdr, ptr);
}
}
}
else
{
printf("Binlog router error: %s\n", &ptr[7]);
router->stats.n_binlog_errors++;
}
if (msg)
{
free(msg);
pkt = gwbuf_consume(pkt, reslen);
pkt = gwbuf_consume(pkt, len - reslen);
}
else
{
pkt = gwbuf_consume(pkt, 4 + hdr.payload_len);
}
}
/*
* Check if we have a residual, part binlog message to deal with.
* Just simply store the GWBUF for next time
*/
if (pkt)
{
router->residual = pkt;
}
blr_file_flush(router);
}
/**
* Populate a header structure for a replication message from a GWBUF structure.
*
* @param pkt The incoming packet in a GWBUF chain
* @param hdr The packet header to populate
*/
static void
blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
{
hdr->payload_len = extract_field(ptr, 24);
hdr->seqno = ptr[3];
hdr->ok = ptr[4];
hdr->timestamp = extract_field(&ptr[5], 32);
hdr->event_type = ptr[9];
hdr->serverid = extract_field(&ptr[10], 32);
hdr->event_size = extract_field(&ptr[14], 32);
hdr->next_pos = extract_field(&ptr[18], 32);
hdr->flags = extract_field(&ptr[22], 16);
}
/**
* Extract a numeric field from a packet of the specified number of bits
*
* @param src The raw packet source
* @param birs The number of bits to extract (multiple of 8)
*/
static uint32_t
extract_field(uint8_t *src, int bits)
{
uint32_t rval = 0, shift = 0;
while (bits > 0)
{
rval |= (*src++) << shift;
shift += 8;
bits -= 8;
}
return rval;
}
/**
* Process a binlog rotate event.
*
* @param router The instance of the router
* @param ptr The packet containing the rotate event
* @param hdr The replication message header
*/
static void
blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr)
{
int len;
uint64_t pos;
char file[BINLOG_FNAMELEN+1];
ptr += 4; // Skip packet header
ptr++; // Skip the OK
ptr += 19; // Skip event header
len = hdr->event_size - 19; // Event size minus header
pos = extract_field(ptr, 32) + (extract_field(ptr+4, 32) << 32);
memcpy(file, ptr + 8, BINLOG_FNAMELEN);
file[BINLOG_FNAMELEN] = 0;
#ifdef VEBOSE_ROTATE
printf("binlog rotate: ");
while (len--)
printf("0x%02x ", *ptr++);
printf("\n");
printf("New file: %s @ %ld\n", file, pos);
#endif
if (strncmp(router->binlog_name, file, BINLOG_FNAMELEN) != 0)
{
router->stats.n_rotates++;
blr_file_rotate(router, file, pos);
}
}
/**
* Create the auth data needed to be able to call dcb_connect.
*
* This doesn't really belong here and should be moved at some stage.
*/
static void *
CreateMySQLAuthData(char *username, char *password, char *database)
{
MYSQL_session *auth_info;
if ((auth_info = calloc(1, sizeof(MYSQL_session))) == NULL)
return NULL;
strcpy(auth_info->user, username);
strcpy(auth_info->db, database);
gw_sha1_str((const uint8_t *)password, strlen(password), auth_info->client_sha1);
return auth_info;
}
/**
* Distribute the binlog record we have just received to all the registered slaves.
*
* @param router The router instance
* @param hdr The replication event header
* @param ptr The raw replication eent data
*/
static void
blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
{
GWBUF *pkt;
uint8_t *buf;
ROUTER_SLAVE *slave;
spinlock_acquire(&router->lock);
slave = router->slaves;
while (slave)
{
if (slave->binlog_pos == hdr->next_pos - hdr->event_size)
{
pkt = gwbuf_alloc(hdr->event_size + 5);
buf = GWBUF_DATA(pkt);
encode_value(buf, hdr->event_size + 1, 24);
buf += 3;
*buf++ = slave->seqno++;
*buf++ = 0; // OK
memcpy(buf, ptr, hdr->event_size);
slave->dcb->func.write(slave->dcb, pkt);
slave->binlog_pos = hdr->next_pos;
}
slave = slave->next;
}
spinlock_release(&router->lock);
}

View File

@ -0,0 +1,767 @@
/*
* This file is distributed as part of MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2014
*/
/**
* @file blr_slave.c - contains code for the router to slave communication
*
* The binlog router is designed to be used in replication environments to
* increase the replication fanout of a master server. It provides a transparant
* mechanism to read the binlog entries for multiple slaves while requiring
* only a single connection to the actual master to support the slaves.
*
* The current prototype implement is designed to support MySQL 5.6 and has
* a number of limitations. This prototype is merely a proof of concept and
* should not be considered production ready.
*
* @verbatim
* Revision History
*
* Date Who Description
* 14/04/2014 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <service.h>
#include <server.h>
#include <router.h>
#include <atomic.h>
#include <spinlock.h>
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <skygw_types.h>
#include <skygw_utils.h>
#include <log_manager.h>
static uint32_t extract_field(uint8_t *src, int bits);
static void encode_value(unsigned char *data, unsigned int value, int len);
static int blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
static int blr_slave_replay(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *master);
static void blr_slave_send_error(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *msg);
static int blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
static int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
static int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
extern int lm_enabled_logfiles_bitmask;
/**
* Process a request packet from the slave server.
*
* The router can handle a limited subset of requests from the slave, these
* include a subset of general SQL queries, a slave registeration command and
* the binlog dump command.
*
* The strategy for responding to these commands is to use caches responses
* for the the same commands that have previously been made to the real master
* if this is possible, if it is not then the router itself will synthesize a
* response.
*
* @param router The router instance this defines the master for this replication chain
* @param slave The slave specific data
* @param queue The incoming request packet
*/
int
blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
{
if (slave->state < 0 || slave->state > BLRS_MAXSTATE)
{
LOGIF(LM, (skygw_log_write(
LOGFILE_ERROR, "Invalid slave state machine state (%d) for binlog router.\n",
slave->state)));
gwbuf_consume(queue, gwbuf_length(queue));
return 0;
}
atomic_add(&slave->stats.n_requests, 1);
switch (MYSQL_COMMAND(queue))
{
case COM_QUERY:
return blr_slave_query(router, slave, queue);
break;
case COM_REGISTER_SLAVE:
return blr_slave_register(router, slave, queue);
break;
case COM_BINLOG_DUMP:
return blr_slave_binlog_dump(router, slave, queue);
break;
default:
break;
}
return 0;
}
/**
* Handle a query from the slave. This is expected to be one of the "standard"
* queries we expect as part of the registraton process. Most of these can
* be dealt with by replying the stored responses we got from the master
* when MaxScale registered as a slave. The exception to the rule is the
* request to obtain the current timestamp value of the server.
*
* Three select statements are currently supported:
* SELECT UNIX_TIMESTAMP();
* SELECT @master_binlog_checksum
* SELECT @@GLOBAL.GTID_MODE
*
* Two show commands are supported:
* SHOW VARIABLES LIKE 'SERVER_ID'
* SHOW VARIABLES LIKE 'SERVER_UUID'
*
* Four set commands are supported:
* SET @master_binlog_checksum = @@global.binlog_checksum
* SET @master_heartbeat_period=...
* SET @slave_slave_uuid=...
* SET NAMES latin1
*
* @param router The router instance this defines the master for this replication chain
* @param slave The slave specific data
* @param queue The incoming request packet
* @return Non-zero if data has been sent
*/
static int
blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
{
char *qtext, *query_text;
char *sep = " ,=";
char *word, *brkb;
int query_len;
qtext = GWBUF_DATA(queue);
query_len = extract_field(qtext, 24) - 1;
qtext += 5; // Skip header and first byte of the payload
query_text = strndup(qtext, query_len);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, "Execute statement from the slave '%s'\n", query_text)));
/*
* Implement a very rudimental "parsing" of the query text by extarcting the
* words from the statement and matchng them against the subset of queries we
* are expecting from the slave. We already have responses to these commands,
* except for the select of UNIX_TIMESTAMP(), that we have saved from MaxScale's
* own interaction with the real master. We simply replay these saved responses
* to the slave.
*/
word = strtok_r(query_text, sep, &brkb);
if (strcasecmp(word, "SELECT") == 0)
{
word = strtok_r(NULL, sep, &brkb);
if (strcasecmp(word, "UNIX_TIMESTAMP()") == 0)
{
free(query_text);
return blr_slave_send_timestamp(router, slave);
}
else if (strcasecmp(word, "@master_binlog_checksum") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.chksum2);
}
else if (strcasecmp(word, "@@GLOBAL.GTID_MODE") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.gtid_mode);
}
}
else if (strcasecmp(word, "SHOW") == 0)
{
word = strtok_r(NULL, sep, &brkb);
if (strcasecmp(word, "VARIABLES") == 0)
{
word = strtok_r(NULL, sep, &brkb);
if (strcasecmp(word, "LIKE") == 0)
{
word = strtok_r(NULL, sep, &brkb);
if (strcasecmp(word, "'SERVER_ID'") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.server_id);
}
else if (strcasecmp(word, "'SERVER_UUID'") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.uuid);
}
}
}
}
else if (strcasecmp(query_text, "SET") == 0)
{
word = strtok_r(NULL, sep, &brkb);
if (strcasecmp(word, "@master_heartbeat_period") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.heartbeat);
}
else if (strcasecmp(word, "@master_binlog_checksum") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.chksum1);
}
else if (strcasecmp(word, "@slave_uuid") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.setslaveuuid);
}
else if (strcasecmp(word, "NAMES") == 0)
{
word = strtok_r(NULL, sep, &brkb);
if (strcasecmp(word, "latin1") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.setnames);
}
}
}
free(query_text);
query_text = strndup(qtext, query_len);
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Unexpected query from slave server %s\n", query_text)));
free(query_text);
blr_slave_send_error(router, slave, "Unexpected SQL query received from slave.");
return 0;
}
/**
* Send a reply to a command we have received from the slave. The reply itself
* is merely a copy of a previous message we received from the master when we
* registered as a slave. Hence we just replay this saved reply.
*
* @param router The binlog router instance
* @param slave The slave server to which we are sending the response
* @param master The saved master response
* @return Non-zero if data was sent
*/
static int
blr_slave_replay(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *master)
{
GWBUF *clone;
if (!master)
return 0;
if ((clone = gwbuf_clone(master)) != NULL)
{
return slave->dcb->func.write(slave->dcb, clone);
}
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to clone server response to send to slave.\n")));
return 0;
}
}
/**
* Construct an error response
*
* @param router The router instance
* @param slave The slave server instance
* @param msg The error message to send
*/
static void
blr_slave_send_error(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *msg)
{
GWBUF *pkt;
unsigned char *data;
int len;
if ((pkt = gwbuf_alloc(strlen(msg) + 13)) == NULL)
return NULL;
data = GWBUF_DATA(pkt);
len = strlen(msg) + 1;
encode_value(&data[0], len, 24); // Payload length
data[3] = 0; // Sequence id
// Payload
data[4] = 0xff; // Error indicator
data[5] = 0; // Error Code
data[6] = 0; // Error Code
strncpy(&data[7], "#00000", 6);
memcpy(&data[13], msg, strlen(msg)); // Error Message
slave->dcb->func.write(slave->dcb, pkt);
}
/*
* Some standard packets that have been captured from a network trace of server
* interactions. These packets are the schema definition sent in response to
* a SELECT UNIX_TIMESTAMP() statement and the EOF packet that marks the end
* of transmission of the result set.
*/
static uint8_t timestamp_def[] = {
0x01, 0x00, 0x00, 0x01, 0x01, 0x26, 0x00, 0x00, 0x02, 0x03, 0x64, 0x65, 0x66, 0x00, 0x00, 0x00,
0x10, 0x55, 0x4e, 0x49, 0x58, 0x5f, 0x54, 0x49, 0x4d, 0x45, 0x53, 0x54, 0x41, 0x4d, 0x50, 0x28,
0x29, 0x00, 0x0c, 0x3f, 0x00, 0x0b, 0x00, 0x00, 0x00, 0x08, 0x81, 0x00, 0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x03, 0xfe, 0x00, 0x00, 0x02, 0x00
};
static uint8_t timestamp_eof[] = { 0x05, 0x00, 0x00, 0x05, 0xfe, 0x00, 0x00, 0x02, 0x00 };
/**
* Send a response to a "SELECT UNIX_TIMESTAMP()" request. This differs from the other
* requests since we do not save a copy of the original interaction with the master
* and simply replay it. We want to always send the current time. We have stored a typcial
* response, which gives us the schema information normally returned. This is sent to the
* client and then we add a dynamic part that will insert the current timestamp data.
* Finally we send a preprepaed EOF packet to end the response stream.
*
* @param router The binlog router instance
* @param slave The slave server to which we are sending the response
* @return Non-zero if data was sent
*/
static int
blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
GWBUF *pkt;
char timestamp[20];
uint8_t *ptr;
int len, ts_len;
sprintf(timestamp, "%ld", time(0));
ts_len = strlen(timestamp);
len = sizeof(timestamp_def) + sizeof(timestamp_eof) + 5 + ts_len;
if ((pkt = gwbuf_alloc(len)) == NULL)
return 0;
ptr = GWBUF_DATA(pkt);
memcpy(ptr, timestamp_def, sizeof(timestamp_def)); // Fixed preamble
ptr += sizeof(timestamp_def);
encode_value(ptr, ts_len + 1, 24); // Add length of data packet
ptr += 3;
*ptr++ = 0x04; // Sequence number in response
*ptr++ = ts_len; // Length of result string
strncpy(ptr, timestamp, ts_len); // Result string
ptr += ts_len;
memcpy(ptr, timestamp_eof, sizeof(timestamp_eof)); // EOF packet to terminate result
return slave->dcb->func.write(slave->dcb, pkt);
}
/**
* Process a slave replication registration message.
*
* We store the various bits of information the slave gives us and generate
* a reply message.
*
* @param router The router instance
* @param slave The slave server
* @param queue The BINLOG_DUMP packet
* @return Non-zero if data was sent
*/
static int
blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
{
GWBUF *resp;
uint8_t *ptr;
int len, slen;
ptr = GWBUF_DATA(queue);
len = extract_field(ptr, 24);
ptr += 4; // Skip length and sequence number
if (*ptr++ != COM_REGISTER_SLAVE)
return 0;
slave->serverid = extract_field(ptr, 32);
ptr += 4;
slen = *ptr++;
if (slen != 0)
{
slave->hostname = strndup(ptr, slen);
ptr += slen;
}
else
slave->hostname = NULL;
slen = *ptr++;
if (slen != 0)
{
ptr += slen;
slave->user = strndup(ptr, slen);
}
else
slave->user = NULL;
slen = *ptr++;
if (slen != 0)
{
slave->passwd = strndup(ptr, slen);
ptr += slen;
}
else
slave->passwd = NULL;
slave->port = extract_field(ptr, 16);
ptr += 2;
slave->rank = extract_field(ptr, 32);
/*
* Now construct a response
*/
if ((resp = gwbuf_alloc(11)) == NULL)
return 0;
ptr = GWBUF_DATA(resp);
encode_value(ptr, 7, 24); // Payload length
ptr += 3;
*ptr++ = 1; // Sequence number
encode_value(ptr, 0, 24);
ptr += 3;
encode_value(ptr, slave->serverid, 32);
slave->state = BLRS_REGISTERED;
return slave->dcb->func.write(slave->dcb, resp);
}
/**
* Process a COM_BINLOG_DUMP message from the slave. This is the
* final step in the process of registration. The new master, MaxScale
* must send a response packet and generate a fake BINLOG_ROTATE event
* with the binlog file requested by the slave. And then send a
* FORMAT_DESCRIPTION_EVENT that has been saved from the real master.
*
* Once send MaxScale must continue to send binlog events to the slave.
*
* @param router The router instance
* @param slave The slave server
* @param queue The BINLOG_DUMP packet
* @return The number of bytes written to the slave
*/
static int
blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
{
GWBUF *resp;
uint8_t *ptr;
int len, flags, serverid, rval;
REP_HEADER hdr;
uint32_t chksum;
ptr = GWBUF_DATA(queue);
len = extract_field(ptr, 24);
ptr += 4; // Skip length and sequence number
if (*ptr++ != COM_BINLOG_DUMP)
return 0;
slave->binlog_pos = extract_field(ptr, 32);
ptr += 4;
flags = extract_field(ptr, 16);
ptr += 2;
serverid = extract_field(ptr, 32);
ptr += 4;
strncpy(slave->binlogfile, ptr, BINLOG_FNAMELEN);
slave->state = BLRS_DUMPING;
slave->seqno = 1;
// Build a fake rotate event
resp = gwbuf_alloc(0x34);
hdr.payload_len = 0x30;
hdr.seqno = slave->seqno++;
hdr.ok = 0;
hdr.timestamp = 0L;
hdr.event_type = ROTATE_EVENT;
hdr.serverid = router->masterid;
hdr.event_size = 0x2f;
hdr.next_pos = slave->binlog_pos;
hdr.flags = 0;
ptr = blr_build_header(resp, &hdr);
encode_value(ptr, slave->binlog_pos, 64);
ptr += 8;
memcpy(ptr, slave->binlogfile, BINLOG_FNAMELEN);
ptr += BINLOG_FNAMELEN;
/*
* Now add the CRC to the fake binlog rotate event.
*
* The algorithm is first to compute the checksum of an empty buffer
* and then the checksum of the event portion of the message, ie we do not
* include the length, sequence number and ok byte that makes up the first
* 5 bytes of the message. We also do not include the 4 byte checksum itself.
*/
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(resp) + 5, hdr.event_size - 4);
encode_value(ptr, chksum, 32);
rval = slave->dcb->func.write(slave->dcb, resp);
/* Send the FORMAT_DESCRIPTION_EVENT */
if (router->saved_master.fde_event)
{
resp = gwbuf_alloc(router->saved_master.fde_len + 5);
ptr = GWBUF_DATA(resp);
encode_value(ptr, router->saved_master.fde_len + 1, 24); // Payload length
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
memcpy(ptr, router->saved_master.fde_event, router->saved_master.fde_len);
encode_value(ptr, time(0), 32); // Overwrite timestamp
/*
* Since we have changed the timestamp we must recalculate the CRC
*
* Position ptr to the start of the event header,
* calculate a new checksum
* and write it into the header
*/
ptr = GWBUF_DATA(resp) + 5 + router->saved_master.fde_len - 4;
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(resp) + 5, router->saved_master.fde_len - 4);
encode_value(ptr, chksum, 32);
rval = slave->dcb->func.write(slave->dcb, resp);
}
slave->dcb->low_water = router->low_water;
slave->dcb->high_water = router->high_water;
dcb_add_callback(slave->dcb, DCB_REASON_LOW_WATER, blr_slave_callback, slave);
rval = blr_slave_catchup(router, slave);
return rval;
}
/**
* Extract a numeric field from a packet of the specified number of bits,
* the number of bits must be a multiple of 8.
*
* @param src The raw packet source
* @param bits The number of bits to extract (multiple of 8)
* @return The extracted value
*/
static uint32_t
extract_field(uint8_t *src, int bits)
{
uint32_t rval = 0, shift = 0;
while (bits > 0)
{
rval |= (*src++) << shift;
shift += 8;
bits -= 8;
}
return rval;
}
/**
* Encode a value into a number of bits in a MySQL packet
*
* @param data Pointer to location in target packet
* @param value The value to encode into the buffer
* @param len Number of bits to encode value into
*/
static void
encode_value(unsigned char *data, unsigned int value, int len)
{
while (len > 0)
{
*data++ = value & 0xff;
value >>= 8;
len -= 8;
}
}
/**
* Populate a header structure for a replication message from a GWBUF structure.
*
* @param pkt The incoming packet in a GWBUF chain
* @param hdr The packet header to populate
* @return A pointer to the first byte following the event header
*/
static uint8_t *
blr_build_header(GWBUF *pkt, REP_HEADER *hdr)
{
uint8_t *ptr;
ptr = GWBUF_DATA(pkt);
encode_value(ptr, hdr->payload_len, 24);
ptr += 3;
*ptr++ = hdr->seqno;
*ptr++ = hdr->ok;
encode_value(ptr, hdr->timestamp, 32);
ptr += 4;
*ptr++ = hdr->event_type;
encode_value(ptr, hdr->serverid, 32);
ptr += 4;
encode_value(ptr, hdr->event_size, 32);
ptr += 4;
encode_value(ptr, hdr->next_pos, 32);
ptr += 4;
encode_value(ptr, hdr->flags, 16);
ptr += 2;
return ptr;
}
/**
* We have a registered slave that is behind the current leading edge of the
* binlog. We must replay the log entries to bring this node up to speed.
*
* There may be a large numebr of records to send to the slave, the process
* is triggered by the slave COM_BINLOG_DUMP message and all the events must
* be sent without receiving any new event. This measn there is no trigger into
* MaxScale other than this initial message. However, if we simply send all the
* events we end up with an extremely long write queue on the DCB and risk running
* the server out of resources.
*
* To resolve this the concept of high and low water marks within the DCB has been
* added, with the ability for the DCB code to call user defined callbacks when the
* write queue is completely drained, when it crosses above the high water mark and
* when it crosses below the low water mark.
*
* The blr_slave_catchup routine will send binlog events to the slave until the high
* water mark is reached, at which point it will return. Later, when a low water mark
* callback is generated by the code that drains the DCB of data the blr_slave_catchup
* routine will again be called to write more events. The process is repeated until
* the slave has caught up with the master.
*
* Note: an additional check that the DCB is still above the low water mark is done
* prior to the return from this function to allow for any delays due to the call to
* the close system call, since this may cause thread rescheduling.
*
* @param router The binlog router
* @param slave The slave that is behind
* @return The number of bytes written
*/
static int
blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
GWBUF *head, *record;
REP_HEADER hdr;
int written, fd, rval = 0, burst = 0;
uint8_t *ptr;
struct timespec req;
/*
* We have a slightly complex syncronisation mechansim here,
* we need to make sure that we do not have multiple threads
* running the catchup loop, but we need to be very careful
* that we do not loose a call that is coming via a callback
* call as this will stall the binlog catchup process.
*
* We don't want to simply use a traditional mutex here for
* the loop, since this would block a MaxScale thread for
* an unacceptable length of time.
*
* We have two status bits, the CS_READING that says we are
* in the outer loop and the CS_INNERLOOP, to say we are in
* the inner loop.
*
* If just CS_READING is set the thread other may be about to
* enter the inner loop or may be about to exit the function
* completely. therefore we have to wait to see if CS_READING
* is cleared or CS_INNERLOOP is set.
*
* If CS_READING gets cleared then this thread should proceed
* into the loop.
*
* If CS_INNERLOOP get's set then this thread does not need to
* proceed.
*
* If CS_READING is not set then this thread simply enters the
* loop.
*/
req.tv_sec = 0;
req.tv_nsec = 1000;
spinlock_acquire(&slave->catch_lock);
if (slave->cstate & CS_READING)
{
while ((slave->cstate & (CS_READING|CS_INNERLOOP)) == CS_READING)
{
spinlock_release(&slave->catch_lock);
nanosleep(&req, NULL);
spinlock_acquire(&slave->catch_lock);
}
if (slave->cstate & CS_READING)
{
spinlock_release(&slave->catch_lock);
return 1; // We cheat here and return 1 because otherwise
// an error would be sent and we do not want that
}
}
slave->cstate |= CS_READING;
spinlock_release(&slave->catch_lock);
do {
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_READING;
spinlock_release(&slave->catch_lock);
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
slave->binlogfile)));
return 0;
}
atomic_add(&slave->stats.n_bursts, 1);
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_INNERLOOP;
spinlock_release(&slave->catch_lock);
while ((!DCB_ABOVE_HIGH_WATER(slave->dcb)) &&
(record = blr_read_binlog(fd, slave->binlog_pos, &hdr)) != NULL)
{
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
encode_value(ptr, hdr.event_size + 1, 24);
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
head = gwbuf_append(head, record);
written = slave->dcb->func.write(slave->dcb, head);
if (written)
slave->binlog_pos = hdr.next_pos;
rval = written;
atomic_add(&slave->stats.n_events, 1);
burst++;
}
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_INNERLOOP;
spinlock_release(&slave->catch_lock);
close(fd);
} while (record && DCB_BELOW_LOW_WATER(slave->dcb));
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_READING;
spinlock_release(&slave->catch_lock);
if (record)
atomic_add(&slave->stats.n_flows, 1);
return rval;
}
/**
* The DCB callback used by the slave to obtain DCB_REASON_LOW_WATER callbacks
* when the server sends all the the queue data for a DCB. This is the mechanism
* that is used to implement the flow control mechanism for the sending of
* large quantities of binlog records during the catchup process.
*
* @param dcb The DCB of the slave connection
* @param reason The reason the callback was called
* @param data The user data, in this case the server structure
*/
static int
blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data)
{
ROUTER_SLAVE *slave = (ROUTER_SLAVE *)data;
ROUTER_INSTANCE *router = slave->router;
if (reason != DCB_REASON_LOW_WATER)
return 0;
if (slave->state == BLRS_DUMPING)
{
atomic_add(&slave->stats.n_events, 1);
blr_slave_catchup(router, slave);
}
}