diff --git a/client/maxadmin.c b/client/maxadmin.c index 6a3255b19..65b7ed4a6 100644 --- a/client/maxadmin.c +++ b/client/maxadmin.c @@ -63,6 +63,7 @@ static void DoSource(int so, char *cmd); static void DoUsage(); static int isquit(char *buf); static void PrintVersion(const char *progname); +static void read_inifile(char **hostname, char **port, char **user, char **passwd); #ifdef HISTORY static char * @@ -112,6 +113,8 @@ int so; int option_index = 0; char c; + read_inifile(&hostname, &port, &user, &passwd); + while ((c = getopt_long(argc, argv, "h:p:P:u:v?", long_options, &option_index)) >= 0) @@ -240,7 +243,7 @@ char c; */ el_source(el, NULL); - while ((buf = el_gets(el, &num)) != NULL && num != 0) + while ((buf = (char *)el_gets(el, &num)) != NULL && num != 0) { #else while (printf("MaxScale> ") && fgets(buf, 1024, stdin) != NULL) @@ -562,3 +565,68 @@ char *ptr = buf; return 1; return 0; } + +/** + * Trim whitespace from the right hand end of the string + * + * @param str String to trim + */ +static void +rtrim(char *str) +{ +char *ptr = str + strlen(str); + + if (ptr > str) // step back from the terminating null + ptr--; // If the string has more characters + while (ptr >= str && isspace(*ptr)) + *ptr-- = 0; +} + +/** + * Read defaults for hostname, port, user and password from + * the .maxadmin file in the users home directory. + * + * @param hostname Pointer the hostname to be updated + * @param port Pointer to the port to be updated + * @param user Pointer to the user to be updated + * @param passwd Pointer to the password to be updated + */ +static void +read_inifile(char **hostname, char **port, char **user, char **passwd) +{ +char pathname[400]; +char *home, *brkt; +char *name, *value; +FILE *fp; +char line[400]; + + if ((home = getenv("HOME")) == NULL) + return; + snprintf(pathname, 400, "%s/.maxadmin", home); + if ((fp = fopen(pathname, "r")) == NULL) + return; + while (fgets(line, 400, fp) != NULL) + { + rtrim(line); + if (line[0] == 0) + continue; + if (line[0] == '#') + continue; + name = strtok_r(line, "=", &brkt); + value = strtok_r(NULL, "=", &brkt); + if (strcmp(name, "hostname") == 0) + *hostname = strdup(value); + else if (strcmp(name, "port") == 0) + *port = strdup(value); + else if (strcmp(name, "user") == 0) + *user = strdup(value); + else if (strcmp(name, "passwd") == 0) + *passwd = strdup(value); + else + { + fprintf(stderr, "WARNING: Unrecognised " + "parameter '%s' in .maxadmin file\n", name); + } + } + fclose(fp); +} diff --git a/server/core/hashtable.c b/server/core/hashtable.c index ab979e472..9467e454b 100644 --- a/server/core/hashtable.c +++ b/server/core/hashtable.c @@ -180,6 +180,7 @@ HASHENTRIES *entry, *ptr; } free(table->entries); + hashtable_write_unlock(table); if (!table->ht_isflat) { free(table); diff --git a/server/core/service.c b/server/core/service.c index 8432898b0..8fa4e1e89 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -1091,8 +1091,8 @@ int service_refresh_users(SERVICE *service) { if (! spinlock_acquire_nowait(&service->users_table_spin)) { LOGIF(LD, (skygw_log_write_flush( LOGFILE_DEBUG, - "%lu [service_refresh_users] failed to get get lock for loading new users' table: another thread is loading users", - pthread_self()))); + "%s: [service_refresh_users] failed to get get lock for loading new users' table: another thread is loading users", + service->name))); return 1; } @@ -1100,12 +1100,12 @@ int service_refresh_users(SERVICE *service) { /* check if refresh rate limit has exceeded */ if ( (time(NULL) < (service->rate_limit.last + USERS_REFRESH_TIME)) || (service->rate_limit.nloads > USERS_REFRESH_MAX_PER_TIME)) { + spinlock_release(&service->users_table_spin); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, - "Refresh rate limit exceeded for load of users' table for service '%s'.", + "%s: Refresh rate limit exceeded for load of users' table.", service->name))); - spinlock_release(&service->users_table_spin); return 1; } @@ -1427,4 +1427,4 @@ void service_shutdown() svc = svc->next; } spinlock_release(&service_spin); -} \ No newline at end of file +} diff --git a/server/include/buffer.h b/server/include/buffer.h index df426baca..c0555bae4 100644 --- a/server/include/buffer.h +++ b/server/include/buffer.h @@ -168,6 +168,9 @@ typedef struct gwbuf { /*< Consume a number of bytes in the buffer */ #define GWBUF_CONSUME(b, bytes) ((b)->start = bytes > ((char *)(b)->end - (char *)(b)->start) ? (b)->end : (void *)((char *)(b)->start + (bytes))); +/*< Consume a complete buffer */ +#define GWBUF_CONSUME_ALL(b) gwbuf_consume((b), GWBUF_LENGTH((b))) + #define GWBUF_RTRIM(b, bytes) ((b)->end = bytes > ((char *)(b)->end - (char *)(b)->start) ? (b)->start : (void *)((char *)(b)->end - (bytes))); #define GWBUF_TYPE(b) (b)->gwbuf_type diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index f7b8f38ea..f8b8d3139 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -207,6 +207,7 @@ typedef struct { time_t lastReply; uint64_t n_fakeevents; /*< Fake events not written to disk */ uint64_t n_artificial; /*< Artificial events not written to disk */ + int n_badcrc; /*< No. of bad CRC's from master */ uint64_t events[0x24]; /*< Per event counters */ uint64_t lastsample; int minno; @@ -230,6 +231,7 @@ typedef struct { GWBUF *selectver; /*< select version() */ GWBUF *selectvercom; /*< select @@version_comment */ GWBUF *selecthostname;/*< select @@hostname */ + GWBUF *map; /*< select @@max_allowed_packet */ uint8_t *fde_event; /*< Format Description Event */ int fde_len; /*< Length of fde_event */ } MASTER_RESPONSES; @@ -305,17 +307,18 @@ typedef struct router_instance { #define BLRM_SELECTVER 0x000E #define BLRM_SELECTVERCOM 0x000F #define BLRM_SELECTHOSTNAME 0x0010 -#define BLRM_REGISTER 0x0011 -#define BLRM_BINLOGDUMP 0x0012 +#define BLRM_MAP 0x0011 +#define BLRM_REGISTER 0x0012 +#define BLRM_BINLOGDUMP 0x0013 -#define BLRM_MAXSTATE 0x0012 +#define BLRM_MAXSTATE 0x0013 static char *blrm_states[] = { "Unconnected", "Connecting", "Authenticated", "Timestamp retrieval", "Server ID retrieval", "HeartBeat Period setup", "binlog checksum config", "binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval", "Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1", "select version()", "select @@version_comment", "select @@hostname", - "Register slave", "Binlog Dump" }; + "select @@mx_allowed_packet", "Register slave", "Binlog Dump" }; #define BLRS_CREATED 0x0000 #define BLRS_UNREGISTERED 0x0001 diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 43ceea176..32e385e23 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -514,6 +514,13 @@ static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue) { username, stage1_hash); } + else + { + LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, + "%s: login attempt for user %s, user not " + "found.", + dcb->service->name, username))); + } } /* Do again the database check */ diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 40c87b092..a89307794 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -165,6 +165,7 @@ createInstance(SERVICE *service, char **options) ROUTER_INSTANCE *inst; char *value, *name; int i; +unsigned char *defuuid; if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { return NULL; @@ -191,6 +192,19 @@ int i; inst->binlogdir = NULL; inst->heartbeat = 300; // Default is every 5 minutes + my_uuid_init((ulong)rand()*12345,12345); + if ((defuuid = (char *)malloc(20)) != NULL) + { + int i; + my_uuid(defuuid); + if ((inst->uuid = (char *)malloc(38)) != NULL) + sprintf(inst->uuid, "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x", + defuuid[0], defuuid[1], defuuid[2], defuuid[3], + defuuid[4], defuuid[5], defuuid[6], defuuid[7], + defuuid[8], defuuid[9], defuuid[10], defuuid[11], + defuuid[12], defuuid[13], defuuid[14], defuuid[15]); + } + /* * We only support one server behind this router, since the server is * the master from which we replicate binlog records. Therefore check @@ -340,6 +354,24 @@ int i; inst->slaves = NULL; inst->next = NULL; + /* + * Read any cached response messages + */ + inst->saved_master.server_id = blr_cache_read_response(inst, "serverid"); + inst->saved_master.heartbeat = blr_cache_read_response(inst, "heartbeat"); + inst->saved_master.chksum1 = blr_cache_read_response(inst, "chksum1"); + inst->saved_master.chksum2 = blr_cache_read_response(inst, "chksum2"); + inst->saved_master.gtid_mode = blr_cache_read_response(inst, "gtidmode"); + inst->saved_master.uuid = blr_cache_read_response(inst, "uuid"); + inst->saved_master.setslaveuuid = blr_cache_read_response(inst, "ssuuid"); + inst->saved_master.setnames = blr_cache_read_response(inst, "setnames"); + inst->saved_master.utf8 = blr_cache_read_response(inst, "utf8"); + inst->saved_master.select1 = blr_cache_read_response(inst, "select1"); + inst->saved_master.selectver = blr_cache_read_response(inst, "selectver"); + inst->saved_master.selectvercom = blr_cache_read_response(inst, "selectvercom"); + inst->saved_master.selecthostname = blr_cache_read_response(inst, "selecthostname"); + inst->saved_master.map = blr_cache_read_response(inst, "map"); + /* * Initialise the binlog file and position */ @@ -702,6 +734,8 @@ struct tm tm; router_inst->stats.n_binlogs_ses); dcb_printf(dcb, "\tTotal no. of binlog events received: %u\n", router_inst->stats.n_binlogs); + dcb_printf(dcb, "\tNo. of bad CRC received from master: %u\n", + router_inst->stats.n_badcrc); minno = router_inst->stats.minno - 1; if (minno == -1) minno = 30; diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index 6d9f395eb..1042fb356 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -290,7 +290,7 @@ blr_file_flush(ROUTER_INSTANCE *router) BLFILE * blr_open_binlog(ROUTER_INSTANCE *router, char *binlog) { -char *ptr, path[1024]; +char path[1024]; BLFILE *file; spinlock_acquire(&router->fileslock); @@ -613,3 +613,85 @@ struct stat statb; return statb.st_size; return 0; } + + +/** + * Write the response packet to a cache file so that MaxScale can respond + * even if there is no master running when MaxScale starts. + * + * @param router The instance of the router + * @param response The name of the response, used to name the cached file + * @param buf The buffer to written to the cache + */ +void +blr_cache_response(ROUTER_INSTANCE *router, char *response, GWBUF *buf) +{ +char path[4096], *ptr; +int fd; + + strcpy(path, "/usr/local/skysql/MaxScale"); + if ((ptr = getenv("MAXSCALE_HOME")) != NULL) + { + strncpy(path, ptr, 4096); + } + strncat(path, "/", 4096); + strncat(path, router->service->name, 4096); + + if (access(path, R_OK) == -1) + mkdir(path, 0777); + strncat(path, "/.cache", 4096); + if (access(path, R_OK) == -1) + mkdir(path, 0777); + strncat(path, "/", 4096); + strncat(path, response, 4096); + + if ((fd = open(path, O_WRONLY|O_CREAT|O_TRUNC, 0666)) == -1) + return; + write(fd, GWBUF_DATA(buf), GWBUF_LENGTH(buf)); + close(fd); +} + +/** + * Read a cached copy of a master response message. This allows + * the router to start and serve any binlogs it already has on disk + * if the master is not available. + * + * @param router The router instance structure + * @param response The name of the response + * @return A pointer to a GWBUF structure + */ +GWBUF * +blr_cache_read_response(ROUTER_INSTANCE *router, char *response) +{ +struct stat statb; +char path[4096], *ptr; +int fd; +GWBUF *buf; + + strcpy(path, "/usr/local/skysql/MaxScale"); + if ((ptr = getenv("MAXSCALE_HOME")) != NULL) + { + strncpy(path, ptr, 4096); + } + strncat(path, "/", 4096); + strncat(path, router->service->name, 4096); + strncat(path, "/.cache/", 4096); + strncat(path, response, 4096); + + if ((fd = open(path, O_RDONLY)) == -1) + return NULL; + + if (fstat(fd, &statb) != 0) + { + close(fd); + return NULL; + } + if ((buf = gwbuf_alloc(statb.st_size)) == NULL) + { + close(fd); + return NULL; + } + read(fd, GWBUF_DATA(buf), statb.st_size); + close(fd); + return buf; +} diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 17e6f3ce0..ac216c665 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -48,6 +48,7 @@ #include #include #include +#include #include #include @@ -104,6 +105,15 @@ GWBUF *buf; return; } router->master_state = BLRM_CONNECTING; + + /* Discard the queued residual data */ + buf = router->residual; + while (buf) + { + buf = gwbuf_consume(buf, GWBUF_LENGTH(buf)); + } + router->residual = NULL; + spinlock_release(&router->lock); if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL) { @@ -141,7 +151,7 @@ GWBUF *buf; router->master->remote = strdup(router->service->dbref->server->name); LOGIF(LM,(skygw_log_write( LOGFILE_MESSAGE, - "%s: atempting to connect to master server %s.", + "%s: attempting to connect to master server %s.", router->service->name, router->master->remote))); router->connect_time = time(0); @@ -361,7 +371,10 @@ char query[128]; break; case BLRM_SERVERID: // Response to fetch of master's server-id + if (router->saved_master.server_id) + GWBUF_CONSUME_ALL(router->saved_master.server_id); router->saved_master.server_id = buf; + blr_cache_response(router, "serverid", buf); // TODO: Extract the value of server-id and place in router->master_id { char str[80]; @@ -373,35 +386,50 @@ char query[128]; break; case BLRM_HBPERIOD: // Response to set the heartbeat period + if (router->saved_master.heartbeat) + GWBUF_CONSUME_ALL(router->saved_master.heartbeat); router->saved_master.heartbeat = buf; + blr_cache_response(router, "heartbeat", buf); buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum"); router->master_state = BLRM_CHKSUM1; router->master->func.write(router->master, buf); break; case BLRM_CHKSUM1: // Response to set the master binlog checksum + if (router->saved_master.chksum1) + GWBUF_CONSUME_ALL(router->saved_master.chksum1); router->saved_master.chksum1 = buf; + blr_cache_response(router, "chksum1", buf); buf = blr_make_query("SELECT @master_binlog_checksum"); router->master_state = BLRM_CHKSUM2; router->master->func.write(router->master, buf); break; case BLRM_CHKSUM2: // Response to the master_binlog_checksum, should be stored + if (router->saved_master.chksum2) + GWBUF_CONSUME_ALL(router->saved_master.chksum2); router->saved_master.chksum2 = buf; + blr_cache_response(router, "chksum2", buf); buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE"); router->master_state = BLRM_GTIDMODE; router->master->func.write(router->master, buf); break; case BLRM_GTIDMODE: // Response to the GTID_MODE, should be stored + if (router->saved_master.gtid_mode) + GWBUF_CONSUME_ALL(router->saved_master.gtid_mode); router->saved_master.gtid_mode = buf; + blr_cache_response(router, "gtidmode", buf); buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'"); router->master_state = BLRM_MUUID; router->master->func.write(router->master, buf); break; case BLRM_MUUID: // Response to the SERVER_UUID, should be stored + if (router->saved_master.uuid) + GWBUF_CONSUME_ALL(router->saved_master.uuid); router->saved_master.uuid = buf; + blr_cache_response(router, "uuid", buf); sprintf(query, "SET @slave_uuid='%s'", router->uuid); buf = blr_make_query(query); router->master_state = BLRM_SUUID; @@ -409,49 +437,80 @@ char query[128]; break; case BLRM_SUUID: // Response to the SET @server_uuid, should be stored + if (router->saved_master.setslaveuuid) + GWBUF_CONSUME_ALL(router->saved_master.setslaveuuid); router->saved_master.setslaveuuid = buf; + blr_cache_response(router, "ssuuid", buf); buf = blr_make_query("SET NAMES latin1"); router->master_state = BLRM_LATIN1; router->master->func.write(router->master, buf); break; case BLRM_LATIN1: // Response to the SET NAMES latin1, should be stored + if (router->saved_master.setnames) + GWBUF_CONSUME_ALL(router->saved_master.setnames); router->saved_master.setnames = buf; + blr_cache_response(router, "setnames", buf); buf = blr_make_query("SET NAMES utf8"); router->master_state = BLRM_UTF8; router->master->func.write(router->master, buf); break; case BLRM_UTF8: // Response to the SET NAMES utf8, should be stored + if (router->saved_master.utf8) + GWBUF_CONSUME_ALL(router->saved_master.utf8); router->saved_master.utf8 = buf; + blr_cache_response(router, "utf8", buf); buf = blr_make_query("SELECT 1"); router->master_state = BLRM_SELECT1; router->master->func.write(router->master, buf); break; case BLRM_SELECT1: // Response to the SELECT 1, should be stored + if (router->saved_master.select1) + GWBUF_CONSUME_ALL(router->saved_master.select1); router->saved_master.select1 = buf; + blr_cache_response(router, "select1", buf); buf = blr_make_query("SELECT VERSION();"); router->master_state = BLRM_SELECTVER; router->master->func.write(router->master, buf); break; case BLRM_SELECTVER: // Response to SELECT VERSION should be stored + if (router->saved_master.selectver) + GWBUF_CONSUME_ALL(router->saved_master.selectver); router->saved_master.selectver = buf; + blr_cache_response(router, "selectver", buf); buf = blr_make_query("SELECT @@version_comment limit 1;"); router->master_state = BLRM_SELECTVERCOM; router->master->func.write(router->master, buf); break; case BLRM_SELECTVERCOM: // Response to SELECT @@version_comment should be stored + if (router->saved_master.selectvercom) + GWBUF_CONSUME_ALL(router->saved_master.selectvercom); router->saved_master.selectvercom = buf; + blr_cache_response(router, "selectvercom", buf); buf = blr_make_query("SELECT @@hostname;"); router->master_state = BLRM_SELECTHOSTNAME; router->master->func.write(router->master, buf); break; case BLRM_SELECTHOSTNAME: // Response to SELECT @@hostname should be stored + if (router->saved_master.selecthostname) + GWBUF_CONSUME_ALL(router->saved_master.selecthostname); router->saved_master.selecthostname = buf; + blr_cache_response(router, "selecthostname", buf); + buf = blr_make_query("SELECT @@max_allowed_packet;"); + router->master_state = BLRM_MAP; + router->master->func.write(router->master, buf); + break; + case BLRM_MAP: + // Response to SELECT @@max_allowed_packet should be stored + if (router->saved_master.map) + GWBUF_CONSUME_ALL(router->saved_master.map); + router->saved_master.map = buf; + blr_cache_response(router, "map", buf); buf = blr_make_registration(router); router->master_state = BLRM_REGISTER; router->master->func.write(router->master, buf); @@ -622,6 +681,11 @@ static REP_HEADER phdr; } pkt_length = gwbuf_length(pkt); + /* + * Loop over all the packets while we still have some data + * and the packet length is enough to hold a replication event + * header. + */ while (pkt && pkt_length > 24) { reslen = GWBUF_LENGTH(pkt); @@ -649,6 +713,7 @@ static REP_HEADER phdr; { len = EXTRACT24(pdata) + 4; } + /* len is now the payload length for the packet we are working on */ if (reslen < len && pkt_length >= len) { @@ -728,10 +793,17 @@ static REP_HEADER phdr; n_bufs = 1; } + /* + * ptr now points at the current message in a contiguous buffer, + * this buffer is either within the GWBUF or in a malloc'd + * copy if the message straddles GWBUF's. + */ + if (len < BINLOG_EVENT_HDR_LEN) { char *msg = ""; + /* Packet is too small to be a binlog event */ if (ptr[4] == 0xfe) /* EOF Packet */ { msg = "end of file"; @@ -753,7 +825,7 @@ static REP_HEADER phdr; blr_extract_header(ptr, &hdr); - if (hdr.event_size != len - 5) + if (hdr.event_size != len - 5) /* Sanity check */ { LOGIF(LE,(skygw_log_write( LOGFILE_ERROR, @@ -784,6 +856,35 @@ static REP_HEADER phdr; phdr = hdr; if (hdr.ok == 0) { + /* + * First check that the checksum we calculate matches the + * checksum in the packet we received. + */ + uint32_t chksum, pktsum; + + chksum = crc32(0L, NULL, 0); + chksum = crc32(chksum, ptr + 5, hdr.event_size - 4); + pktsum = EXTRACT32(ptr + hdr.event_size + 1); + if (pktsum != chksum) + { + router->stats.n_badcrc++; + if (msg) + { + free(msg); + msg = NULL; + } + LOGIF(LE,(skygw_log_write(LOGFILE_ERROR, + "%s: Checksum error in event " + "from master, " + "binlog %s @ %d. " + "Closing master connection.", + router->service->name, + router->binlog_name, + router->binlog_position))); + blr_master_close(router); + blr_master_delayed_connect(router); + return; + } router->stats.n_binlogs++; router->lastEventReceived = hdr.event_type; diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index a98de60f7..dba0f7cde 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -52,6 +52,7 @@ #include #include #include +#include static uint32_t extract_field(uint8_t *src, int bits); static void encode_value(unsigned char *data, unsigned int value, int len); @@ -66,6 +67,13 @@ uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr); int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data); static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); static void blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +static int blr_slave_send_maxscale_version(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +static int blr_slave_send_maxscale_variables(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +static int blr_slave_send_master_status(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +static int blr_slave_send_slave_hosts(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +static int blr_slave_send_fieldcount(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int count); +static int blr_slave_send_columndef(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *name, int type, int len, uint8_t seqno); +static int blr_slave_send_eof(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int seqno); extern int lm_enabled_logfiles_bitmask; extern size_t log_ses_count[]; @@ -141,7 +149,11 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) * when MaxScale registered as a slave. The exception to the rule is the * request to obtain the current timestamp value of the server. * - * Seven select statements are currently supported: + * The original set added for the registration process has been enhanced in + * order to support some commands that are useful for monitoring the binlog + * router. + * + * Eight select statements are currently supported: * SELECT UNIX_TIMESTAMP(); * SELECT @master_binlog_checksum * SELECT @@GLOBAL.GTID_MODE @@ -149,10 +161,15 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) * SELECT 1 * SELECT @@version_comment limit 1 * SELECT @@hostname + * SELECT @@max_allowed_packet + * SELECT @@maxscale_version * - * Two show commands are supported: + * Five show commands are supported: * SHOW VARIABLES LIKE 'SERVER_ID' * SHOW VARIABLES LIKE 'SERVER_UUID' + * SHOW VARIABLES LIKE 'MAXSCALE% + * SHOW MASTER STATUS + * SHOW SLAVE HOSTS * * Five set commands are supported: * SET @master_binlog_checksum = @@global.binlog_checksum @@ -189,11 +206,20 @@ int query_len; * own interaction with the real master. We simply replay these saved responses * to the slave. */ - word = strtok_r(query_text, sep, &brkb); - if (strcasecmp(word, "SELECT") == 0) + if ((word = strtok_r(query_text, sep, &brkb)) == NULL) { - word = strtok_r(NULL, sep, &brkb); - if (strcasecmp(word, "UNIX_TIMESTAMP()") == 0) + + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "%s: Incomplete query.", + router->service->name))); + } + else if (strcasecmp(word, "SELECT") == 0) + { + if ((word = strtok_r(NULL, sep, &brkb)) == NULL) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "%s: Incomplete select query.", + router->service->name))); + } + else if (strcasecmp(word, "UNIX_TIMESTAMP()") == 0) { free(query_text); return blr_slave_send_timestamp(router, slave); @@ -228,17 +254,41 @@ int query_len; free(query_text); return blr_slave_replay(router, slave, router->saved_master.selecthostname); } + else if (strcasecmp(word, "@@max_allowed_packet") == 0) + { + free(query_text); + return blr_slave_replay(router, slave, router->saved_master.map); + } + else if (strcasecmp(word, "@@maxscale_version") == 0) + { + free(query_text); + return blr_slave_send_maxscale_version(router, slave); + } } else if (strcasecmp(word, "SHOW") == 0) { - word = strtok_r(NULL, sep, &brkb); - if (strcasecmp(word, "VARIABLES") == 0) + if ((word = strtok_r(NULL, sep, &brkb)) == NULL) { - word = strtok_r(NULL, sep, &brkb); - if (strcasecmp(word, "LIKE") == 0) + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "%s: Incomplete show query.", + router->service->name))); + } + else if (strcasecmp(word, "VARIABLES") == 0) + { + if ((word = strtok_r(NULL, sep, &brkb)) == NULL) { - word = strtok_r(NULL, sep, &brkb); - if (strcasecmp(word, "'SERVER_ID'") == 0) + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "%s: Expected LIKE clause in SHOW VARIABLES.", + router->service->name))); + } + else if (strcasecmp(word, "LIKE") == 0) + { + if ((word = strtok_r(NULL, sep, &brkb)) == NULL) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "%s: Missing LIKE clause in SHOW VARIABLES.", + router->service->name))); + } + else if (strcasecmp(word, "'SERVER_ID'") == 0) { free(query_text); return blr_slave_replay(router, slave, router->saved_master.server_id); @@ -248,13 +298,55 @@ int query_len; free(query_text); return blr_slave_replay(router, slave, router->saved_master.uuid); } + else if (strcasecmp(word, "'MAXSCALE%'") == 0) + { + free(query_text); + return blr_slave_send_maxscale_variables(router, slave); + } + } + } + else if (strcasecmp(word, "MASTER") == 0) + { + if ((word = strtok_r(NULL, sep, &brkb)) == NULL) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "%s: Expected SHOW MASTER STATUS command", + router->service->name))); + } + else if (strcasecmp(word, "STATUS") == 0) + { + free(query_text); + return blr_slave_send_master_status(router, slave); + } + } + else if (strcasecmp(word, "SLAVE") == 0) + { + if ((word = strtok_r(NULL, sep, &brkb)) == NULL) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "%s: Expected SHOW MASTER STATUS command", + router->service->name))); + } + else if (strcasecmp(word, "STATUS") == 0) + { + free(query_text); + return blr_slave_send_master_status(router, slave); + } + else if (strcasecmp(word, "HOSTS") == 0) + { + free(query_text); + return blr_slave_send_slave_hosts(router, slave); } } } else if (strcasecmp(query_text, "SET") == 0) { - word = strtok_r(NULL, sep, &brkb); - if (strcasecmp(word, "@master_heartbeat_period") == 0) + if ((word = strtok_r(NULL, sep, &brkb)) == NULL) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "%s: Incomplete set command.", + router->service->name))); + } + else if (strcasecmp(word, "@master_heartbeat_period") == 0) { free(query_text); return blr_slave_replay(router, slave, router->saved_master.heartbeat); @@ -262,7 +354,7 @@ int query_len; else if (strcasecmp(word, "@master_binlog_checksum") == 0) { word = strtok_r(NULL, sep, &brkb); - if (strcasecmp(word, "'none'") == 0) + if (word && (strcasecmp(word, "'none'") == 0)) slave->nocrc = 1; else slave->nocrc = 0; @@ -278,8 +370,12 @@ int query_len; } else if (strcasecmp(word, "NAMES") == 0) { - word = strtok_r(NULL, sep, &brkb); - if (strcasecmp(word, "latin1") == 0) + if ((word = strtok_r(NULL, sep, &brkb)) == NULL) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "%s: Truncated SET NAMES command.", + router->service->name))); + } + else if (strcasecmp(word, "latin1") == 0) { free(query_text); return blr_slave_replay(router, slave, router->saved_master.setnames); @@ -412,6 +508,203 @@ int len, ts_len; return slave->dcb->func.write(slave->dcb, pkt); } +/** + * Send a response the the SQL command SELECT @@MAXSCALE_VERSION + * + * @param router The binlog router instance + * @param slave The slave server to which we are sending the response + * @return Non-zero if data was sent + */ +static int +blr_slave_send_maxscale_version(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) +{ +GWBUF *pkt; +char version[40]; +uint8_t *ptr; +int len, vers_len; + + sprintf(version, "%s", MAXSCALE_VERSION); + vers_len = strlen(version); + blr_slave_send_fieldcount(router, slave, 1); + blr_slave_send_columndef(router, slave, "MAXSCALE_VERSION", 0xf, vers_len, 2); + blr_slave_send_eof(router, slave, 3); + + len = 5 + vers_len; + if ((pkt = gwbuf_alloc(len)) == NULL) + return 0; + ptr = GWBUF_DATA(pkt); + encode_value(ptr, vers_len + 1, 24); // Add length of data packet + ptr += 3; + *ptr++ = 0x04; // Sequence number in response + *ptr++ = vers_len; // Length of result string + strncpy((char *)ptr, version, vers_len); // Result string + ptr += vers_len; + slave->dcb->func.write(slave->dcb, pkt); + return blr_slave_send_eof(router, slave, 5); +} + + +/** + * Send the response to the SQL command "SHOW VARIABLES LIKE 'MAXSCALE%' + * + * @param router The binlog router instance + * @param slave The slave server to which we are sending the response + * @return Non-zero if data was sent + */ +static int +blr_slave_send_maxscale_variables(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) +{ +GWBUF *pkt; +char name[40]; +char version[40]; +uint8_t *ptr; +int len, vers_len, seqno = 2; + + blr_slave_send_fieldcount(router, slave, 2); + blr_slave_send_columndef(router, slave, "Variable_name", 0xf, 40, seqno++); + blr_slave_send_columndef(router, slave, "value", 0xf, 40, seqno++); + blr_slave_send_eof(router, slave, seqno++); + + sprintf(version, "%s", MAXSCALE_VERSION); + vers_len = strlen(version); + strcpy(name, "MAXSCALE_VERSION"); + len = 5 + vers_len + strlen(name) + 1; + if ((pkt = gwbuf_alloc(len)) == NULL) + return 0; + ptr = GWBUF_DATA(pkt); + encode_value(ptr, vers_len + 2 + strlen(name), 24); // Add length of data packet + ptr += 3; + *ptr++ = seqno++; // Sequence number in response + *ptr++ = strlen(name); // Length of result string + strncpy((char *)ptr, name, strlen(name)); // Result string + ptr += strlen(name); + *ptr++ = vers_len; // Length of result string + strncpy((char *)ptr, version, vers_len); // Result string + ptr += vers_len; + slave->dcb->func.write(slave->dcb, pkt); + + return blr_slave_send_eof(router, slave, seqno++); +} + + +/** + * Send the response to the SQL command "SHOW MASTER STATUS" + * + * @param router The binlog router instance + * @param slave The slave server to which we are sending the response + * @return Non-zero if data was sent + */ +static int +blr_slave_send_master_status(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) +{ +GWBUF *pkt; +char file[40]; +char position[40]; +uint8_t *ptr; +int len, file_len; + + blr_slave_send_fieldcount(router, slave, 5); + blr_slave_send_columndef(router, slave, "File", 0xf, 40, 2); + blr_slave_send_columndef(router, slave, "Position", 0xf, 40, 3); + blr_slave_send_columndef(router, slave, "Binlog_Do_DB", 0xf, 40, 4); + blr_slave_send_columndef(router, slave, "Binlog_Ignore_DB", 0xf, 40, 5); + blr_slave_send_columndef(router, slave, "Execute_Gtid_Set", 0xf, 40, 6); + blr_slave_send_eof(router, slave, 7); + + sprintf(file, "%s", router->binlog_name); + file_len = strlen(file); + sprintf(position, "%d", router->binlog_position); + len = 5 + file_len + strlen(position) + 1 + 3; + if ((pkt = gwbuf_alloc(len)) == NULL) + return 0; + ptr = GWBUF_DATA(pkt); + encode_value(ptr, len - 4, 24); // Add length of data packet + ptr += 3; + *ptr++ = 0x08; // Sequence number in response + *ptr++ = strlen(file); // Length of result string + strncpy((char *)ptr, file, strlen(file)); // Result string + ptr += strlen(file); + *ptr++ = strlen(position); // Length of result string + strncpy((char *)ptr, position, strlen(position)); // Result string + ptr += strlen(position); + *ptr++ = 0; // Send 3 empty values + *ptr++ = 0; + *ptr++ = 0; + slave->dcb->func.write(slave->dcb, pkt); + return blr_slave_send_eof(router, slave, 9); +} + +/** + * Send the response to the SQL command "SHOW SLAVE HOSTS" + * + * @param router The binlog router instance + * @param slave The slave server to which we are sending the response + * @return Non-zero if data was sent + */ +static int +blr_slave_send_slave_hosts(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) +{ +GWBUF *pkt; +char server_id[40]; +char host[40]; +char port[40]; +char master_id[40]; +char slave_uuid[40]; +uint8_t *ptr; +int len, seqno; +ROUTER_SLAVE *sptr; + + blr_slave_send_fieldcount(router, slave, 5); + blr_slave_send_columndef(router, slave, "Server_id", 0xf, 40, 2); + blr_slave_send_columndef(router, slave, "Host", 0xf, 40, 3); + blr_slave_send_columndef(router, slave, "Port", 0xf, 40, 4); + blr_slave_send_columndef(router, slave, "Master_id", 0xf, 40, 5); + blr_slave_send_columndef(router, slave, "Slave_UUID", 0xf, 40, 6); + blr_slave_send_eof(router, slave, 7); + + seqno = 8; + spinlock_acquire(&router->lock); + sptr = router->slaves; + while (sptr) + { + if (sptr->state != 0) + { + sprintf(server_id, "%d", sptr->serverid); + sprintf(host, "%s", sptr->hostname ? sptr->hostname : ""); + sprintf(port, "%d", sptr->port); + sprintf(master_id, "%d", router->serverid); + sprintf(slave_uuid, "%s", sptr->uuid); + len = 5 + strlen(server_id) + strlen(host) + strlen(port) + + strlen(master_id) + strlen(slave_uuid) + 5; + if ((pkt = gwbuf_alloc(len)) == NULL) + return 0; + ptr = GWBUF_DATA(pkt); + encode_value(ptr, len - 4, 24); // Add length of data packet + ptr += 3; + *ptr++ = seqno++; // Sequence number in response + *ptr++ = strlen(server_id); // Length of result string + strncpy((char *)ptr, server_id, strlen(server_id)); // Result string + ptr += strlen(server_id); + *ptr++ = strlen(host); // Length of result string + strncpy((char *)ptr, host, strlen(host)); // Result string + ptr += strlen(host); + *ptr++ = strlen(port); // Length of result string + strncpy((char *)ptr, port, strlen(port)); // Result string + ptr += strlen(port); + *ptr++ = strlen(master_id); // Length of result string + strncpy((char *)ptr, master_id, strlen(master_id)); // Result string + ptr += strlen(master_id); + *ptr++ = strlen(slave_uuid); // Length of result string + strncpy((char *)ptr, slave_uuid, strlen(slave_uuid)); // Result string + ptr += strlen(slave_uuid); + slave->dcb->func.write(slave->dcb, pkt); + } + sptr = sptr->next; + } + spinlock_release(&router->lock); + return blr_slave_send_eof(router, slave, seqno); +} + /** * Process a slave replication registration message. * @@ -685,7 +978,7 @@ uint8_t *ptr; * call. The paramter "long" control the number of events in the burst. The * short burst is intended to be used when the master receive an event and * needs to put the slave into catchup mode. This prevents the slave taking - * too much tiem away from the thread that is processing the master events. + * too much time away from the thread that is processing the master events. * * At the end of the burst a fake EPOLLOUT event is added to the poll event * queue. This ensures that the slave callback for processing DCB write drain @@ -880,9 +1173,10 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write( * we ignore these issues during the rotate processing. */ LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, - "Slave reached end of file for binlong file %s at %u " + "Slave reached end of file for binlog file %s at %u " "which is not the file currently being downloaded. " - "Master binlog is %s, %lu.", + "Master binlog is %s, %lu. This may be caused by a " + "previous failure of the master.", slave->binlogfile, slave->binlog_pos, router->binlog_name, router->binlog_position))); if (blr_slave_fake_rotate(router, slave)) @@ -1008,11 +1302,7 @@ uint32_t chksum; return 0; binlognamelen = strlen(slave->binlogfile); - - if (slave->nocrc) - len = 19 + 8 + binlognamelen; - else - len = 19 + 8 + 4 + binlognamelen; + len = 19 + 8 + 4 + binlognamelen; // Build a fake rotate event resp = gwbuf_alloc(len + 5); @@ -1031,20 +1321,17 @@ uint32_t chksum; memcpy(ptr, slave->binlogfile, binlognamelen); ptr += binlognamelen; - if (!slave->nocrc) - { - /* - * Now add the CRC to the fake binlog rotate event. - * - * The algorithm is first to compute the checksum of an empty buffer - * and then the checksum of the event portion of the message, ie we do not - * include the length, sequence number and ok byte that makes up the first - * 5 bytes of the message. We also do not include the 4 byte checksum itself. - */ - chksum = crc32(0L, NULL, 0); - chksum = crc32(chksum, GWBUF_DATA(resp) + 5, hdr.event_size - 4); - encode_value(ptr, chksum, 32); - } + /* + * Now add the CRC to the fake binlog rotate event. + * + * The algorithm is first to compute the checksum of an empty buffer + * and then the checksum of the event portion of the message, ie we do not + * include the length, sequence number and ok byte that makes up the first + * 5 bytes of the message. We also do not include the 4 byte checksum itself. + */ + chksum = crc32(0L, NULL, 0); + chksum = crc32(chksum, GWBUF_DATA(resp) + 5, hdr.event_size - 4); + encode_value(ptr, chksum, 32); slave->dcb->func.write(slave->dcb, resp); return 1; @@ -1153,7 +1440,7 @@ uint8_t *ptr; *ptr++ = 'e'; *ptr++ = 'f'; *ptr++ = 0; // Schema name length - *ptr++ = 0; // virtal table name length + *ptr++ = 0; // virtual table name length *ptr++ = 0; // Table name length *ptr++ = strlen(name); // Column name length; while (*name) @@ -1175,3 +1462,31 @@ uint8_t *ptr; *ptr++= 0; return slave->dcb->func.write(slave->dcb, pkt); } + + +/** + * Send an EOF packet in a response packet sequence. + * + * @param router The router + * @param slave The slave connection + * @param seqno The sequence number of the EOF packet + * @return Non-zero on success + */ +static int +blr_slave_send_eof(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, int seqno) +{ +GWBUF *pkt; +uint8_t *ptr; + + if ((pkt = gwbuf_alloc(9)) == NULL) + return 0; + ptr = GWBUF_DATA(pkt); + encode_value(ptr, 5, 24); // Add length of data packet + ptr += 3; + *ptr++ = seqno; // Sequence number in response + *ptr++ = 0xfe; // Length of result string + encode_value(ptr, 0, 16); // No errors + ptr += 2; + encode_value(ptr, 2, 16); // Autocommit enabled + return slave->dcb->func.write(slave->dcb, pkt); +}