diff --git a/server/core/dbusers.c b/server/core/dbusers.c index e5289a3a0..da079b3f7 100644 --- a/server/core/dbusers.c +++ b/server/core/dbusers.c @@ -183,6 +183,8 @@ getUsers(SERVICE *service, struct users *users) } serviceGetUser(service, &service_user, &service_passwd); + if (service_user == NULL || service_passwd == NULL) + return -1; /** multi-thread environment requires that thread init succeeds. */ if (mysql_thread_init()) { diff --git a/server/core/dcb.c b/server/core/dcb.c index 7c648ccd6..5ac95e0db 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -47,6 +47,7 @@ * error and 0 bytes to read. * This fixes a bug with many reads from * backend + * 07/05/2014 Mark Riddoch Addition of callback mechanism * * @endverbatim */ @@ -80,6 +81,7 @@ static bool dcb_set_state_nomutex( DCB* dcb, const dcb_state_t new_state, dcb_state_t* old_state); +static void dcb_call_callback(DCB *dcb, DCB_REASON reason); DCB* dcb_get_zombies(void) { @@ -94,8 +96,8 @@ DCB* dcb_get_zombies(void) * * @return A newly allocated DCB or NULL if non could be allocated. */ -DCB * dcb_alloc( - dcb_role_t role) +DCB * +dcb_alloc(dcb_role_t role) { DCB *rval; @@ -118,11 +120,16 @@ DCB *rval; spinlock_init(&rval->writeqlock); spinlock_init(&rval->delayqlock); spinlock_init(&rval->authlock); + spinlock_init(&rval->cb_lock); rval->fd = -1; memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics rval->state = DCB_STATE_ALLOC; bitmask_init(&rval->memdata.bitmask); + rval->writeqlen = 0; + rval->high_water = 0; + rval->low_water = 0; rval->next = NULL; + rval->callbacks = NULL; spinlock_acquire(&dcbspin); if (allDCBs == NULL) @@ -676,9 +683,11 @@ return_n: int dcb_write(DCB *dcb, GWBUF *queue) { - int w; - int saved_errno = 0; +int w, qlen; +int saved_errno = 0; +int below_water; + below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0; ss_dassert(queue != NULL); if (queue == NULL || @@ -712,6 +721,8 @@ dcb_write(DCB *dcb, GWBUF *queue) * the routine that drains the queue data, so we should * not have a race condition on the event. */ + qlen = gwbuf_length(queue); + atomic_add(&dcb->writeqlen, qlen); dcb->writeq = gwbuf_append(dcb->writeq, queue); dcb->stats.n_buffered++; LOGIF(LD, (skygw_log_write( @@ -824,6 +835,8 @@ dcb_write(DCB *dcb, GWBUF *queue) * for suspended write. */ dcb->writeq = queue; + qlen = gwbuf_length(queue); + atomic_add(&dcb->writeqlen, qlen); if (queue != NULL) { @@ -847,6 +860,13 @@ dcb_write(DCB *dcb, GWBUF *queue) return 0; } spinlock_release(&dcb->writeqlock); + + if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water) + { + atomic_add(&dcb->stats.n_high_water, 1); + dcb_call_callback(dcb, DCB_REASON_HIGH_WATER); + } + return 1; } @@ -861,9 +881,12 @@ dcb_write(DCB *dcb, GWBUF *queue) int dcb_drain_writeq(DCB *dcb) { -int n = 0; -int w; -int saved_errno = 0; +int n = 0; +int w; +int saved_errno = 0; +int above_water; + + above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0; spinlock_acquire(&dcb->writeqlock); if (dcb->writeq) @@ -924,6 +947,17 @@ int saved_errno = 0; } } spinlock_release(&dcb->writeqlock); + atomic_add(&dcb->writeqlen, -n); + /* The write queue has drained, potentially need to call a callback function */ + if (dcb->writeq == NULL) + dcb_call_callback(dcb, DCB_REASON_DRAINED); + if (above_water && dcb->writeqlen < dcb->low_water) + { + atomic_add(&dcb->stats.n_low_water, 1); + dcb_call_callback(dcb, DCB_REASON_LOW_WATER); + } + + return n; } @@ -966,6 +1000,8 @@ dcb_close(DCB *dcb) ss_dassert(dcb->state == DCB_STATE_NOPOLLING || dcb->state == DCB_STATE_ZOMBIE); + dcb_call_callback(dcb, DCB_REASON_CLOSE); + if (rc == 0) { LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, @@ -1008,6 +1044,8 @@ printDCB(DCB *dcb) printf("\t\tNo. of Writes: %d\n", dcb->stats.n_writes); printf("\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); printf("\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); + printf("\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); + printf("\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); } /** @@ -1054,6 +1092,8 @@ DCB *dcb; dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); + dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); + dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); dcb = dcb->next; } spinlock_release(&dcbspin); @@ -1079,6 +1119,8 @@ dprintDCB(DCB *pdcb, DCB *dcb) dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); + dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); + dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); } /** @@ -1415,4 +1457,135 @@ int gw_write( return w; } +/** + * Add a callback + * + * Duplicate registrations are not allowed, therefore an error will be returned if + * the specific function, reason and userdata triple are already registered. + * An error will also be returned if the is insufficient memeory available to + * create the registration. + * + * @param dcb The DCB to add the callback to + * @param reason The callback reason + * @param cb The callback function to call + * @param userdata User data to send in the call + * @return Non-zero (true) if the callback was added + */ +int +dcb_add_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata) +{ +DCB_CALLBACK *cb, *ptr; +int rval = 1; + if ((ptr = (DCB_CALLBACK *)malloc(sizeof(DCB_CALLBACK))) == NULL) + { + return 0; + } + ptr->reason = reason; + ptr->cb = callback; + ptr->userdata = userdata; + ptr->next = NULL; + spinlock_acquire(&dcb->cb_lock); + cb = dcb->callbacks; + if (cb == NULL) + { + dcb->callbacks = ptr; + spinlock_release(&dcb->cb_lock); + } + else + { + while (cb) + { + if (cb->reason == reason && cb->cb == callback && + cb->userdata == userdata) + { + free(ptr); + spinlock_release(&dcb->cb_lock); + return 0; + } + if (cb->next == NULL) + cb->next = ptr; + cb = cb->next; + } + spinlock_release(&dcb->cb_lock); + } + return rval; +} + +/** + * Remove a callback from the callback list for the DCB + * + * Searches down the linked list to find he callback with a matching reason, function + * and userdata. + * + * @param dcb The DCB to add the callback to + * @param reason The callback reason + * @param cb The callback function to call + * @param userdata User data to send in the call + * @return Non-zero (true) if the callback was removed + */ +int +dcb_remove_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON), void *userdata) +{ +DCB_CALLBACK *cb, *pcb = NULL; +int rval = 0; + + spinlock_acquire(&dcb->cb_lock); + cb = dcb->callbacks; + if (cb == NULL) + { + rval = 0; + } + else + { + while (cb) + { + if (cb->reason == reason && cb->cb == callback + && cb->userdata == userdata) + { + if (pcb == NULL) + pcb->next = cb->next; + else + dcb->callbacks = cb->next; + spinlock_release(&dcb->cb_lock); + free(cb); + rval = 1; + break; + } + pcb = cb; + cb = cb->next; + } + } + if (!rval) + spinlock_release(&dcb->cb_lock); + return rval; +} + +/** + * Call the set of callbacks registered for a particular reason. + * + * @param dcb The DCB to call the callbacks regarding + * @param reason The reason that has triggered the call + */ +static void +dcb_call_callback(DCB *dcb, DCB_REASON reason) +{ +DCB_CALLBACK *cb, *nextcb; + + spinlock_acquire(&dcb->cb_lock); + cb = dcb->callbacks; + while (cb) + { + if (cb->reason == reason) + { + nextcb = cb->next; + spinlock_release(&dcb->cb_lock); + cb->cb(dcb, reason, cb->userdata); + spinlock_acquire(&dcb->cb_lock); + cb = nextcb; + } + else + cb = cb->next; + } + spinlock_release(&dcb->cb_lock); +} diff --git a/server/core/session.c b/server/core/session.c index 062c63f27..1e0d49771 100644 --- a/server/core/session.c +++ b/server/core/session.c @@ -114,7 +114,7 @@ session_alloc(SERVICE *service, DCB *client_dcb) /* * Only create a router session if we are not the listening - * DCB. Creating a router session may create a connection to a + * DCB or an internal DCB. Creating a router session may create a connection to a * backend server, depending upon the router module implementation * and should be avoided for the listener session * @@ -122,7 +122,7 @@ session_alloc(SERVICE *service, DCB *client_dcb) * session, therefore it is important that the session lock is * relinquished beforethe router call. */ - if (client_dcb->state != DCB_STATE_LISTENING) + if (client_dcb->state != DCB_STATE_LISTENING && client_dcb->dcb_role != DCB_ROLE_INTERNAL) { session->router_session = service->router->newSession(service->router_instance, diff --git a/server/include/dcb.h b/server/include/dcb.h index 9ca60e004..625ae6026 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -48,6 +48,8 @@ struct service; * 15/07/2013 Massimiliano Pinto Added session entry point * 16/07/2013 Massimiliano Pinto Added command type for dcb * 07/02/2014 Massimiliano Pinto Added ipv4 data struct into for dcb + * 07/05/2014 Mark Riddoch Addition of callback mechanism + * 08/05/2014 Mark Riddoch Addition of writeq high and low watermarks * * @endverbatim */ @@ -99,6 +101,8 @@ typedef struct dcbstats { int n_writes; /*< Number of writes on this descriptor */ int n_accepts; /*< Number of accepts on this descriptor */ int n_buffered; /*< Number of buffered writes */ + int n_high_water; /*< Number of crosses of high water mark */ + int n_low_water; /*< Number of crosses of low water mark */ } DCBSTATS; /** @@ -137,10 +141,35 @@ typedef enum { } dcb_state_t; typedef enum { - DCB_ROLE_SERVICE_LISTENER, /*< Receives initial connect requests from clients */ - DCB_ROLE_REQUEST_HANDLER /*< Serves dedicated client */ + DCB_ROLE_SERVICE_LISTENER, /*< Receives initial connect requests from clients */ + DCB_ROLE_REQUEST_HANDLER, /*< Serves dedicated client */ + DCB_ROLE_INTERNAL /*< Internal DCB not connected to the outside */ } dcb_role_t; +/** + * Callback reasons for the DCB callback mechanism. + */ +typedef enum { + DCB_REASON_CLOSE, /*< The DCB is closing */ + DCB_REASON_DRAINED, /*< The write delay queue has drained */ + DCB_REASON_HIGH_WATER, /*< Cross high water mark */ + DCB_REASON_LOW_WATER, /*< Cross low water mark */ + DCB_REASON_ERROR, /*< An error was flagged on the connection */ + DCB_REASON_HUP /*< A hangup was detected */ +} DCB_REASON; + +/** + * Callback structure - used to track callbacks registered on a DCB + */ +typedef struct dcb_callback { + DCB_REASON reason; /*< The reason for the callback */ + int (*cb)(struct dcb *dcb, DCB_REASON reason, void *userdata); + void *userdata; /*< User data to be sent in the callback */ + struct dcb_callback + *next; /*< Next callback for this DCB */ +} DCB_CALLBACK; + + /** * Descriptor Control Block * @@ -172,6 +201,7 @@ typedef struct dcb { struct session *session; /**< The owning session */ GWPROTOCOL func; /**< The functions for this descriptor */ + unsigned int writeqlen; /**< Current number of byes in the write queue */ SPINLOCK writeqlock; /**< Write Queue spinlock */ GWBUF *writeq; /**< Write Data Queue */ SPINLOCK delayqlock; /**< Delay Backend Write Queue spinlock */ @@ -185,6 +215,11 @@ typedef struct dcb { void *data; /**< Specific client data */ DCBMM memdata; /**< The data related to DCB memory management */ int command; /**< Specific client command type */ + SPINLOCK cb_lock; /**< The lock for the callbacks linked list */ + DCB_CALLBACK *callbacks; /**< The list of callbacks for the DCB */ + + unsigned int high_water; /**< High water mark */ + unsigned int low_water; /**< Low water mark */ #if defined(SS_DEBUG) skygw_chk_t dcb_chk_tail; #endif @@ -203,6 +238,11 @@ int fail_accept_errno; #define DCB_SESSION(x) (x)->session #define DCB_PROTOCOL(x, type) (type *)((x)->protocol) #define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE) +#define DCB_WRITEQLEN(x) (x)->writeqlen +#define DCB_SET_LOW_WATER(x, lo) (x)->low_water = (lo); +#define DCB_SET_HIGH_WATER(x, hi) (x)->low_water = (hi); +#define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water) +#define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) DCB *dcb_get_zombies(void); int gw_write( @@ -229,6 +269,10 @@ void dcb_printf(DCB *, const char *, ...); /* DCB version of printf */ int dcb_isclient(DCB *); /* the DCB is the client of the session */ void dcb_hashtable_stats(DCB *, void *); /**< Print statisitics */ void dcb_add_to_zombieslist(DCB* dcb); +int dcb_add_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), + void *); +int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON), + void *); bool dcb_set_state( DCB* dcb, diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h new file mode 100644 index 000000000..47dc77fad --- /dev/null +++ b/server/modules/include/blr.h @@ -0,0 +1,325 @@ +#ifndef _BLR_H +#define _BLR_H +/* + * This file is distributed as part of MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file blr.h - The binlog router header file + * + * @verbatim + * Revision History + * + * Date Who Description + * 02/04/14 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include +#include + +#define BINLOG_FNAMELEN 16 +#define BLR_PROTOCOL "MySQLBackend" +#define BINLOG_MAGIC { 0xfe, 0x62, 0x69, 0x6e } +#define BINLOG_NAMEFMT "mysql-bin.%06d" + +/** + * High and Low water marks for the slave dcb. These values can be overriden + * by the router options highwater and lowwater. + */ +#define DEF_LOW_WATER 20000 +#define DEF_HIGH_WATER 100000 + +/** + * Some useful macros for examining the MySQL Response packets + */ +#define MYSQL_RESPONSE_OK(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4) == 0x00) +#define MYSQL_RESPONSE_EOF(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4) == 0xfe) +#define MYSQL_RESPONSE_ERR(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4) == 0xff) +#define MYSQL_ERROR_CODE(buf) (*((uint8_t *)GWBUF_DATA(buf) + 5)) +#define MYSQL_ERROR_MSG(buf) ((uint8_t *)GWBUF_DATA(buf) + 6) +#define MYSQL_COMMAND(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4)) + +/** + * Slave statistics + */ +typedef struct { + unsigned int n_events; /*< Number of events sent */ + unsigned int n_bursts; /*< Number of bursts sent */ + unsigned int n_requests; /*< Number of requests received */ + unsigned int n_flows; /*< Number of flow control restarts */ +} SLAVE_STATS; + +/** + * The client session structure used within this router. This represents + * the slaves that are replicating binlogs from MaxScale. + */ +typedef struct router_slave { +#if defined(SS_DEBUG) + skygw_chk_t rses_chk_top; +#endif + DCB *dcb; /*< The slave server DCB */ + int state; /*< The state of this slave */ + int binlog_pos; /*< Binlog position for this slave */ + char binlogfile[BINLOG_FNAMELEN+1]; + /*< Current binlog file for this slave */ + int serverid; /*< Server-id of the slave */ + char *hostname; /*< Hostname of the slave, if known */ + char *user; /*< Username if given */ + char *passwd; /*< Password if given */ + short port; /*< MySQL port */ + uint32_t rank; /*< Replication rank */ + uint8_t seqno; /*< Replication dump sequence no */ + SPINLOCK catch_lock; /*< Event catchup lock */ + unsigned int cstate; /*< Catch up state */ + SPINLOCK rses_lock; /*< Protects rses_deleted */ + struct router_instance + *router; /*< Pointer to the owning router */ + struct router_slave *next; + SLAVE_STATS stats; /*< Slave statistics */ +#if defined(SS_DEBUG) + skygw_chk_t rses_chk_tail; +#endif +} ROUTER_SLAVE; + + +/** + * The statistics for this router instance + */ +typedef struct { + unsigned int n_slaves; /*< Number slave sessions created */ + unsigned int n_reads; /*< Number of record reads */ + uint64_t n_binlogs; /*< Number of binlog records from master */ + uint64_t n_binlog_errors;/*< Number of binlog records from master */ + uint64_t n_rotates; /*< Number of binlog rotate events */ + uint64_t n_cachehits; /*< Number of hits on the binlog cache */ + uint64_t n_cachemisses; /*< Number of misses on the binlog cache */ + unsigned int n_registered; /*< Number of registered slaves */ + uint64_t n_fakeevents; /*< Fake events not written to disk */ + uint64_t events[0x24]; /*< Per event counters */ +} ROUTER_STATS; + +/** + * Saved responses from the master that will be forwarded to slaves + */ +typedef struct { + GWBUF *server_id; /*< Master server id */ + GWBUF *heartbeat; /*< Heartbeat period */ + GWBUF *chksum1; /*< Binlog checksum 1st response */ + GWBUF *chksum2; /*< Binlog checksum 2nd response */ + GWBUF *gtid_mode; /*< GTID Mode response */ + GWBUF *uuid; /*< Master UUID */ + GWBUF *setslaveuuid; /*< Set Slave UUID */ + GWBUF *setnames; /*< Set NAMES latin1 */ + uint8_t *fde_event; /*< Format Description Event */ + int fde_len; /*< Length of fde_event */ +} MASTER_RESPONSES; + +/** + * The binlog record structure. This contains the actual packet received from the + * master, the binlog position of the data in the packet, a point to the data and + * the length of the binlog record. + * + * This allows requests for binlog records in the cache to be serviced by simply + * sending the exact same packet as was received by MaxScale from the master. + * Items are written to the backing file as soon as they are received. The binlog + * cache is flushed of old records periodically, releasing the GWBUF's back to the + * free memory pool. + */ +typedef struct { + unsigned long position; /*< binlog record position for this cache entry */ + GWBUF *pkt; /*< The packet received from the master */ + unsigned char *data; /*< Pointer to the data within the packet */ + unsigned int record_len; /*< Binlog record length */ +} BLCACHE_RECORD; + +/** + * The binlog cache. A cache exists for each file that hold cached bin log records. + * Typically the router will hold two binlog caches, one for the current file and one + * for the previous file. + */ +typedef struct { + char filename[BINLOG_FNAMELEN+1]; + BLCACHE_RECORD *first; + BLCACHE_RECORD *current; + int cnt; +} BLCACHE; + + +/** + * The per instance data for the router. + */ +typedef struct router_instance { + SERVICE *service; /*< Pointer to the service using this router */ + ROUTER_SLAVE *slaves; /*< Link list of all the slave connections */ + SPINLOCK lock; /*< Spinlock for the instance data */ + char *uuid; /*< UUID for the router to use w/master */ + int masterid; /*< Server ID of the master */ + int serverid; /*< Server ID to use with master */ + char *user; /*< User name to use with master */ + char *password; /*< Password to use with master */ + DCB *master; /*< DCB for master connection */ + SESSION *session; /*< Fake session for master connection */ + unsigned int master_state; /*< State of the master FSM */ + GWBUF *residual; /*< Any residual binlog event */ + MASTER_RESPONSES saved_master; /*< Saved master responses */ + char binlog_name[BINLOG_FNAMELEN+1]; + /*< Name of the current binlog file */ + uint64_t binlog_position; + /*< Current binlog position */ + int binlog_fd; /*< File descriptor of the binlog + * file being written + */ + unsigned int low_water; /*< Low water mark for client DCB */ + unsigned int high_water; /*< High water mark for client DCB */ + BLCACHE *cache[2]; + ROUTER_STATS stats; /*< Statistics for this router */ + int active_logs; + struct router_instance + *next; +} ROUTER_INSTANCE; + +/** + * Packet header for replication messages + */ +typedef struct rep_header { + int payload_len; /*< Payload length (24 bits) */ + uint8_t seqno; /*< Response sequence number */ + uint8_t ok; /*< OK Byte from packet */ + uint32_t timestamp; /*< Timestamp - start of binlog record */ + uint8_t event_type; /*< Binlog event type */ + uint32_t serverid; /*< Server id of master */ + uint32_t event_size; /*< Size of header, post-header and body */ + uint32_t next_pos; /*< Position of next event */ + uint16_t flags; /*< Event flags */ +} REP_HEADER; + +/** + * State machine for the master to MaxScale replication + */ +#define BLRM_UNCONNECTED 0x0000 +#define BLRM_AUTHENTICATED 0x0001 +#define BLRM_TIMESTAMP 0x0002 +#define BLRM_SERVERID 0x0003 +#define BLRM_HBPERIOD 0x0004 +#define BLRM_CHKSUM1 0x0005 +#define BLRM_CHKSUM2 0x0006 +#define BLRM_GTIDMODE 0x0007 +#define BLRM_MUUID 0x0008 +#define BLRM_SUUID 0x0009 +#define BLRM_LATIN1 0x000A +#define BLRM_REGISTER 0x000B +#define BLRM_BINLOGDUMP 0x000C + +#define BLRM_MAXSTATE 0x000C + +static char *blrm_states[] = { "Unconnected", "Authenticated", "Timestamp retrieval", + "Server ID retrieval", "HeartBeat Period setup", "binlog checksum config", + "binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval", + "Set Slave UUID", "Set Names", "Register slave", "Binlog Dump" }; + +#define BLRS_CREATED 0x0000 +#define BLRS_UNREGISTERED 0x0001 +#define BLRS_REGISTERED 0x0002 +#define BLRS_DUMPING 0x0003 + +#define BLRS_MAXSTATE 0x0003 + +static char *blrs_states[] = { "Created", "Unregistered", "Registered", + "Sending binlogs" }; + +/** + * Slave catch-up status + */ +#define CS_READING 0x0001 +#define CS_INNERLOOP 0x0002 + +/** + * MySQL protocol OpCodes needed for replication + */ +#define COM_QUERY 0x03 +#define COM_REGISTER_SLAVE 0x15 +#define COM_BINLOG_DUMP 0x12 + +/** + * Binlog event types + */ +#define START_EVENT_V3 0x01 +#define QUERY_EVENT 0x02 +#define STOP_EVENT 0x03 +#define ROTATE_EVENT 0x04 +#define INTVAR_EVENT 0x05 +#define LOAD_EVENT 0x06 +#define SLAVE_EVENT 0x07 +#define CREATE_FILE_EVENT 0x08 +#define APPEND_BLOCK_EVENT 0x09 +#define EXEC_LOAD_EVENT 0x0A +#define DELETE_FILE_EVENT 0x0B +#define NEW_LOAD_EVENT 0x0C +#define RAND_EVENT 0x0D +#define USER_VAR_EVENT 0x0E +#define FORMAT_DESCRIPTION_EVENT 0x0F +#define XID_EVENT 0x10 +#define BEGIN_LOAD_QUERY_EVENT 0x11 +#define EXECUTE_LOAD_QUERY_EVENT 0x12 +#define TABLE_MAP_EVENT 0x13 +#define WRITE_ROWS_EVENTv0 0x14 +#define UPDATE_ROWS_EVENTv0 0x15 +#define DELETE_ROWS_EVENTv0 0x16 +#define WRITE_ROWS_EVENTv1 0x17 +#define UPDATE_ROWS_EVENTv1 0x18 +#define DELETE_ROWS_EVENTv1 0x19 +#define INCIDENT_EVENT 0x1A +#define HEARTBEAT_EVENT 0x1B +#define IGNORABLE_EVENT 0x1C +#define ROWS_QUERY_EVENT 0x1D +#define WRITE_ROWS_EVENTv2 0x1E +#define UPDATE_ROWS_EVENTv2 0x1F +#define DELETE_ROWS_EVENTv2 0x20 +#define GTID_EVENT 0x21 +#define ANONYMOUS_GTID_EVENT 0x22 +#define PREVIOUS_GTIDS_EVENT 0x23 + +/** + * Binlog event flags + */ +#define LOG_EVENT_BINLOG_IN_USE_F 0x0001 +#define LOG_EVENT_FORCED_ROTATE_F 0x0002 +#define LOG_EVENT_THREAD_SPECIFIC_F 0x0004 +#define LOG_EVENT_SUPPRESS_USE_F 0x0008 +#define LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F 0x0010 +#define LOG_EVENT_ARTIFICIAL_F 0x0020 +#define LOG_EVENT_RELAY_LOG_F 0x0040 +#define LOG_EVENT_IGNORABLE_F 0x0080 +#define LOG_EVENT_NO_FILTER_F 0x0100 +#define LOG_EVENT_MTS_ISOLATE_F 0x0200 + +/* + * Externals within the router + */ +extern void blr_start_master(ROUTER_INSTANCE *); +extern void blr_master_response(ROUTER_INSTANCE *, GWBUF *); + +extern int blr_slave_request(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *); + +extern void blr_init_cache(ROUTER_INSTANCE *); + +extern void blr_file_init(ROUTER_INSTANCE *); +extern void blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *); +extern void blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t); +extern GWBUF *blr_read_binlog(int, unsigned int, REP_HEADER *); +#endif diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index acc2dacdd..f1774485d 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -414,21 +414,23 @@ static int gw_read_backend_event(DCB *dcb) { if (dcb->session->client != NULL) { client_protocol = SESSION_PROTOCOL(dcb->session, MySQLProtocol); - } - - if (client_protocol != NULL) { - CHK_PROTOCOL(client_protocol); + if (client_protocol != NULL) { + CHK_PROTOCOL(client_protocol); - if (client_protocol->state == MYSQL_IDLE) - { - router->clientReply(router_instance, + if (client_protocol->state == MYSQL_IDLE) + { + router->clientReply(router_instance, rsession, writebuf, dcb); - rc = 1; - } - goto return_rc; - } + rc = 1; + } + goto return_rc; + } else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) { + router->clientReply(router_instance, rsession, writebuf, dcb); + rc = 1; + } + } } return_rc: diff --git a/server/modules/routing/Makefile b/server/modules/routing/Makefile index c07b7b438..8bbc45c3b 100644 --- a/server/modules/routing/Makefile +++ b/server/modules/routing/Makefile @@ -49,6 +49,8 @@ MODULES= libdebugcli.so libreadconnroute.so libtestroute.so all: $(MODULES) + (cd readwritesplit; make ) + (cd binlog; make ) libtestroute.so: $(TESTOBJ) $(CC) $(LDFLAGS) $(TESTOBJ) $(LIBS) -o $@ @@ -68,18 +70,22 @@ libreadwritesplit.so: clean: rm -f $(OBJ) $(MODULES) (cd readwritesplit; touch depend.mk; make clean) + (cd binlog; touch depend.mk; make clean) tags: ctags $(SRCS) $(HDRS) (cd readwritesplit; make tags) + (cd binlog; make tags) depend: @rm -f depend.mk cc -M $(CFLAGS) $(SRCS) > depend.mk (cd readwritesplit; touch depend.mk ; make depend) + (cd binlog; touch depend.mk ; make depend) install: $(MODULES) install -D $(MODULES) $(DEST)/MaxScale/modules (cd readwritesplit; make DEST=$(DEST) install) + (cd binlog; make DEST=$(DEST) install) include depend.mk diff --git a/server/modules/routing/binlog/Makefile b/server/modules/routing/binlog/Makefile new file mode 100644 index 000000000..6e9282ea1 --- /dev/null +++ b/server/modules/routing/binlog/Makefile @@ -0,0 +1,65 @@ +# This file is distributed as part of the SkySQL Gateway. It is free +# software: you can redistribute it and/or modify it under the terms of the +# GNU General Public License as published by the Free Software Foundation, +# version 2. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., 51 +# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +# Copyright SkySQL Ab 2013 +# +# Revision History +# Date Who Description +# 2/04/14 Mark Riddoch Initial framework put in place + +include ../../../../build_gateway.inc + +LOGPATH := $(ROOT_PATH)/log_manager +UTILSPATH := $(ROOT_PATH)/utils +QCLASSPATH := $(ROOT_PATH)/query_classifier + +CC=cc +CFLAGS=-c -fPIC -I/usr/include -I../../include -I../../../include \ + -I$(LOGPATH) -I$(UTILSPATH) -I$(QCLASSPATH) \ + $(MYSQL_HEADERS) -Wall -g + +include ../../../../makefile.inc + +LDFLAGS=-shared -L$(LOGPATH) -L$(QCLASSPATH) -L$(EMBEDDED_LIB) \ + -Wl,-rpath,$(DEST)/lib \ + -Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) -Wl,-rpath,$(QCLASSPATH) \ + -Wl,-rpath,$(EMBEDDED_LIB) + +SRCS=blr.c blr_master.c blr_cache.c blr_slave.c blr_file.c +OBJ=$(SRCS:.c=.o) +LIBS=-lssl -pthread -llog_manager -lmysqld +MODULES=libbinlogrouter.so + +all: $(MODULES) + +$(MODULES): $(OBJ) + $(CC) $(LDFLAGS) $(OBJ) $(UTILSPATH)/skygw_utils.o $(LIBS) -o $@ + +.c.o: + $(CC) $(CFLAGS) $< -o $@ + +clean: + rm -f $(OBJ) $(MODULES) + +tags: + ctags $(SRCS) $(HDRS) + +depend: + @rm -f depend.mk + cc -M $(CFLAGS) $(SRCS) > depend.mk + +install: $(MODULES) + install -D $(MODULES) $(DEST)/MaxScale/modules + +include depend.mk diff --git a/server/modules/routing/binlog/README b/server/modules/routing/binlog/README new file mode 100644 index 000000000..514b48341 --- /dev/null +++ b/server/modules/routing/binlog/README @@ -0,0 +1,53 @@ +The binlog router is not a "normal" MaxScale router, it is not +designed to be used to route client requests to a database in the +usual proxy fashion. Rather it is designed to allow MaxScale to be +used as a relay server in a MySQL replication environment. + +In this environment MaxScale sits between a master MySQL server and +a set of slave servers. The slaves servers execute a change master +to the MaxScale server, otehrwise they are configured in exactly +the same way as a normal MySQL slave server. + +The master server configuration is unaltered, it simply sees a +single slave server. + +MaxScale is configured as usual, with a service definition that +references the binlog router. The major configuration option to +consider is the router_options paramter, in the binlog router this +provides the binlog specific configuration parameters. + + uuid= + This is the UUID that MaxScale uses when it connects + to the real master. It will report the master's + UUID to slaves that connect to it. + + server-id= + The server-id that MaxScale uses when it connects + to the real master server. Again it will reports + the master's server-id to the slaves that connect + to it. + user= + The user that MaxScale uses to login to the real + master + password= + The password that MaxScale uses to login to the + real master + master-id= + The server-id of the real master. MaxScale should + get this by sending a query, but at the moment it + is in the configuration file for ease of implementation + + +An example binlog service configuration is shown below: + +[Binlog Service] +type=service +router=binlogrouter +servers=master +router_options=uuid=f12fcb7f-b97b-11e3-bc5e-0401152c4c22,server-id=3,user=repl,password=slavepass,master-id=1 +user=maxscale +passwd=Mhu87p2D + +The servers list for a binlog router service should contain just +the master server. In future a list will be given and the monitor +used to determine which server is the current master server. diff --git a/server/modules/routing/binlog/STATUS b/server/modules/routing/binlog/STATUS new file mode 100644 index 000000000..bd981306b --- /dev/null +++ b/server/modules/routing/binlog/STATUS @@ -0,0 +1,18 @@ +The binlog router contained here is a prototype implementation and +should not be consider as production ready. + +The router has been written and tested with MySQL 5.6 as a reference +for the replication behaviour, more investigation and implementation +is likely to be needed in order to use other versions of MySQL, +MariaDB or Percona Server. + +To Do List: + +1. Thread safety needs to be examine, currently MaxScale has been +run with a single thread when testing this router. + +2. Binlog rotate events have yet to be tested. + +3. The router does not implement the replication heartbeat mechanism. + +4. Performance measurements have yet to be made. diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c new file mode 100644 index 000000000..005f360dc --- /dev/null +++ b/server/modules/routing/binlog/blr.c @@ -0,0 +1,654 @@ +/* + * This file is distributed as part of MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file blr.c - binlog router, allows MaxScale to act as an intermediatory for replication + * + * The binlog router is designed to be used in replication environments to + * increase the replication fanout of a master server. It provides a transparant + * mechanism to read the binlog entries for multiple slaves while requiring + * only a single connection to the actual master to support the slaves. + * + * The current prototype implement is designed to support MySQL 5.6 and has + * a number of limitations. This prototype is merely a proof of concept and + * should not be considered production ready. + * + * @verbatim + * Revision History + * + * Date Who Description + * 02/04/2014 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +extern int lm_enabled_logfiles_bitmask; + +static char *version_str = "V1.0.0"; + +/* The router entry points */ +static ROUTER *createInstance(SERVICE *service, char **options); +static void *newSession(ROUTER *instance, SESSION *session); +static void closeSession(ROUTER *instance, void *router_session); +static void freeSession(ROUTER *instance, void *router_session); +static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue); +static void diagnostics(ROUTER *instance, DCB *dcb); +static void clientReply( + ROUTER *instance, + void *router_session, + GWBUF *queue, + DCB *backend_dcb); +static void errorReply( + ROUTER *instance, + void *router_session, + char *message, + DCB *backend_dcb, + int action); +static uint8_t getCapabilities (ROUTER* inst, void* router_session); + + +/** The module object definition */ +static ROUTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + routeQuery, + diagnostics, + clientReply, + errorReply, + getCapabilities +}; + +static bool rses_begin_locked_router_action(ROUTER_SLAVE *); +static void rses_end_locked_router_action(ROUTER_SLAVE *); + +static SPINLOCK instlock; +static ROUTER_INSTANCE *instances; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "Initialise binlog router module %s.\n", version_str))); + spinlock_init(&instlock); + instances = NULL; +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +ROUTER_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Create an instance of the router for a particular service + * within MaxScale. + * + * The process of creating the instance causes the router to register + * with the master server and begin replication of the binlogs from + * the master server to MaxScale. + * + * @param service The service this router is being create for + * @param options An array of options for this query router + * + * @return The instance data for this new instance + */ +static ROUTER * +createInstance(SERVICE *service, char **options) +{ +ROUTER_INSTANCE *inst; +char *value; +int i; + + if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { + return NULL; + } + + memset(&inst->stats, 0, sizeof(ROUTER_STATS)); + memset(&inst->saved_master, 0, sizeof(MASTER_RESPONSES)); + + inst->service = service; + spinlock_init(&inst->lock); + + inst->low_water = DEF_LOW_WATER; + inst->high_water = DEF_HIGH_WATER; + + /* + * We only support one server behind this router, since the server is + * the master from which we replicate binlog records. Therefore check + * that only one server has been defined. + * + * A later improvement will be to define multiple servers and have the + * router use the information that is supplied by the monitor to find + * which of these servers is currently the master and replicate from + * that server. + */ + if (service->databases == NULL || service->databases->nextdb != NULL) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Exactly one database server may be " + "for use with the binlog router."))); + } + + + /* + * Process the options. + * We have an array of attrbute values passed to us that we must + * examine. Supported attributes are: + * uuid= + * server-id= + * user= + * password= + * master-id= + */ + if (options) + { + for (i = 0; options[i]; i++) + { + if ((value = strchr(options[i], '=')) == NULL) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, "Warning : Unsupported router " + "option %s for binlog router.", + options[i]))); + } + else + { + *value = 0; + value++; + if (strcmp(options[i], "uuid") == 0) + { + inst->uuid = strdup(value); + } + else if (strcmp(options[i], "server-id") == 0) + { + inst->serverid = atoi(value); + } + else if (strcmp(options[i], "user") == 0) + { + inst->user = strdup(value); + } + else if (strcmp(options[i], "password") == 0) + { + inst->password = strdup(value); + } + else if (strcmp(options[i], "master-id") == 0) + { + inst->masterid = atoi(value); + } + else if (strcmp(options[i], "lowwater") == 0) + { + inst->low_water = atoi(value); + } + else if (strcmp(options[i], "highwater") == 0) + { + inst->high_water = atoi(value); + } + else + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Warning : Unsupported router " + "option %s for binlog router.", + options[i]))); + } + } + } + } + + /* + * We have completed the creation of the instance data, so now + * insert this router instance into the linked list of routers + * that have been created with this module. + */ + spinlock_acquire(&instlock); + inst->next = instances; + instances = inst; + spinlock_release(&instlock); + + inst->active_logs = 0; + + /* + * Initialise the binlog file and position + */ + blr_file_init(inst); + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Binlog router: current binlog file is: %s, current position %u\n", + inst->binlog_name, inst->binlog_position))); + + /* + * Initialise the binlog cache for this router instance + */ + blr_init_cache(inst); + + /* + * Now start the replication from the master to MaxScale + */ + blr_start_master(inst); + + return (ROUTER *)inst; +} + +/** + * Associate a new session with this instance of the router. + * + * In the case of the binlog router a new session equates to a new slave + * connecting to MaxScale and requesting binlog records. We need to go + * through the slave registration process for this new slave. + * + * @param instance The router instance data + * @param session The session itself + * @return Session specific data for this session + */ +static void * +newSession(ROUTER *instance, SESSION *session) +{ +ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; +ROUTER_SLAVE *slave; + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "binlog router: %lu [newSession] new router session with " + "session %p, and inst %p.", + pthread_self(), + session, + inst))); + + + if ((slave = (ROUTER_SLAVE *)calloc(1, sizeof(ROUTER_SLAVE))) == NULL) + { + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_ERROR, + "Insufficient memory to create new slave session for binlog router"))); + return NULL; + } + +#if defined(SS_DEBUG) + slave->rses_chk_top = CHK_NUM_ROUTER_SES; + slave->rses_chk_tail = CHK_NUM_ROUTER_SES; +#endif + + memset(&slave->stats, 0, sizeof(SLAVE_STATS)); + atomic_add(&inst->stats.n_slaves, 1); + slave->state = BLRS_CREATED; /* Set initial state of the slave */ + slave->dcb = session->client; + slave->router = instance; + + /** + * Add this session to the list of active sessions. + */ + spinlock_acquire(&inst->lock); + slave->next = inst->slaves; + inst->slaves = slave; + spinlock_release(&inst->lock); + + CHK_CLIENT_RSES(slave); + + return (void *)slave; +} + +/** + * The session is no longer required. Shutdown all operation and free memory + * associated with this session. In this case a single session is associated + * to a slave of MaxScale. Therefore this is called when that slave is no + * longer active and should remove of reference to that slave, free memory + * and prevent any further forwarding of binlog records to that slave. + * + * Parameters: + * @param router_instance The instance of the router + * @param router_cli_ses The particular session to free + * + */ +static void freeSession( + ROUTER* router_instance, + void* router_client_ses) +{ +ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)router_instance; +ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_client_ses; +int prev_val; + + prev_val = atomic_add(&router->stats.n_slaves, -1); + ss_dassert(prev_val > 0); + + /* + * Remove the slave session form the list of slaves that are using the + * router currently. + */ + spinlock_acquire(&router->lock); + if (router->slaves == slave) { + router->slaves = slave->next; + } else { + ROUTER_SLAVE *ptr = router->slaves; + + while (ptr != NULL && ptr->next != slave) { + ptr = ptr->next; + } + + if (ptr != NULL) { + ptr->next = slave->next; + } + } + spinlock_release(&router->lock); + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [freeSession] Unlinked router_client_session %p from " + "router %p. Connections : %d. ", + pthread_self(), + slave, + router, + prev_val-1))); + + free(slave); +} + + +/** + * Close a session with the router, this is the mechanism + * by which a router may cleanup data structure etc. + * + * @param instance The router instance data + * @param router_session The session being closed + */ +static void +closeSession(ROUTER *instance, void *router_session) +{ +ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; +ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session; + + if (slave == NULL) + { + /* + * We must be closing the master session. + * + * TODO: Handle closure of master session + */ + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_ERROR, "Binlog router close session with master"))); + return; + } + CHK_CLIENT_RSES(slave); + /** + * Lock router client session for secure read and update. + */ + if (rses_begin_locked_router_action(slave)) + { + /* decrease server registered slaves counter */ + atomic_add(&router->stats.n_registered, -1); + + /* + * Mark the slave as unregistered to prevent the forwarding + * of any more binlog records to this slave. + */ + slave->state = BLRS_UNREGISTERED; + + /* Unlock */ + rses_end_locked_router_action(slave); + } +} + +/** + * We have data from the client, this is likely to be packets related to + * the registration of the slave to receive binlog records. Unlike most + * MaxScale routers there is no forwarding to the backend database, merely + * the return of either predefined server responses that have been cached + * or binlog records. + * + * @param instance The router instance + * @param router_session The router session returned from the newSession call + * @param queue The queue of data buffers to route + * @return The number of bytes sent + */ +static int +routeQuery(ROUTER *instance, void *router_session, GWBUF *queue) +{ +ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; +ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session; + + return blr_slave_request(router, slave, queue); +} + +static char *event_names[] = { + "Invalid", "Start Event V3", "Query Event", "Stop Event", "Rotate Event", + "Integer Session Variable", "Load Event", "Slave Event", "Create File Event", + "Append Block Event", "Exec Load Event", "Delete File Event", + "New Load Event", "Rand Event", "User Variable Event", "Format Description Event", + "Transaction ID Event (2 Phase Commit)", "Begin Load Query Event", + "Execute Load Query Event", "Table Map Event", "Write Rows Event (v0)", + "Update Rows Event (v0)", "Delete Rows Event (v0)", "Write Rows Event (v1)", + "Update Rows Event (v1)", "Delete Rows Event (v1)", "Incident Event", + "Heartbeat Event", "Ignorable Event", "Rows Query Event", "Write Rows Event (v2)", + "Update Rows Event (v2)", "Delete Rows Event (v2)", "GTID Event", + "Anonymous GTID Event", "Previous GTIDS Event" +}; +/** + * Display router diagnostics + * + * @param instance Instance of the router + * @param dcb DCB to send diagnostics to + */ +static void +diagnostics(ROUTER *router, DCB *dcb) +{ +ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router; +ROUTER_SLAVE *session; +int i = 0; + + spinlock_acquire(&router_inst->lock); + session = router_inst->slaves; + while (session) + { + i++; + session = session->next; + } + spinlock_release(&router_inst->lock); + + dcb_printf(dcb, "\tCurrent binlog file: %s\n", + router_inst->binlog_name); + dcb_printf(dcb, "\tCurrent binlog position: %u\n", + router_inst->binlog_position); + dcb_printf(dcb, "\tNumber of slave servers: %u\n", + router_inst->stats.n_slaves); + dcb_printf(dcb, "\tNumber of binlog events received: %u\n", + router_inst->stats.n_binlogs); + dcb_printf(dcb, "\tNumber of fake binlog events: %u\n", + router_inst->stats.n_fakeevents); + dcb_printf(dcb, "\tNumber of binlog events in error: %u\n", + router_inst->stats.n_binlog_errors); + dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n", + router_inst->stats.n_rotates); + dcb_printf(dcb, "\tNumber of binlog cache hits: %u\n", + router_inst->stats.n_cachehits); + dcb_printf(dcb, "\tNumber of binlog cache misses: %u\n", + router_inst->stats.n_cachemisses); + dcb_printf(dcb, "\tNumber of packets received: %u\n", + router_inst->stats.n_reads); + dcb_printf(dcb, "\tAverage events per packet %.1f\n", + (double)router_inst->stats.n_binlogs / router_inst->stats.n_reads); + dcb_printf(dcb, "\tEvents received:\n"); + for (i = 0; i < 0x24; i++) + { + dcb_printf(dcb, "\t\t%-38s: %u\n", event_names[i], router_inst->stats.events[i]); + } + + if (router_inst->slaves) + { + dcb_printf(dcb, "\tSlaves:\n"); + spinlock_acquire(&router_inst->lock); + session = router_inst->slaves; + while (session) + { + dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid); + if (session->hostname) + dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname); + dcb_printf(dcb, "\t\tSlave DCB: %x\n", session->dcb); + dcb_printf(dcb, "\t\tNext Sequence No: %d\n", session->seqno); + dcb_printf(dcb, "\t\tState: %s\n", blrs_states[session->state]); + dcb_printf(dcb, "\t\tBinlog file: %s\n", session->binlogfile); + dcb_printf(dcb, "\t\tBinlog position: %u\n", session->binlog_pos); + dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests); + dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events); + dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts); + dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows); + session = session->next; + } + spinlock_release(&router_inst->lock); + } +} + +/** + * Client Reply routine - in this case this is a message from the + * master server, It should be sent to the state machine that manages + * master packets as it may be binlog records or part of the registration + * handshake that takes part during connection establishment. + * + * + * @param instance The router instance + * @param router_session The router session + * @param master_dcb The DCB for the connection to the master + * @param queue The GWBUF with reply data + */ +static void +clientReply(ROUTER *instance, void *router_session, GWBUF *queue, DCB *backend_dcb) +{ +ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; + + atomic_add(&router->stats.n_reads, 1); + blr_master_response(router, queue); +} + +/** + * Error Reply routine + * + * The routine will reply to client errors and/or closing the session + * or try to open a new backend connection. + * + * @param instance The router instance + * @param router_session The router session + * @param message The error message to reply + * @param backend_dcb The backend DCB + * @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION + * + */ +static void +errorReply( + ROUTER *instance, + void *router_session, + char *message, + DCB *backend_dcb, + int action) +{ +} + +/** to be inline'd */ +/** + * @node Acquires lock to router client session if it is not closed. + * + * Parameters: + * @param rses - in, use + * + * + * @return true if router session was not closed. If return value is true + * it means that router is locked, and must be unlocked later. False, if + * router was closed before lock was acquired. + * + * + * @details (write detailed description here) + * + */ +static bool rses_begin_locked_router_action(ROUTER_SLAVE *rses) +{ + bool succp = false; + + CHK_CLIENT_RSES(rses); + + spinlock_acquire(&rses->rses_lock); + succp = true; + + return succp; +} + +/** to be inline'd */ +/** + * @node Releases router client session lock. + * + * Parameters: + * @param rses - + * + * + * @return void + * + * + * @details (write detailed description here) + * + */ +static void rses_end_locked_router_action(ROUTER_SLAVE * rses) +{ + CHK_CLIENT_RSES(rses); + spinlock_release(&rses->rses_lock); +} + + +static uint8_t getCapabilities(ROUTER *inst, void *router_session) +{ + return 0; +} diff --git a/server/modules/routing/binlog/blr_cache.c b/server/modules/routing/binlog/blr_cache.c new file mode 100644 index 000000000..5bc46f036 --- /dev/null +++ b/server/modules/routing/binlog/blr_cache.c @@ -0,0 +1,69 @@ +/* + * This file is distributed as part of MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file blr_cache.c - binlog router cache, manage the binlog cache + * + * The binlog router is designed to be used in replication environments to + * increase the replication fanout of a master server. It provides a transparant + * mechanism to read the binlog entries for multiple slaves while requiring + * only a single connection to the actual master to support the slaves. + * + * The current prototype implement is designed to support MySQL 5.6 and has + * a number of limitations. This prototype is merely a proof of concept and + * should not be considered production ready. + * + * @verbatim + * Revision History + * + * Date Who Description + * 07/04/2014 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + +extern int lm_enabled_logfiles_bitmask; + + +/** + * Initialise the cache for this instanceof the binlog router. As a side + * effect also determine the binlog file to read and the position to read + * from. + * + * @param router The router instance + */ +void +blr_init_cache(ROUTER_INSTANCE *router) +{ +} diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c new file mode 100644 index 000000000..c1ca8d2fd --- /dev/null +++ b/server/modules/routing/binlog/blr_file.c @@ -0,0 +1,282 @@ +/* + * This file is distributed as part of MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file blr_file.c - contains code for the router binlog file management + * + * + * @verbatim + * Revision History + * + * Date Who Description + * 14/04/2014 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + +extern int lm_enabled_logfiles_bitmask; + +static void blr_file_create(ROUTER_INSTANCE *router, char *file); +static void blr_file_append(ROUTER_INSTANCE *router, char *file); +static uint32_t extract_field(uint8_t *src, int bits); + +/** + * Initialise the binlog file for this instance. MaxScale will look + * for all the binlogs that it has on local disk, determien the next + * binlog to use and initialise it for writing, determining the + * next record to be fetched from the real master. + * + * @param router The router instance this defines the master for this replication chain + */ +void +blr_file_init(ROUTER_INSTANCE *router) +{ +char *ptr, path[1024], filename[1050]; +int file_found, n = 1; + + strcpy(path, "/usr/local/skysql/MaxScale"); + if ((ptr = getenv("MAXSCALE_HOME")) != NULL) + { + strcpy(path, ptr); + } + strcat(path, "/"); + strcat(path, router->service->name); + + if (access(path, R_OK) == -1) + mkdir(path, 0777); + file_found = 0; + do { + sprintf(filename, "%s/" BINLOG_NAMEFMT, path, n); + if (access(filename, R_OK) != -1) + { + file_found = 1; + n++; + } + else + file_found = 0; + } while (file_found); + n--; + + if (n == 0) // No binlog files found + { + sprintf(filename, BINLOG_NAMEFMT, 1); + blr_file_create(router, filename); + } + else + { + sprintf(filename, BINLOG_NAMEFMT, n); + blr_file_append(router, filename); + } + +} + +void +blr_file_rotate(ROUTER_INSTANCE *router, char *file, uint64_t pos) +{ + blr_file_create(router, file); +} + + +/** + * Create a new binlog file for the router to use. + * + * @param router The router instance + * @param file The binlog file name + */ +static void +blr_file_create(ROUTER_INSTANCE *router, char *file) +{ +char *ptr, path[1024]; +int fd; +unsigned char magic[] = BINLOG_MAGIC; + + strcpy(path, "/usr/local/skysql/MaxScale"); + if ((ptr = getenv("MAXSCALE_HOME")) != NULL) + { + strcpy(path, ptr); + } + strcat(path, "/"); + strcat(path, router->service->name); + strcat(path, "/"); + strcat(path, file); + + if ((fd = open(path, O_RDWR|O_CREAT, 0666)) != -1) + { + write(fd, magic, 4); + } + fsync(fd); + close(router->binlog_fd); + strcpy(router->binlog_name, file); + router->binlog_position = 4; /* Initial position after the magic number */ + router->binlog_fd = fd; +} + + +/** + * Prepare an existing binlog file to be appened to. + * + * @param router The router instance + * @param file The binlog file name + */ +static void +blr_file_append(ROUTER_INSTANCE *router, char *file) +{ +char *ptr, path[1024]; +int fd; + + strcpy(path, "/usr/local/skysql/MaxScale"); + if ((ptr = getenv("MAXSCALE_HOME")) != NULL) + { + strcpy(path, ptr); + } + strcat(path, "/"); + strcat(path, router->service->name); + strcat(path, "/"); + strcat(path, file); + + fd = open(path, O_RDWR|O_APPEND, 0666); + fsync(fd); + close(router->binlog_fd); + strcpy(router->binlog_name, file); + router->binlog_position = lseek(fd, 0L, SEEK_END); + router->binlog_fd = fd; +} + +/** + * Write a binlog entry to disk. + * + * @param router The router instance + * @param buf The binlog record + * @param len The length of the binlog record + */ +void +blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf) +{ + pwrite(router->binlog_fd, buf, hdr->event_size, hdr->next_pos - hdr->event_size); + router->binlog_position = hdr->next_pos; +} + +/** + * Flush the content of the binlog file to disk. + * + * @param router The binlog router + */ +void +blr_file_flush(ROUTER_INSTANCE *router) +{ + fsync(router->binlog_fd); +} + +int +blr_open_binlog(ROUTER_INSTANCE *router, char *binlog) +{ +char *ptr, path[1024]; + + strcpy(path, "/usr/local/skysql/MaxScale"); + if ((ptr = getenv("MAXSCALE_HOME")) != NULL) + { + strcpy(path, ptr); + } + strcat(path, "/"); + strcat(path, router->service->name); + strcat(path, "/"); + strcat(path, binlog); + + return open(path, O_RDONLY, 0666); +} + +/** + * Read a replication event into a GWBUF structure. + * + * @param fd File descriptor of the binlog file + * @param pos Position of binlog record to read + * @param hdr Binlog header to populate + * @return The binlog record wrapped in a GWBUF structure + */ +GWBUF * +blr_read_binlog(int fd, unsigned int pos, REP_HEADER *hdr) +{ +uint8_t hdbuf[19]; +GWBUF *result; +unsigned char *data; + + if (lseek(fd, pos, SEEK_SET) != pos) + { + return NULL; + } + + /* Read the header information from the file */ + if (read(fd, hdbuf, 19) != 19) + return NULL; + hdr->timestamp = extract_field(hdbuf, 32); + hdr->event_type = hdbuf[4]; + hdr->serverid = extract_field(&hdbuf[5], 32); + hdr->event_size = extract_field(&hdbuf[9], 32); + hdr->next_pos = extract_field(&hdbuf[13], 32); + hdr->flags = extract_field(&hdbuf[17], 16); + if ((result = gwbuf_alloc(hdr->event_size)) == NULL) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Failed to allocate memory for binlog entry.\n"))); + return NULL; + } + data = GWBUF_DATA(result); + memcpy(data, hdbuf, 19); // Copy the header in + read(fd, &data[19], hdr->event_size - 19); // Read the balance + return result; +} + +/** + * Extract a numeric field from a packet of the specified number of bits + * + * @param src The raw packet source + * @param birs The number of bits to extract (multiple of 8) + */ +static uint32_t +extract_field(uint8_t *src, int bits) +{ +uint32_t rval = 0, shift = 0; + + while (bits > 0) + { + rval |= (*src++) << shift; + shift += 8; + bits -= 8; + } + return rval; +} diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c new file mode 100644 index 000000000..46b2ee7b6 --- /dev/null +++ b/server/modules/routing/binlog/blr_master.c @@ -0,0 +1,654 @@ +/* + * This file is distributed as part of MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file blr_master.c - contains code for the router to master communication + * + * The binlog router is designed to be used in replication environments to + * increase the replication fanout of a master server. It provides a transparant + * mechanism to read the binlog entries for multiple slaves while requiring + * only a single connection to the actual master to support the slaves. + * + * The current prototype implement is designed to support MySQL 5.6 and has + * a number of limitations. This prototype is merely a proof of concept and + * should not be considered production ready. + * + * @verbatim + * Revision History + * + * Date Who Description + * 02/04/2014 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +/* Temporary requirement for auth data */ +#include + +extern int lm_enabled_logfiles_bitmask; + +static GWBUF *blr_make_query(char *statement); +static GWBUF *blr_make_registration(ROUTER_INSTANCE *router); +static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router); +static void encode_value(unsigned char *data, unsigned int value, int len); +static void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt); +static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr); +static void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr); +static void *CreateMySQLAuthData(char *username, char *password, char *database); +static void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr); +static uint32_t extract_field(uint8_t *src, int bits); + +/** + * blr_start_master - controls the connection of the binlog router to the + * master MySQL server and triggers the slave registration process for + * the router. + * + * @param router The router instance + */ +void +blr_start_master(ROUTER_INSTANCE *router) +{ +DCB *client; +GWBUF *buf; + + client = dcb_alloc(DCB_ROLE_INTERNAL); + client->data = CreateMySQLAuthData(router->user, router->password, ""); + router->session = session_alloc(router->service, client); + client->session = router->session; + router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL); + router->master_state = BLRM_AUTHENTICATED; + buf = blr_make_query("SELECT UNIX_TIMESTAMP()"); + router->master->func.write(router->master, buf); + router->master_state = BLRM_TIMESTAMP; +} + +/** + * Binlog router master side state machine event handler. + * + * Handles an incoming response from the master server to the binlog + * router. + * + * @param router The router instance + * @param buf The incoming packet + */ +void +blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) +{ +char query[128]; + + /* + * We need to make sure that incoming packets (gwbufs) are + * strictly processed in order and that we do not have packets + * from the same master being processed on multiple threads. + * to do this we create a queue of the GWBUF structures and have + * a flag that indicates if this routine is processing a packet + * on another thread. Items will be added to the queue if the + * routine is running in another thread. That thread will read + * the queue before returning. + * The action of adding items to the queue is protected by a + * spinlock and a flag that inidicates if the routine running + * in the other thread has reached the point at which it will + * no longer check the queue before returning. In order to + * manipulate the queue or the flag then router spinlock must + * be held. + */ + spinlock_acquire(&router->lock); + if (router->active_logs) + { + /* + * Thread already processing a packet and has not got + * to the point that it will not look at new packets + * added to the queue. + */ + router->queue = gwbuf_append(router->queue, buf); + spinlock_release(&router->lock); + return; + } + else + { + router->active_logs = 1; + } + spinlock_release(&router->lock); + + if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE) + { + LOGIF(LM, (skygw_log_write( + LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n", + router->master_state))); + gwbuf_consume(buf, gwbuf_length(buf)); + spinlock_acquire(&router->lock); + router->active_logs = 0; + spinlock_release(&router->lock); + return; + } + + if (MYSQL_RESPONSE_ERR(buf)) + { + LOGIF(LM, (skygw_log_write( + LOGFILE_ERROR, + "Received error: %d, %s from master during %s phase of the master state machine.\n", + MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state] + ))); + gwbuf_consume(buf, gwbuf_length(buf)); + spinlock_acquire(&router->lock); + router->active_logs = 0; + spinlock_release(&router->lock); + return; + } + do { + switch (router->master_state) + { + case BLRM_TIMESTAMP: + // Response to a timestamp message, no need to save this. + gwbuf_consume(buf, GWBUF_LENGTH(buf)); + buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'"); + router->master_state = BLRM_SERVERID; + router->master->func.write(router->master, buf); + break; + case BLRM_SERVERID: + // Response to fetch of master's server-id + router->saved_master.server_id = buf; + // TODO: Extract the value of server-id and place in router->master_id + buf = blr_make_query("SET @master_heartbeat_period = 1799999979520"); + router->master_state = BLRM_HBPERIOD; + router->master->func.write(router->master, buf); + break; + case BLRM_HBPERIOD: + // Response to set the heartbeat period + router->saved_master.heartbeat = buf; + gwbuf_consume(buf, GWBUF_LENGTH(buf)); + buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum"); + router->master_state = BLRM_CHKSUM1; + router->master->func.write(router->master, buf); + break; + case BLRM_CHKSUM1: + // Response to set the master binlog checksum + router->saved_master.chksum1 = buf; + gwbuf_consume(buf, GWBUF_LENGTH(buf)); + buf = blr_make_query("SELECT @master_binlog_checksum"); + router->master_state = BLRM_CHKSUM2; + router->master->func.write(router->master, buf); + break; + case BLRM_CHKSUM2: + // Response to the master_binlog_checksum, should be stored + router->saved_master.chksum2 = buf; + buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE"); + router->master_state = BLRM_GTIDMODE; + router->master->func.write(router->master, buf); + break; + case BLRM_GTIDMODE: + // Response to the GTID_MODE, should be stored + router->saved_master.gtid_mode = buf; + buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'"); + router->master_state = BLRM_MUUID; + router->master->func.write(router->master, buf); + break; + case BLRM_MUUID: + // Response to the SERVER_UUID, should be stored + router->saved_master.uuid = buf; + sprintf(query, "SET @slave_uuid='%s'", router->uuid); + buf = blr_make_query(query); + router->master_state = BLRM_SUUID; + router->master->func.write(router->master, buf); + break; + case BLRM_SUUID: + // Response to the SET @server_uuid, should be stored + router->saved_master.setslaveuuid = buf; + gwbuf_consume(buf, GWBUF_LENGTH(buf)); + buf = blr_make_query("SET NAMES latin1"); + router->master_state = BLRM_LATIN1; + router->master->func.write(router->master, buf); + break; + case BLRM_LATIN1: + // Response to the SET NAMES latin1, should be stored + router->saved_master.setnames = buf; + gwbuf_consume(buf, GWBUF_LENGTH(buf)); + buf = blr_make_registration(router); + router->master_state = BLRM_REGISTER; + router->master->func.write(router->master, buf); + break; + case BLRM_REGISTER: + // Request a dump of the binlog file + buf = blr_make_binlog_dump(router); + router->master_state = BLRM_BINLOGDUMP; + router->master->func.write(router->master, buf); + break; + case BLRM_BINLOGDUMP: + // Main body, we have received a binlog record from the master + blr_handle_binlog_record(router, buf); + break; + } + + /* + * Check for messages queued by other threads. + */ + spinlock_acquire(&router->lock); + if ((buf = router->queue) != NULL) + { + router->queue = buf->next; + buf->next = NULL; + } + else + { + router->active_logs = 0; + } + spinlock_release(&router->lock); + } while (buf != NULL); +} + +/** + * Build a MySQL query into a GWBUF that we can send to the master database + * + * @param query The text of the query to send + */ +static GWBUF * +blr_make_query(char *query) +{ +GWBUF *buf; +unsigned char *data; +int len; + + if ((buf = gwbuf_alloc(strlen(query) + 5)) == NULL) + return NULL; + data = GWBUF_DATA(buf); + len = strlen(query) + 1; + encode_value(&data[0], len, 24); // Payload length + data[3] = 0; // Sequence id + // Payload + data[4] = COM_QUERY; // Command + memcpy(&data[5], query, strlen(query)); + + return buf; +} + +/** + * Build a MySQL slave registration into a GWBUF that we can send to the + * master database + * + * @param router The router instance + * @return A MySQL Replication registration message in a GWBUF structure + */ +static GWBUF * +blr_make_registration(ROUTER_INSTANCE *router) +{ +GWBUF *buf; +unsigned char *data; +int len = 18; + + if ((buf = gwbuf_alloc(len + 4)) == NULL) + return NULL; + data = GWBUF_DATA(buf); + encode_value(&data[0], len, 24); // Payload length + data[3] = 0; // Sequence ID + data[4] = COM_REGISTER_SLAVE; // Command + encode_value(&data[5], router->serverid, 32); // Slave Server ID + data[9] = 0; // Slave hostname length + data[10] = 0; // Slave username length + data[11] = 0; // Slave password length + encode_value(&data[12], router->service->ports->port, 16); // Slave master port + encode_value(&data[14], 0, 32); // Replication rank + encode_value(&data[18], router->masterid, 32); // Master server-id + + return buf; +} + + +/** + * Build a Binlog dump command into a GWBUF that we can send to the + * master database + * + * @param router The router instance + * @return A MySQL Replication COM_BINLOG_DUMP message in a GWBUF structure + */ +static GWBUF * +blr_make_binlog_dump(ROUTER_INSTANCE *router) +{ +GWBUF *buf; +unsigned char *data; +int len = 0x1b; + + if ((buf = gwbuf_alloc(len + 4)) == NULL) + return NULL; + data = GWBUF_DATA(buf); + + encode_value(&data[0], len,24); // Payload length + data[3] = 0; // Sequence ID + data[4] = COM_BINLOG_DUMP; // Command + encode_value(&data[5], router->binlog_position, 32); // binlog position + encode_value(&data[9], 0, 16); // Flags + encode_value(&data[11], router->serverid, 32); // Server-id of MaxScale + strncpy((char *)&data[15], router->binlog_name, + BINLOG_FNAMELEN); // binlog filename + return buf; +} + + +/** + * Encode a value into a number of bits in a MySQL packet + * + * @param data Point to location in target packet + * @param value The value to pack + * @param len Number of bits to encode value into + */ +static void +encode_value(unsigned char *data, unsigned int value, int len) +{ + while (len > 0) + { + *data++ = value & 0xff; + value >>= 8; + len -= 8; + } +} + +/** + * blr_handle_binlog_record - we have received binlog records from + * the master and we must now work out what to do with them. + * + * @param router The router instance + * @param pkt The binlog records + */ +static void +blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) +{ +uint8_t *msg, *ptr, *pdata; +REP_HEADER hdr; +int len, reslen; + + /* Prepend any residual buffer to the buffer chain we have been called with. */ + if (router->residual) + { + pkt = gwbuf_append(router->residual, pkt); + router->residual = NULL; + } + + while (pkt && gwbuf_length(pkt) > 24) + { + reslen = GWBUF_LENGTH(pkt); + pdata = GWBUF_DATA(pkt); + if (reslen < 3) // Payload length straddles buffers + { + /* Get the length of the packet from the residual and new packet */ + if (reslen >= 3) + { + len = extract_field(pdata, 24); + } + else if (reslen == 2) + { + len = extract_field(pdata, 16); + len |= (extract_field(GWBUF_DATA(pkt->next), 8) << 16); + } + else if (reslen == 1) + { + len = extract_field(pdata, 8); + len |= (extract_field(GWBUF_DATA(pkt->next), 16) << 8); + } + len += 4; // Allow space for the header + } + else + { + len = extract_field(pdata, 24) + 4; + } + + if (reslen < len && pkt->next) // Message straddles buffers + { + /* Allocate a contiguous buffer for the binlog message */ + msg = malloc(len); + + memcpy(msg, pdata, reslen); + memcpy(&msg[reslen], GWBUF_DATA(pkt->next), len - reslen); + ptr = msg; + } + else if (reslen < len) // Message straddles buffers + { + break; + } + else + { + ptr = pdata; + msg = NULL; + } + + blr_extract_header(ptr, &hdr); + if (hdr.ok == 0) + { + router->stats.n_binlogs++; + +// #define SHOW_EVENTS +#ifdef SHOW_EVENTS + printf("blr: event type 0x%02x, flags 0x%04x, event size %d\n", hdr.event_type, hdr.flags, hdr.event_size); +#endif + if (hdr.event_type >= 0 && hdr.event_type < 0x24) + router->stats.events[hdr.event_type]++; + if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0) + { + // Fake format description message + router->stats.n_fakeevents++; + if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) + { + /* + * We need to save this to replay to new + * slaves that attach later. + */ + if (router->saved_master.fde_event) + free(router->saved_master.fde_event); + router->saved_master.fde_len = hdr.event_size; + router->saved_master.fde_event = malloc(hdr.event_size); + if (router->saved_master.fde_event) + memcpy(router->saved_master.fde_event, + ptr + 5, hdr.event_size); + } + } + else + { + if (hdr.event_type == ROTATE_EVENT) + { + blr_rotate_event(router, ptr, &hdr); + } + if (hdr.event_type == HEARTBEAT_EVENT) + { +#ifdef SHOW_EVENTS + printf("Replication heartbeat\n"); +#endif + ; + } + else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) + { + ptr = ptr + 5; // We don't put the first byte of the payload + // into the binlog file + blr_write_binlog_record(router, &hdr, ptr); + blr_distribute_binlog_record(router, &hdr, ptr); + } + } + } + else + { + printf("Binlog router error: %s\n", &ptr[7]); + router->stats.n_binlog_errors++; + } + + if (msg) + { + free(msg); + pkt = gwbuf_consume(pkt, reslen); + pkt = gwbuf_consume(pkt, len - reslen); + } + else + { + pkt = gwbuf_consume(pkt, 4 + hdr.payload_len); + } + } + + /* + * Check if we have a residual, part binlog message to deal with. + * Just simply store the GWBUF for next time + */ + if (pkt) + { + router->residual = pkt; + } + blr_file_flush(router); +} + +/** + * Populate a header structure for a replication message from a GWBUF structure. + * + * @param pkt The incoming packet in a GWBUF chain + * @param hdr The packet header to populate + */ +static void +blr_extract_header(uint8_t *ptr, REP_HEADER *hdr) +{ + + hdr->payload_len = extract_field(ptr, 24); + hdr->seqno = ptr[3]; + hdr->ok = ptr[4]; + hdr->timestamp = extract_field(&ptr[5], 32); + hdr->event_type = ptr[9]; + hdr->serverid = extract_field(&ptr[10], 32); + hdr->event_size = extract_field(&ptr[14], 32); + hdr->next_pos = extract_field(&ptr[18], 32); + hdr->flags = extract_field(&ptr[22], 16); +} + +/** + * Extract a numeric field from a packet of the specified number of bits + * + * @param src The raw packet source + * @param birs The number of bits to extract (multiple of 8) + */ +static uint32_t +extract_field(uint8_t *src, int bits) +{ +uint32_t rval = 0, shift = 0; + + while (bits > 0) + { + rval |= (*src++) << shift; + shift += 8; + bits -= 8; + } + return rval; +} + +/** + * Process a binlog rotate event. + * + * @param router The instance of the router + * @param ptr The packet containing the rotate event + * @param hdr The replication message header + */ +static void +blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr) +{ +int len; +uint64_t pos; +char file[BINLOG_FNAMELEN+1]; + + ptr += 4; // Skip packet header + ptr++; // Skip the OK + ptr += 19; // Skip event header + len = hdr->event_size - 19; // Event size minus header + pos = extract_field(ptr, 32) + (extract_field(ptr+4, 32) << 32); + memcpy(file, ptr + 8, BINLOG_FNAMELEN); + file[BINLOG_FNAMELEN] = 0; + +#ifdef VEBOSE_ROTATE + printf("binlog rotate: "); + while (len--) + printf("0x%02x ", *ptr++); + printf("\n"); + printf("New file: %s @ %ld\n", file, pos); +#endif + + if (strncmp(router->binlog_name, file, BINLOG_FNAMELEN) != 0) + { + router->stats.n_rotates++; + blr_file_rotate(router, file, pos); + } +} + +/** + * Create the auth data needed to be able to call dcb_connect. + * + * This doesn't really belong here and should be moved at some stage. + */ +static void * +CreateMySQLAuthData(char *username, char *password, char *database) +{ +MYSQL_session *auth_info; + + if ((auth_info = calloc(1, sizeof(MYSQL_session))) == NULL) + return NULL; + strcpy(auth_info->user, username); + strcpy(auth_info->db, database); + gw_sha1_str((const uint8_t *)password, strlen(password), auth_info->client_sha1); + + return auth_info; +} + +/** + * Distribute the binlog record we have just received to all the registered slaves. + * + * @param router The router instance + * @param hdr The replication event header + * @param ptr The raw replication eent data + */ +static void +blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) +{ +GWBUF *pkt; +uint8_t *buf; +ROUTER_SLAVE *slave; + + spinlock_acquire(&router->lock); + slave = router->slaves; + while (slave) + { + if (slave->binlog_pos == hdr->next_pos - hdr->event_size) + { + pkt = gwbuf_alloc(hdr->event_size + 5); + buf = GWBUF_DATA(pkt); + encode_value(buf, hdr->event_size + 1, 24); + buf += 3; + *buf++ = slave->seqno++; + *buf++ = 0; // OK + memcpy(buf, ptr, hdr->event_size); + slave->dcb->func.write(slave->dcb, pkt); + slave->binlog_pos = hdr->next_pos; + } + + slave = slave->next; + } + spinlock_release(&router->lock); +} diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c new file mode 100644 index 000000000..79a94844b --- /dev/null +++ b/server/modules/routing/binlog/blr_slave.c @@ -0,0 +1,767 @@ +/* + * This file is distributed as part of MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ + +/** + * @file blr_slave.c - contains code for the router to slave communication + * + * The binlog router is designed to be used in replication environments to + * increase the replication fanout of a master server. It provides a transparant + * mechanism to read the binlog entries for multiple slaves while requiring + * only a single connection to the actual master to support the slaves. + * + * The current prototype implement is designed to support MySQL 5.6 and has + * a number of limitations. This prototype is merely a proof of concept and + * should not be considered production ready. + * + * @verbatim + * Revision History + * + * Date Who Description + * 14/04/2014 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + + +static uint32_t extract_field(uint8_t *src, int bits); +static void encode_value(unsigned char *data, unsigned int value, int len); +static int blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue); +static int blr_slave_replay(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *master); +static void blr_slave_send_error(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *msg); +static int blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +static int blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue); +static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue); +static int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +static uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr); +static int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data); + +extern int lm_enabled_logfiles_bitmask; + +/** + * Process a request packet from the slave server. + * + * The router can handle a limited subset of requests from the slave, these + * include a subset of general SQL queries, a slave registeration command and + * the binlog dump command. + * + * The strategy for responding to these commands is to use caches responses + * for the the same commands that have previously been made to the real master + * if this is possible, if it is not then the router itself will synthesize a + * response. + * + * @param router The router instance this defines the master for this replication chain + * @param slave The slave specific data + * @param queue The incoming request packet + */ +int +blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) +{ + if (slave->state < 0 || slave->state > BLRS_MAXSTATE) + { + LOGIF(LM, (skygw_log_write( + LOGFILE_ERROR, "Invalid slave state machine state (%d) for binlog router.\n", + slave->state))); + gwbuf_consume(queue, gwbuf_length(queue)); + return 0; + } + + atomic_add(&slave->stats.n_requests, 1); + switch (MYSQL_COMMAND(queue)) + { + case COM_QUERY: + return blr_slave_query(router, slave, queue); + break; + case COM_REGISTER_SLAVE: + return blr_slave_register(router, slave, queue); + break; + case COM_BINLOG_DUMP: + return blr_slave_binlog_dump(router, slave, queue); + break; + default: + break; + } + return 0; +} + +/** + * Handle a query from the slave. This is expected to be one of the "standard" + * queries we expect as part of the registraton process. Most of these can + * be dealt with by replying the stored responses we got from the master + * when MaxScale registered as a slave. The exception to the rule is the + * request to obtain the current timestamp value of the server. + * + * Three select statements are currently supported: + * SELECT UNIX_TIMESTAMP(); + * SELECT @master_binlog_checksum + * SELECT @@GLOBAL.GTID_MODE + * + * Two show commands are supported: + * SHOW VARIABLES LIKE 'SERVER_ID' + * SHOW VARIABLES LIKE 'SERVER_UUID' + * + * Four set commands are supported: + * SET @master_binlog_checksum = @@global.binlog_checksum + * SET @master_heartbeat_period=... + * SET @slave_slave_uuid=... + * SET NAMES latin1 + * + * @param router The router instance this defines the master for this replication chain + * @param slave The slave specific data + * @param queue The incoming request packet + * @return Non-zero if data has been sent + */ +static int +blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) +{ +char *qtext, *query_text; +char *sep = " ,="; +char *word, *brkb; +int query_len; + + qtext = GWBUF_DATA(queue); + query_len = extract_field(qtext, 24) - 1; + qtext += 5; // Skip header and first byte of the payload + query_text = strndup(qtext, query_len); + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, "Execute statement from the slave '%s'\n", query_text))); + /* + * Implement a very rudimental "parsing" of the query text by extarcting the + * words from the statement and matchng them against the subset of queries we + * are expecting from the slave. We already have responses to these commands, + * except for the select of UNIX_TIMESTAMP(), that we have saved from MaxScale's + * own interaction with the real master. We simply replay these saved responses + * to the slave. + */ + word = strtok_r(query_text, sep, &brkb); + if (strcasecmp(word, "SELECT") == 0) + { + word = strtok_r(NULL, sep, &brkb); + if (strcasecmp(word, "UNIX_TIMESTAMP()") == 0) + { + free(query_text); + return blr_slave_send_timestamp(router, slave); + } + else if (strcasecmp(word, "@master_binlog_checksum") == 0) + { + free(query_text); + return blr_slave_replay(router, slave, router->saved_master.chksum2); + } + else if (strcasecmp(word, "@@GLOBAL.GTID_MODE") == 0) + { + free(query_text); + return blr_slave_replay(router, slave, router->saved_master.gtid_mode); + } + } + else if (strcasecmp(word, "SHOW") == 0) + { + word = strtok_r(NULL, sep, &brkb); + if (strcasecmp(word, "VARIABLES") == 0) + { + word = strtok_r(NULL, sep, &brkb); + if (strcasecmp(word, "LIKE") == 0) + { + word = strtok_r(NULL, sep, &brkb); + if (strcasecmp(word, "'SERVER_ID'") == 0) + { + free(query_text); + return blr_slave_replay(router, slave, router->saved_master.server_id); + } + else if (strcasecmp(word, "'SERVER_UUID'") == 0) + { + free(query_text); + return blr_slave_replay(router, slave, router->saved_master.uuid); + } + } + } + } + else if (strcasecmp(query_text, "SET") == 0) + { + word = strtok_r(NULL, sep, &brkb); + if (strcasecmp(word, "@master_heartbeat_period") == 0) + { + free(query_text); + return blr_slave_replay(router, slave, router->saved_master.heartbeat); + } + else if (strcasecmp(word, "@master_binlog_checksum") == 0) + { + free(query_text); + return blr_slave_replay(router, slave, router->saved_master.chksum1); + } + else if (strcasecmp(word, "@slave_uuid") == 0) + { + free(query_text); + return blr_slave_replay(router, slave, router->saved_master.setslaveuuid); + } + else if (strcasecmp(word, "NAMES") == 0) + { + word = strtok_r(NULL, sep, &brkb); + if (strcasecmp(word, "latin1") == 0) + { + free(query_text); + return blr_slave_replay(router, slave, router->saved_master.setnames); + } + } + } + free(query_text); + + query_text = strndup(qtext, query_len); + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, "Unexpected query from slave server %s\n", query_text))); + free(query_text); + blr_slave_send_error(router, slave, "Unexpected SQL query received from slave."); + return 0; +} + + +/** + * Send a reply to a command we have received from the slave. The reply itself + * is merely a copy of a previous message we received from the master when we + * registered as a slave. Hence we just replay this saved reply. + * + * @param router The binlog router instance + * @param slave The slave server to which we are sending the response + * @param master The saved master response + * @return Non-zero if data was sent + */ +static int +blr_slave_replay(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *master) +{ +GWBUF *clone; + + if (!master) + return 0; + if ((clone = gwbuf_clone(master)) != NULL) + { + return slave->dcb->func.write(slave->dcb, clone); + } + else + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Failed to clone server response to send to slave.\n"))); + return 0; + } +} + +/** + * Construct an error response + * + * @param router The router instance + * @param slave The slave server instance + * @param msg The error message to send + */ +static void +blr_slave_send_error(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *msg) +{ +GWBUF *pkt; +unsigned char *data; +int len; + + if ((pkt = gwbuf_alloc(strlen(msg) + 13)) == NULL) + return NULL; + data = GWBUF_DATA(pkt); + len = strlen(msg) + 1; + encode_value(&data[0], len, 24); // Payload length + data[3] = 0; // Sequence id + // Payload + data[4] = 0xff; // Error indicator + data[5] = 0; // Error Code + data[6] = 0; // Error Code + strncpy(&data[7], "#00000", 6); + memcpy(&data[13], msg, strlen(msg)); // Error Message + slave->dcb->func.write(slave->dcb, pkt); +} + +/* + * Some standard packets that have been captured from a network trace of server + * interactions. These packets are the schema definition sent in response to + * a SELECT UNIX_TIMESTAMP() statement and the EOF packet that marks the end + * of transmission of the result set. + */ +static uint8_t timestamp_def[] = { + 0x01, 0x00, 0x00, 0x01, 0x01, 0x26, 0x00, 0x00, 0x02, 0x03, 0x64, 0x65, 0x66, 0x00, 0x00, 0x00, + 0x10, 0x55, 0x4e, 0x49, 0x58, 0x5f, 0x54, 0x49, 0x4d, 0x45, 0x53, 0x54, 0x41, 0x4d, 0x50, 0x28, + 0x29, 0x00, 0x0c, 0x3f, 0x00, 0x0b, 0x00, 0x00, 0x00, 0x08, 0x81, 0x00, 0x00, 0x00, 0x00, 0x05, + 0x00, 0x00, 0x03, 0xfe, 0x00, 0x00, 0x02, 0x00 +}; +static uint8_t timestamp_eof[] = { 0x05, 0x00, 0x00, 0x05, 0xfe, 0x00, 0x00, 0x02, 0x00 }; + +/** + * Send a response to a "SELECT UNIX_TIMESTAMP()" request. This differs from the other + * requests since we do not save a copy of the original interaction with the master + * and simply replay it. We want to always send the current time. We have stored a typcial + * response, which gives us the schema information normally returned. This is sent to the + * client and then we add a dynamic part that will insert the current timestamp data. + * Finally we send a preprepaed EOF packet to end the response stream. + * + * @param router The binlog router instance + * @param slave The slave server to which we are sending the response + * @return Non-zero if data was sent + */ +static int +blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) +{ +GWBUF *pkt; +char timestamp[20]; +uint8_t *ptr; +int len, ts_len; + + sprintf(timestamp, "%ld", time(0)); + ts_len = strlen(timestamp); + len = sizeof(timestamp_def) + sizeof(timestamp_eof) + 5 + ts_len; + if ((pkt = gwbuf_alloc(len)) == NULL) + return 0; + ptr = GWBUF_DATA(pkt); + memcpy(ptr, timestamp_def, sizeof(timestamp_def)); // Fixed preamble + ptr += sizeof(timestamp_def); + encode_value(ptr, ts_len + 1, 24); // Add length of data packet + ptr += 3; + *ptr++ = 0x04; // Sequence number in response + *ptr++ = ts_len; // Length of result string + strncpy(ptr, timestamp, ts_len); // Result string + ptr += ts_len; + memcpy(ptr, timestamp_eof, sizeof(timestamp_eof)); // EOF packet to terminate result + return slave->dcb->func.write(slave->dcb, pkt); +} + +/** + * Process a slave replication registration message. + * + * We store the various bits of information the slave gives us and generate + * a reply message. + * + * @param router The router instance + * @param slave The slave server + * @param queue The BINLOG_DUMP packet + * @return Non-zero if data was sent + */ +static int +blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) +{ +GWBUF *resp; +uint8_t *ptr; +int len, slen; + + ptr = GWBUF_DATA(queue); + len = extract_field(ptr, 24); + ptr += 4; // Skip length and sequence number + if (*ptr++ != COM_REGISTER_SLAVE) + return 0; + slave->serverid = extract_field(ptr, 32); + ptr += 4; + slen = *ptr++; + if (slen != 0) + { + slave->hostname = strndup(ptr, slen); + ptr += slen; + } + else + slave->hostname = NULL; + slen = *ptr++; + if (slen != 0) + { + ptr += slen; + slave->user = strndup(ptr, slen); + } + else + slave->user = NULL; + slen = *ptr++; + if (slen != 0) + { + slave->passwd = strndup(ptr, slen); + ptr += slen; + } + else + slave->passwd = NULL; + slave->port = extract_field(ptr, 16); + ptr += 2; + slave->rank = extract_field(ptr, 32); + + /* + * Now construct a response + */ + if ((resp = gwbuf_alloc(11)) == NULL) + return 0; + ptr = GWBUF_DATA(resp); + encode_value(ptr, 7, 24); // Payload length + ptr += 3; + *ptr++ = 1; // Sequence number + encode_value(ptr, 0, 24); + ptr += 3; + encode_value(ptr, slave->serverid, 32); + slave->state = BLRS_REGISTERED; + return slave->dcb->func.write(slave->dcb, resp); +} + +/** + * Process a COM_BINLOG_DUMP message from the slave. This is the + * final step in the process of registration. The new master, MaxScale + * must send a response packet and generate a fake BINLOG_ROTATE event + * with the binlog file requested by the slave. And then send a + * FORMAT_DESCRIPTION_EVENT that has been saved from the real master. + * + * Once send MaxScale must continue to send binlog events to the slave. + * + * @param router The router instance + * @param slave The slave server + * @param queue The BINLOG_DUMP packet + * @return The number of bytes written to the slave + */ +static int +blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) +{ +GWBUF *resp; +uint8_t *ptr; +int len, flags, serverid, rval; +REP_HEADER hdr; +uint32_t chksum; + + ptr = GWBUF_DATA(queue); + len = extract_field(ptr, 24); + ptr += 4; // Skip length and sequence number + if (*ptr++ != COM_BINLOG_DUMP) + return 0; + + slave->binlog_pos = extract_field(ptr, 32); + ptr += 4; + flags = extract_field(ptr, 16); + ptr += 2; + serverid = extract_field(ptr, 32); + ptr += 4; + strncpy(slave->binlogfile, ptr, BINLOG_FNAMELEN); + + slave->state = BLRS_DUMPING; + slave->seqno = 1; + + // Build a fake rotate event + resp = gwbuf_alloc(0x34); + hdr.payload_len = 0x30; + hdr.seqno = slave->seqno++; + hdr.ok = 0; + hdr.timestamp = 0L; + hdr.event_type = ROTATE_EVENT; + hdr.serverid = router->masterid; + hdr.event_size = 0x2f; + hdr.next_pos = slave->binlog_pos; + hdr.flags = 0; + ptr = blr_build_header(resp, &hdr); + encode_value(ptr, slave->binlog_pos, 64); + ptr += 8; + memcpy(ptr, slave->binlogfile, BINLOG_FNAMELEN); + ptr += BINLOG_FNAMELEN; + + /* + * Now add the CRC to the fake binlog rotate event. + * + * The algorithm is first to compute the checksum of an empty buffer + * and then the checksum of the event portion of the message, ie we do not + * include the length, sequence number and ok byte that makes up the first + * 5 bytes of the message. We also do not include the 4 byte checksum itself. + */ + chksum = crc32(0L, NULL, 0); + chksum = crc32(chksum, GWBUF_DATA(resp) + 5, hdr.event_size - 4); + encode_value(ptr, chksum, 32); + + rval = slave->dcb->func.write(slave->dcb, resp); + + /* Send the FORMAT_DESCRIPTION_EVENT */ + if (router->saved_master.fde_event) + { + resp = gwbuf_alloc(router->saved_master.fde_len + 5); + ptr = GWBUF_DATA(resp); + encode_value(ptr, router->saved_master.fde_len + 1, 24); // Payload length + ptr += 3; + *ptr++ = slave->seqno++; + *ptr++ = 0; // OK + memcpy(ptr, router->saved_master.fde_event, router->saved_master.fde_len); + encode_value(ptr, time(0), 32); // Overwrite timestamp + /* + * Since we have changed the timestamp we must recalculate the CRC + * + * Position ptr to the start of the event header, + * calculate a new checksum + * and write it into the header + */ + ptr = GWBUF_DATA(resp) + 5 + router->saved_master.fde_len - 4; + chksum = crc32(0L, NULL, 0); + chksum = crc32(chksum, GWBUF_DATA(resp) + 5, router->saved_master.fde_len - 4); + encode_value(ptr, chksum, 32); + rval = slave->dcb->func.write(slave->dcb, resp); + } + + slave->dcb->low_water = router->low_water; + slave->dcb->high_water = router->high_water; + dcb_add_callback(slave->dcb, DCB_REASON_LOW_WATER, blr_slave_callback, slave); + + rval = blr_slave_catchup(router, slave); + + return rval; +} + +/** + * Extract a numeric field from a packet of the specified number of bits, + * the number of bits must be a multiple of 8. + * + * @param src The raw packet source + * @param bits The number of bits to extract (multiple of 8) + * @return The extracted value + */ +static uint32_t +extract_field(uint8_t *src, int bits) +{ +uint32_t rval = 0, shift = 0; + + while (bits > 0) + { + rval |= (*src++) << shift; + shift += 8; + bits -= 8; + } + return rval; +} + +/** + * Encode a value into a number of bits in a MySQL packet + * + * @param data Pointer to location in target packet + * @param value The value to encode into the buffer + * @param len Number of bits to encode value into + */ +static void +encode_value(unsigned char *data, unsigned int value, int len) +{ + while (len > 0) + { + *data++ = value & 0xff; + value >>= 8; + len -= 8; + } +} + + +/** + * Populate a header structure for a replication message from a GWBUF structure. + * + * @param pkt The incoming packet in a GWBUF chain + * @param hdr The packet header to populate + * @return A pointer to the first byte following the event header + */ +static uint8_t * +blr_build_header(GWBUF *pkt, REP_HEADER *hdr) +{ +uint8_t *ptr; + + ptr = GWBUF_DATA(pkt); + + encode_value(ptr, hdr->payload_len, 24); + ptr += 3; + *ptr++ = hdr->seqno; + *ptr++ = hdr->ok; + encode_value(ptr, hdr->timestamp, 32); + ptr += 4; + *ptr++ = hdr->event_type; + encode_value(ptr, hdr->serverid, 32); + ptr += 4; + encode_value(ptr, hdr->event_size, 32); + ptr += 4; + encode_value(ptr, hdr->next_pos, 32); + ptr += 4; + encode_value(ptr, hdr->flags, 16); + ptr += 2; + + return ptr; +} + +/** + * We have a registered slave that is behind the current leading edge of the + * binlog. We must replay the log entries to bring this node up to speed. + * + * There may be a large numebr of records to send to the slave, the process + * is triggered by the slave COM_BINLOG_DUMP message and all the events must + * be sent without receiving any new event. This measn there is no trigger into + * MaxScale other than this initial message. However, if we simply send all the + * events we end up with an extremely long write queue on the DCB and risk running + * the server out of resources. + * + * To resolve this the concept of high and low water marks within the DCB has been + * added, with the ability for the DCB code to call user defined callbacks when the + * write queue is completely drained, when it crosses above the high water mark and + * when it crosses below the low water mark. + * + * The blr_slave_catchup routine will send binlog events to the slave until the high + * water mark is reached, at which point it will return. Later, when a low water mark + * callback is generated by the code that drains the DCB of data the blr_slave_catchup + * routine will again be called to write more events. The process is repeated until + * the slave has caught up with the master. + * + * Note: an additional check that the DCB is still above the low water mark is done + * prior to the return from this function to allow for any delays due to the call to + * the close system call, since this may cause thread rescheduling. + * + * @param router The binlog router + * @param slave The slave that is behind + * @return The number of bytes written + */ +static int +blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) +{ +GWBUF *head, *record; +REP_HEADER hdr; +int written, fd, rval = 0, burst = 0; +uint8_t *ptr; +struct timespec req; + + + /* + * We have a slightly complex syncronisation mechansim here, + * we need to make sure that we do not have multiple threads + * running the catchup loop, but we need to be very careful + * that we do not loose a call that is coming via a callback + * call as this will stall the binlog catchup process. + * + * We don't want to simply use a traditional mutex here for + * the loop, since this would block a MaxScale thread for + * an unacceptable length of time. + * + * We have two status bits, the CS_READING that says we are + * in the outer loop and the CS_INNERLOOP, to say we are in + * the inner loop. + * + * If just CS_READING is set the thread other may be about to + * enter the inner loop or may be about to exit the function + * completely. therefore we have to wait to see if CS_READING + * is cleared or CS_INNERLOOP is set. + * + * If CS_READING gets cleared then this thread should proceed + * into the loop. + * + * If CS_INNERLOOP get's set then this thread does not need to + * proceed. + * + * If CS_READING is not set then this thread simply enters the + * loop. + */ + req.tv_sec = 0; + req.tv_nsec = 1000; + spinlock_acquire(&slave->catch_lock); + if (slave->cstate & CS_READING) + { + while ((slave->cstate & (CS_READING|CS_INNERLOOP)) == CS_READING) + { + spinlock_release(&slave->catch_lock); + nanosleep(&req, NULL); + spinlock_acquire(&slave->catch_lock); + } + if (slave->cstate & CS_READING) + { + spinlock_release(&slave->catch_lock); + return 1; // We cheat here and return 1 because otherwise + // an error would be sent and we do not want that + } + } + slave->cstate |= CS_READING; + spinlock_release(&slave->catch_lock); + + + do { + if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1) + { + spinlock_acquire(&slave->catch_lock); + slave->cstate &= ~CS_READING; + spinlock_release(&slave->catch_lock); + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "blr_slave_catchup failed to open binlog file %s\n", + slave->binlogfile))); + return 0; + } + atomic_add(&slave->stats.n_bursts, 1); + spinlock_acquire(&slave->catch_lock); + slave->cstate |= CS_INNERLOOP; + spinlock_release(&slave->catch_lock); + while ((!DCB_ABOVE_HIGH_WATER(slave->dcb)) && + (record = blr_read_binlog(fd, slave->binlog_pos, &hdr)) != NULL) + { + head = gwbuf_alloc(5); + ptr = GWBUF_DATA(head); + encode_value(ptr, hdr.event_size + 1, 24); + ptr += 3; + *ptr++ = slave->seqno++; + *ptr++ = 0; // OK + head = gwbuf_append(head, record); + written = slave->dcb->func.write(slave->dcb, head); + if (written) + slave->binlog_pos = hdr.next_pos; + rval = written; + atomic_add(&slave->stats.n_events, 1); + burst++; + } + spinlock_acquire(&slave->catch_lock); + slave->cstate &= ~CS_INNERLOOP; + spinlock_release(&slave->catch_lock); + + close(fd); + } while (record && DCB_BELOW_LOW_WATER(slave->dcb)); + spinlock_acquire(&slave->catch_lock); + slave->cstate &= ~CS_READING; + spinlock_release(&slave->catch_lock); + if (record) + atomic_add(&slave->stats.n_flows, 1); + return rval; +} + +/** + * The DCB callback used by the slave to obtain DCB_REASON_LOW_WATER callbacks + * when the server sends all the the queue data for a DCB. This is the mechanism + * that is used to implement the flow control mechanism for the sending of + * large quantities of binlog records during the catchup process. + * + * @param dcb The DCB of the slave connection + * @param reason The reason the callback was called + * @param data The user data, in this case the server structure + */ +static int +blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data) +{ +ROUTER_SLAVE *slave = (ROUTER_SLAVE *)data; +ROUTER_INSTANCE *router = slave->router; + + if (reason != DCB_REASON_LOW_WATER) + return 0; + + if (slave->state == BLRS_DUMPING) + { + atomic_add(&slave->stats.n_events, 1); + blr_slave_catchup(router, slave); + } +}