diff --git a/Documentation/MaxScale Configuration And Usage Scenarios.pdf b/Documentation/MaxScale Configuration And Usage Scenarios.pdf index a2ad20aef..e5b649413 100644 Binary files a/Documentation/MaxScale Configuration And Usage Scenarios.pdf and b/Documentation/MaxScale Configuration And Usage Scenarios.pdf differ diff --git a/log_manager/CMakeLists.txt b/log_manager/CMakeLists.txt index 1cbe6cd87..fdef33f6c 100644 --- a/log_manager/CMakeLists.txt +++ b/log_manager/CMakeLists.txt @@ -1,3 +1,6 @@ +if(LOG_DEBUG) + add_definitions(-DSS_LOG_DEBUG) +endif() add_library(log_manager SHARED log_manager.cc) target_link_libraries(log_manager pthread aio stdc++) install(TARGETS log_manager DESTINATION lib) diff --git a/log_manager/log_manager.cc b/log_manager/log_manager.cc index 230e44804..8dfa50fbd 100644 --- a/log_manager/log_manager.cc +++ b/log_manager/log_manager.cc @@ -1093,7 +1093,7 @@ static char* blockbuf_get_writepos( simple_mutex_unlock(&bb->bb_mutex); simple_mutex_lock(&bb_list->mlist_mutex, true); - + node = bb_list->mlist_first; } else { @@ -3111,9 +3111,9 @@ static int find_last_seqno( { if (snstr != NULL && i == seqnoidx) { - strcat(filename, snstr); /*< add sequence number */ + strncat(filename, snstr, NAME_MAX - 1); /*< add sequence number */ } - strcat(filename, p->sp_string); + strncat(filename, p->sp_string, NAME_MAX - 1); if (p->sp_next == NULL) { diff --git a/server/core/dbusers.c b/server/core/dbusers.c index 2b000d57a..17f5ecbd6 100644 --- a/server/core/dbusers.c +++ b/server/core/dbusers.c @@ -32,11 +32,13 @@ * x.y.z.%, x.y.%.%, x.%.%.% * 03/10/14 Massimiliano Pinto Added netmask to user@host authentication for wildcard in IPv4 hosts * 13/10/14 Massimiliano Pinto Added (user@host)@db authentication + * 04/12/14 Massimiliano Pinto Added support for IPv$ wildcard hosts: a.%, a.%.% and a.b.% * * @endverbatim */ #include +#include #include #include @@ -82,6 +84,7 @@ void resource_free(HASHTABLE *resource); void *resource_fetch(HASHTABLE *, char *); int resource_add(HASHTABLE *, char *, char *); int resource_hash(char *); +static int normalize_hostname(char *input_host, char *output_host); /** * Load the user/passwd form mysql.user table into the service users' hashtable @@ -217,8 +220,6 @@ int add_mysql_users_with_host_ipv4(USERS *users, char *user, char *host, char *p struct sockaddr_in serv_addr; MYSQL_USER_HOST key; char ret_ip[INET_ADDRSTRLEN + 1]=""; - int found_range=0; - int found_any=0; int ret = 0; if (users == NULL || user == NULL || host == NULL) { @@ -255,42 +256,30 @@ int add_mysql_users_with_host_ipv4(USERS *users, char *user, char *host, char *p /* ANY */ if (strcmp(host, "%") == 0) { strcpy(ret_ip, "0.0.0.0"); - found_any = 1; + key.netmask = 0; } else { - char *tmp; - strncpy(ret_ip, host, INET_ADDRSTRLEN); - tmp = ret_ip+strlen(ret_ip)-1; + /* hostname without % wildcards has netmask = 32 */ + key.netmask = normalize_hostname(host, ret_ip); - /* start from Class C */ - - while(tmp > ret_ip) { - if (*tmp == '%') { - /* set only the last IPv4 byte to 1 - * avoiding setipadress() failure - * for Class C address - */ - found_range++; - if (found_range == 1) - *tmp = '1'; - else - *tmp = '0'; - } - tmp--; + if (key.netmask == -1) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : strdup() failed in normalize_hostname for %s@%s", + user, + host))); } } /* fill IPv4 data struct */ - if (setipaddress(&serv_addr.sin_addr, ret_ip)) { + if (setipaddress(&serv_addr.sin_addr, ret_ip) && strlen(ret_ip)) { /* copy IPv4 data into key.ipv4 */ memcpy(&key.ipv4, &serv_addr, sizeof(serv_addr)); - if (found_range) { - /* let's zero the last IP byte: a.b.c.0 we set above to 1*/ + /* if netmask < 32 there are % wildcards */ + if (key.netmask < 32) { + /* let's zero the last IP byte: a.b.c.0 we may have set above to 1*/ key.ipv4.sin_addr.s_addr &= 0x00FFFFFF; - key.netmask = 32 - (found_range * 8); - } else { - key.netmask = 32 - (found_any * 32); } /* add user@host as key and passwd as value in the MySQL users hash table */ @@ -1120,3 +1109,87 @@ resource_fetch(HASHTABLE *resources, char *key) return hashtable_fetch(resources, key); } +/** + * Normalize hostname with % wildcards to a valid IP string. + * + * Valid input values: + * a.b.c.d, a.b.c.%, a.b.%.%, a.%.%.% + * Short formats a.% and a.%.% are both converted to a.%.%.% + * Short format a.b.% is converted to a.b.%.% + * + * Last host byte is set to 1, avoiding setipadress() failure + * + * @param input_host The hostname with possible % wildcards + * @param output_host The normalized hostname (buffer must be preallocated) + * @return The calculated netmask or -1 on failure + */ +static int normalize_hostname(char *input_host, char *output_host) +{ +int netmask, bytes, bits = 0, found_wildcard = 0; +char *p, *lasts, *tmp; +int useorig = 0; + + output_host[0] = 0; + bytes = 0; + + tmp = strdup(input_host); + + if (tmp == NULL) { + return -1; + } + + p = strtok_r(tmp, ".", &lasts); + while (p != NULL) + { + + if (strcmp(p, "%")) + { + if (! isdigit(*p)) + useorig = 1; + + strcat(output_host, p); + bits += 8; + } + else if (bytes == 3) + { + found_wildcard = 1; + strcat(output_host, "1"); + } + else + { + found_wildcard = 1; + strcat(output_host, "0"); + } + bytes++; + p = strtok_r(NULL, ".", &lasts); + if (p) + strcat(output_host, "."); + } + if (found_wildcard) + { + netmask = bits; + while (bytes++ < 4) + { + if (bytes == 4) + { + strcat(output_host, ".1"); + } + else + { + strcat(output_host, ".0"); + } + } + } + else + netmask = 32; + + if (useorig == 1) + { + netmask = 32; + strcpy(output_host, input_host); + } + + free(tmp); + + return netmask; +} diff --git a/server/core/filter.c b/server/core/filter.c index 975386f2b..c697a264e 100644 --- a/server/core/filter.c +++ b/server/core/filter.c @@ -332,8 +332,7 @@ DOWNSTREAM *me; if ((filter->obj = load_module(filter->module, MODULE_FILTER)) == NULL) { - me = NULL; - goto retblock; + return NULL; } } @@ -342,8 +341,7 @@ DOWNSTREAM *me; if ((filter->filter = (filter->obj->createInstance)(filter->options, filter->parameters)) == NULL) { - me = NULL; - goto retblock; + return NULL; } } if ((me = (DOWNSTREAM *)calloc(1, sizeof(DOWNSTREAM))) == NULL) @@ -355,7 +353,7 @@ DOWNSTREAM *me; errno, strerror(errno)))); - goto retblock; + return NULL; } me->instance = filter->filter; me->routeQuery = (void *)(filter->obj->routeQuery); @@ -363,12 +361,10 @@ DOWNSTREAM *me; if ((me->session=filter->obj->newSession(me->instance, session)) == NULL) { free(me); - me = NULL; - goto retblock; + return NULL; } filter->obj->setDownstream(me->instance, me->session, downstream); -retblock: return me; } diff --git a/server/core/gateway.c b/server/core/gateway.c index 379002521..45f493adc 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -637,55 +637,63 @@ static bool resolve_maxscale_homedir( } check_home_dir: + if (*p_home_dir != NULL) + { + if (!file_is_readable(*p_home_dir)) + { + char* tailstr = "MaxScale doesn't have read permission " + "to MAXSCALE_HOME."; + char* logstr = (char*)malloc(strlen(log_context)+ + 1+ + strlen(tailstr)+ + 1); + snprintf(logstr, + strlen(log_context)+ + 1+ + strlen(tailstr)+1, + "%s:%s", + log_context, + tailstr); + print_log_n_stderr(true, true, logstr, logstr, 0); + free(logstr); + goto return_succp; + } + +#if WRITABLE_HOME + if (!file_is_writable(*p_home_dir)) + { + char* tailstr = "MaxScale doesn't have write permission " + "to MAXSCALE_HOME. Exiting."; + char* logstr = (char*)malloc(strlen(log_context)+ + 1+ + strlen(tailstr)+ + 1); + snprintf(logstr, + strlen(log_context)+ + 1+ + strlen(tailstr)+1, + "%s:%s", + log_context, + tailstr); + print_log_n_stderr(true, true, logstr, logstr, 0); + free(logstr); + goto return_succp; + } +#endif + if (!daemon_mode) + { + fprintf(stderr, + "Using %s as MAXSCALE_HOME = %s\n", + log_context, + tmp); + } + succp = true; + goto return_succp; + } + +return_succp: + free (tmp); - if (*p_home_dir != NULL) - { - char* errstr; - - errstr = check_dir_access(*p_home_dir); - - if (errstr != NULL) - { - char* logstr = (char*)malloc(strlen(log_context)+ - 1+ - strlen(errstr)+ - 1); - - snprintf(logstr, - strlen(log_context)+ - 1+ - strlen(errstr)+1, - "%s: %s", - log_context, - errstr); - - print_log_n_stderr(true, true, logstr, logstr, 0); - - free(errstr); - free(logstr); - succp = false; - } - else - { - succp = true; - - if (!daemon_mode) - { - fprintf(stderr, - "Using %s as MAXSCALE_HOME = %s\n", - log_context, - (tmp == NULL ? *p_home_dir : tmp)); - } - } - } - else - { - succp = false; - } - if (tmp != NULL) - { - free(tmp); - } if (log_context != NULL) { diff --git a/server/core/modutil.c b/server/core/modutil.c index b9a7aab1c..21978a740 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -293,7 +293,7 @@ GWBUF *modutil_create_mysql_err_msg( const char *msg) { uint8_t *outbuf = NULL; - uint8_t mysql_payload_size = 0; + uint32_t mysql_payload_size = 0; uint8_t mysql_packet_header[4]; uint8_t *mysql_payload = NULL; uint8_t field_count = 0; diff --git a/server/core/secrets.c b/server/core/secrets.c index 4f98e81f7..57325a5ff 100644 --- a/server/core/secrets.c +++ b/server/core/secrets.c @@ -227,8 +227,9 @@ static int reported = 0; */ int secrets_writeKeys(char *secret_file) { -int fd; -MAXKEYS key; +int fd,randfd; +unsigned int randval; +MAXKEYS key; /* Open for writing | Create | Truncate the file for writing */ if ((fd = open(secret_file, O_CREAT | O_WRONLY | O_TRUNC, S_IRUSR)) < 0) @@ -243,7 +244,28 @@ MAXKEYS key; return 1; } - srand(time(NULL)); + /* Open for writing | Create | Truncate the file for writing */ + if ((randfd = open("/dev/random", O_RDONLY)) < 0) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : failed opening /dev/random. Error %d, %s.", + errno, + strerror(errno)))); + return 1; + } + + if(read(randfd,(void*)&randval,sizeof(unsigned int)) < 1) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : failed to read /dev/random."))); + close(randfd); + return 1; + } + + close(randfd); + srand(randval); secrets_random_str(key.enckey, MAXSCALE_KEYLEN); secrets_random_str(key.initvector, MAXSCALE_IV_LEN); diff --git a/server/core/test/test_mysql_users.c b/server/core/test/test_mysql_users.c index 090d0d3c0..ccf52dbf3 100644 --- a/server/core/test/test_mysql_users.c +++ b/server/core/test/test_mysql_users.c @@ -231,9 +231,9 @@ int set_and_get_mysql_users_wildcards(char *username, char *hostname, char *pass service->users = mysql_users; if (db_from != NULL) - strcpy(data->db, db_from); + strncpy(data->db, db_from,MYSQL_DATABASE_MAXLEN); else - strcpy(data->db, ""); + strncpy(data->db, "",MYSQL_DATABASE_MAXLEN); /* freed by dcb_free(dcb) */ dcb->data = data; @@ -392,6 +392,22 @@ int main() { if (!ret) fprintf(stderr, "\t-- Expecting ok\n"); assert(ret == 0); + ret = set_and_get_mysql_users_wildcards("pippo", "192.%", "foo", "192.254.254.242", NULL, NULL, NULL); + if (!ret) fprintf(stderr, "\t-- Expecting ok\n"); + assert(ret == 0); + + ret = set_and_get_mysql_users_wildcards("pippo", "192.%.%", "foo", "192.254.254.242", NULL, NULL, NULL); + if (!ret) fprintf(stderr, "\t-- Expecting ok\n"); + assert(ret == 0); + + ret = set_and_get_mysql_users_wildcards("pippo", "192.254.%", "foo", "192.254.254.242", NULL, NULL, NULL); + if (!ret) fprintf(stderr, "\t-- Expecting ok\n"); + assert(ret == 0); + + ret = set_and_get_mysql_users_wildcards("pippo", "192.254.%", "foo", "192.254.0.242", NULL, NULL, NULL); + if (!ret) fprintf(stderr, "\t-- Expecting ok\n"); + assert(ret == 0); + ret = set_and_get_mysql_users_wildcards("riccio", "192.0.0.%", "foo", "192.134.0.2", NULL, NULL, NULL); if (ret) fprintf(stderr, "\t-- Expecting no match\n"); assert(ret == 1); diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 282bcdcdf..2225ff655 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -41,6 +41,8 @@ #define BINLOG_NAMEFMT "%s.%06d" #define BINLOG_NAME_ROOT "mysql-bin" +#define BINLOG_EVENT_HDR_LEN 19 + /* How often to call the binlog status function (seconds) */ #define BLR_STATS_FREQ 60 #define BLR_NSTATS_MINUTES 30 @@ -64,9 +66,9 @@ * BLR_MASTER_BACKOFF_TIME The increments of the back off time (seconds) * BLR_MAX_BACKOFF Maximum number of increments to backoff to */ - -#define BLR_MASTER_BACKOFF_TIME 5 +#define BLR_MASTER_BACKOFF_TIME 10 #define BLR_MAX_BACKOFF 60 + /** * Some useful macros for examining the MySQL Response packets */ @@ -128,6 +130,7 @@ typedef struct blfile { */ typedef struct { int n_events; /*< Number of events sent */ + unsigned long n_bytes; /*< Number of bytes sent */ int n_bursts; /*< Number of bursts sent */ int n_requests; /*< Number of requests received */ int n_flows; /*< Number of flow control restarts */ @@ -138,6 +141,7 @@ typedef struct { int n_above; int n_failed_read; int n_overrun; + int n_caughtup; int n_actions[3]; uint64_t lastsample; int minno; @@ -175,6 +179,7 @@ typedef struct router_slave { *router; /*< Pointer to the owning router */ struct router_slave *next; SLAVE_STATS stats; /*< Slave statistics */ + time_t connect_time; /*< Connect time of slave */ #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; #endif @@ -188,6 +193,7 @@ typedef struct { int n_slaves; /*< Number slave sessions created */ int n_reads; /*< Number of record reads */ uint64_t n_binlogs; /*< Number of binlog records from master */ + uint64_t n_binlogs_ses; /*< Number of binlog records from master */ uint64_t n_binlog_errors;/*< Number of binlog records from master */ uint64_t n_rotates; /*< Number of binlog rotate events */ uint64_t n_cachehits; /*< Number of hits on the binlog cache */ @@ -265,10 +271,12 @@ typedef struct router_instance { unsigned int short_burst; /*< Short burst for slave catchup */ unsigned int long_burst; /*< Long burst for slave catchup */ unsigned long burst_size; /*< Maximum size of burst to send */ + unsigned long heartbeat; /*< Configured heartbeat value */ ROUTER_STATS stats; /*< Statistics for this router */ int active_logs; int reconnect_pending; int retry_backoff; + time_t connect_time; int handling_threads; struct router_instance *next; @@ -278,25 +286,26 @@ typedef struct router_instance { * State machine for the master to MaxScale replication */ #define BLRM_UNCONNECTED 0x0000 -#define BLRM_AUTHENTICATED 0x0001 -#define BLRM_TIMESTAMP 0x0002 -#define BLRM_SERVERID 0x0003 -#define BLRM_HBPERIOD 0x0004 -#define BLRM_CHKSUM1 0x0005 -#define BLRM_CHKSUM2 0x0006 -#define BLRM_GTIDMODE 0x0007 -#define BLRM_MUUID 0x0008 -#define BLRM_SUUID 0x0009 -#define BLRM_LATIN1 0x000A -#define BLRM_UTF8 0x000B -#define BLRM_SELECT1 0x000C -#define BLRM_SELECTVER 0x000D -#define BLRM_REGISTER 0x000E -#define BLRM_BINLOGDUMP 0x000F +#define BLRM_CONNECTING 0x0001 +#define BLRM_AUTHENTICATED 0x0002 +#define BLRM_TIMESTAMP 0x0003 +#define BLRM_SERVERID 0x0004 +#define BLRM_HBPERIOD 0x0005 +#define BLRM_CHKSUM1 0x0006 +#define BLRM_CHKSUM2 0x0007 +#define BLRM_GTIDMODE 0x0008 +#define BLRM_MUUID 0x0009 +#define BLRM_SUUID 0x000A +#define BLRM_LATIN1 0x000B +#define BLRM_UTF8 0x000C +#define BLRM_SELECT1 0x000D +#define BLRM_SELECTVER 0x000E +#define BLRM_REGISTER 0x000F +#define BLRM_BINLOGDUMP 0x0010 -#define BLRM_MAXSTATE 0x000F +#define BLRM_MAXSTATE 0x0010 -static char *blrm_states[] = { "Unconnected", "Authenticated", "Timestamp retrieval", +static char *blrm_states[] = { "Unconnected", "Connecting", "Authenticated", "Timestamp retrieval", "Server ID retrieval", "HeartBeat Period setup", "binlog checksum config", "binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval", "Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1", @@ -371,6 +380,8 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered", #define ANONYMOUS_GTID_EVENT 0x22 #define PREVIOUS_GTIDS_EVENT 0x23 +#define MAX_EVENT_TYPE 0x23 + /** * Binlog event flags */ diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index 07c18b831..5e00a2c7a 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -761,7 +761,7 @@ int log_no_master = 1; /* log master detection failure od first master becomes available after failure */ if (root_master && mon_status_changed(root_master) && !(root_master->server->status & SERVER_STALE_STATUS)) { if (root_master->pending_status & (SERVER_MASTER)) { - if (!(root_master->mon_prev_status & SERVER_STALE_STATUS)) { + if (!(root_master->mon_prev_status & SERVER_STALE_STATUS) && !(root_master->server->status & SERVER_MAINT)) { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Info: A Master Server is now available: %s:%i", diff --git a/server/modules/protocol/httpd.c b/server/modules/protocol/httpd.c index 453dd6c79..2f4a1918d 100644 --- a/server/modules/protocol/httpd.c +++ b/server/modules/protocol/httpd.c @@ -177,7 +177,7 @@ HTTPD_session *client_data = NULL; j++; } - while (!ISspace(buf[j]) && (i < sizeof(url) - 1) && (j < sizeof(buf) - 1)) { + while ((j < sizeof(buf) - 1) && !ISspace(buf[j]) && (i < sizeof(url) - 1)) { url[i] = buf[j]; i++; j++; } diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 1ccf18052..cd22f7c50 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -962,11 +962,20 @@ static int gw_create_backend_connection( } /** Copy client flags to backend protocol */ - protocol->client_capabilities = - ((MySQLProtocol *)(backend_dcb->session->client->protocol))->client_capabilities; - /** Copy client charset to backend protocol */ - protocol->charset = - ((MySQLProtocol *)(backend_dcb->session->client->protocol))->charset; + if (backend_dcb->session->client->protocol) + { + /** Copy client flags to backend protocol */ + protocol->client_capabilities = + ((MySQLProtocol *)(backend_dcb->session->client->protocol))->client_capabilities; + /** Copy client charset to backend protocol */ + protocol->charset = + ((MySQLProtocol *)(backend_dcb->session->client->protocol))->charset; + } + else + { + protocol->client_capabilities = GW_MYSQL_CAPABILITIES_CLIENT; + protocol->charset = 0x08; + } /*< if succeed, fd > 0, -1 otherwise */ rv = gw_do_connect_to_backend(server->name, server->port, &fd); @@ -1329,7 +1338,7 @@ static int gw_change_user( /* now get the user, after 4 bytes header and 1 byte command */ client_auth_packet += 5; - strcpy(username, (char *)client_auth_packet); + strncpy(username, (char *)client_auth_packet,MYSQL_USER_MAXLEN); client_auth_packet += strlen(username) + 1; /* get the auth token len */ @@ -1349,7 +1358,7 @@ static int gw_change_user( } /* get new database name */ - strcpy(database, (char *)client_auth_packet); + strncpy(database, (char *)client_auth_packet,MYSQL_DATABASE_MAXLEN); /* get character set */ if (strlen(database)) { @@ -1362,7 +1371,7 @@ static int gw_change_user( memcpy(&backend_protocol->charset, client_auth_packet, sizeof(int)); /* save current_database name */ - strcpy(current_database, current_session->db); + strncpy(current_database, current_session->db,MYSQL_DATABASE_MAXLEN); /* * Now clear database name in dcb as we don't do local authentication on db name for change user. diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 64697bbfa..b24260bef 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -969,7 +969,7 @@ GWBUF* mysql_create_custom_error( const char* msg) { uint8_t* outbuf = NULL; - uint8_t mysql_payload_size = 0; + uint32_t mysql_payload_size = 0; uint8_t mysql_packet_header[4]; uint8_t* mysql_payload = NULL; uint8_t field_count = 0; @@ -1579,7 +1579,7 @@ mysql_send_auth_error ( const char *mysql_message) { uint8_t *outbuf = NULL; - uint8_t mysql_payload_size = 0; + uint32_t mysql_payload_size = 0; uint8_t mysql_packet_header[4]; uint8_t *mysql_payload = NULL; uint8_t field_count = 0; diff --git a/server/modules/routing/CMakeLists.txt b/server/modules/routing/CMakeLists.txt index 8208c470b..59695b715 100644 --- a/server/modules/routing/CMakeLists.txt +++ b/server/modules/routing/CMakeLists.txt @@ -19,4 +19,7 @@ target_link_libraries(cli log_manager utils) install(TARGETS cli DESTINATION modules) add_subdirectory(readwritesplit) +if(BUILD_BINLOG) + add_subdirectory(binlog) +endif() diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index bded77735..1287eacf4 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -59,6 +59,8 @@ #include extern int lm_enabled_logfiles_bitmask; +extern size_t log_ses_count[]; +extern __thread log_info_t tls_log_info; static char *version_str = "V1.0.6"; @@ -186,6 +188,8 @@ int i; inst->long_burst = DEF_LONG_BURST; inst->burst_size = DEF_BURST_SIZE; inst->retry_backoff = 1; + inst->binlogdir = NULL; + inst->heartbeat = 300; // Default is every 5 minutes /* * We only support one server behind this router, since the server is @@ -306,6 +310,14 @@ int i; inst->burst_size = size; } + else if (strcmp(options[i], "heartbeat") == 0) + { + inst->heartbeat = atoi(value); + } + else if (strcmp(options[i], "binlogdir") == 0) + { + inst->binlogdir = strdup(value); + } else { LOGIF(LE, (skygw_log_write( @@ -416,6 +428,7 @@ ROUTER_SLAVE *slave; slave->router = inst; slave->file = NULL; strcpy(slave->binlogfile, "unassigned"); + slave->connect_time = time(0); /** * Add this session to the list of active sessions. @@ -509,9 +522,13 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session; { /* * We must be closing the master session. - * - * TODO: Handle closure of master session */ + LOGIF(LM, (skygw_log_write_flush( + LOGFILE_MESSAGE, + "%s: Master %s disconnected after %ld seconds. " + "%d events read,", + router->service->name, router->master->remote, + time(0) - router->connect_time, router->stats.n_binlogs_ses))); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Binlog router close session with master server %s", @@ -529,6 +546,15 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session; /* decrease server registered slaves counter */ atomic_add(&router->stats.n_registered, -1); + LOGIF(LM, (skygw_log_write_flush( + LOGFILE_MESSAGE, + "%s: Slave %s, server id %d, disconnected after %ld seconds. " + "%d events sent, %lu bytes.", + router->service->name, slave->dcb->remote, + slave->serverid, + time(0) - slave->connect_time, slave->stats.n_events, + slave->stats.n_bytes))); + /* * Mark the slave as unregistered to prevent the forwarding * of any more binlog records to this slave. @@ -641,25 +667,29 @@ struct tm tm; min5 /= 5.0; - dcb_printf(dcb, "\tMaster connection DCB: %p\n", + dcb_printf(dcb, "\tMaster connection DCB: %p\n", router_inst->master); - dcb_printf(dcb, "\tMaster connection state: %s\n", + dcb_printf(dcb, "\tMaster connection state: %s\n", blrm_states[router_inst->master_state]); localtime_r(&router_inst->stats.lastReply, &tm); asctime_r(&tm, buf); - dcb_printf(dcb, "\tNumber of master connects: %d\n", + dcb_printf(dcb, "\tBinlog directory: %s\n", + router_inst->binlogdir); + dcb_printf(dcb, "\tNumber of master connects: %d\n", router_inst->stats.n_masterstarts); - dcb_printf(dcb, "\tNumber of delayed reconnects: %d\n", + dcb_printf(dcb, "\tNumber of delayed reconnects: %d\n", router_inst->stats.n_delayedreconnects); - dcb_printf(dcb, "\tCurrent binlog file: %s\n", + dcb_printf(dcb, "\tCurrent binlog file: %s\n", router_inst->binlog_name); - dcb_printf(dcb, "\tCurrent binlog position: %u\n", + dcb_printf(dcb, "\tCurrent binlog position: %u\n", router_inst->binlog_position); - dcb_printf(dcb, "\tNumber of slave servers: %u\n", + dcb_printf(dcb, "\tNumber of slave servers: %u\n", router_inst->stats.n_slaves); - dcb_printf(dcb, "\tNumber of binlog events received: %u\n", + dcb_printf(dcb, "\tNo. of binlog events received this session: %u\n", + router_inst->stats.n_binlogs_ses); + dcb_printf(dcb, "\tTotal no. of binlog events received: %u\n", router_inst->stats.n_binlogs); minno = router_inst->stats.minno - 1; if (minno == -1) @@ -668,28 +698,31 @@ struct tm tm; dcb_printf(dcb, "\tCurrent 5 10 15 30 Min Avg\n"); dcb_printf(dcb, "\t %6d %8.1f %8.1f %8.1f %8.1f\n", router_inst->stats.minavgs[minno], min5, min10, min15, min30); - dcb_printf(dcb, "\tNumber of fake binlog events: %u\n", + dcb_printf(dcb, "\tNumber of fake binlog events: %u\n", router_inst->stats.n_fakeevents); - dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n", + dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n", router_inst->stats.n_artificial); - dcb_printf(dcb, "\tNumber of binlog events in error: %u\n", + dcb_printf(dcb, "\tNumber of binlog events in error: %u\n", router_inst->stats.n_binlog_errors); - dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n", + dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n", router_inst->stats.n_rotates); - dcb_printf(dcb, "\tNumber of heartbeat events: %u\n", + dcb_printf(dcb, "\tNumber of heartbeat events: %u\n", router_inst->stats.n_heartbeats); - dcb_printf(dcb, "\tNumber of packets received: %u\n", + dcb_printf(dcb, "\tNumber of packets received: %u\n", router_inst->stats.n_reads); - dcb_printf(dcb, "\tNumber of residual data packets: %u\n", + dcb_printf(dcb, "\tNumber of residual data packets: %u\n", router_inst->stats.n_residuals); - dcb_printf(dcb, "\tAverage events per packet %.1f\n", + dcb_printf(dcb, "\tAverage events per packet %.1f\n", (double)router_inst->stats.n_binlogs / router_inst->stats.n_reads); - dcb_printf(dcb, "\tLast event from master at: %s", + dcb_printf(dcb, "\tLast event from master at: %s", buf); dcb_printf(dcb, "\t (%d seconds ago)\n", time(0) - router_inst->stats.lastReply); - dcb_printf(dcb, "\tLast event from master: 0x%x\n", - router_inst->lastEventReceived); + dcb_printf(dcb, "\tLast event from master: 0x%x (%s)\n", + router_inst->lastEventReceived, + (router_inst->lastEventReceived >= 0 && + router_inst->lastEventReceived < 0x24) ? + event_names[router_inst->lastEventReceived] : "unknown"); if (router_inst->active_logs) dcb_printf(dcb, "\tRouter processing binlog records\n"); if (router_inst->reconnect_pending) @@ -697,7 +730,7 @@ struct tm tm; dcb_printf(dcb, "\tEvents received:\n"); for (i = 0; i < 0x24; i++) { - dcb_printf(dcb, "\t\t%-38s: %u\n", event_names[i], router_inst->stats.events[i]); + dcb_printf(dcb, "\t\t%-38s %u\n", event_names[i], router_inst->stats.events[i]); } #if SPINLOCK_PROFILE @@ -739,19 +772,44 @@ struct tm tm; min15 /= 15.0; min10 /= 10.0; min5 /= 5.0; - dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid); + dcb_printf(dcb, + "\t\tServer-id: %d\n", + session->serverid); if (session->hostname) - dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname); - dcb_printf(dcb, "\t\tSlave DCB: %p\n", session->dcb); - dcb_printf(dcb, "\t\tNext Sequence No: %d\n", session->seqno); - dcb_printf(dcb, "\t\tState: %s\n", blrs_states[session->state]); - dcb_printf(dcb, "\t\tBinlog file: %s\n", session->binlogfile); - dcb_printf(dcb, "\t\tBinlog position: %u\n", session->binlog_pos); + dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname); + dcb_printf(dcb, + "\t\tSlave: %d\n", + session->dcb->remote); + dcb_printf(dcb, + "\t\tSlave DCB: %p\n", + session->dcb); + dcb_printf(dcb, + "\t\tNext Sequence No: %d\n", + session->seqno); + dcb_printf(dcb, + "\t\tState: %s\n", + blrs_states[session->state]); + dcb_printf(dcb, + "\t\tBinlog file: %s\n", + session->binlogfile); + dcb_printf(dcb, + "\t\tBinlog position: %u\n", + session->binlog_pos); if (session->nocrc) - dcb_printf(dcb, "\t\tMaster Binlog CRC: None\n"); - dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests); - dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events); - dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts); + dcb_printf(dcb, + "\t\tMaster Binlog CRC: None\n"); + dcb_printf(dcb, + "\t\tNo. requests: %u\n", + session->stats.n_requests); + dcb_printf(dcb, + "\t\tNo. events sent: %u\n", + session->stats.n_events); + dcb_printf(dcb, + "\t\tNo. bursts sent: %u\n", + session->stats.n_bursts); + dcb_printf(dcb, + "\t\tNo. transitions to follow mode: %u\n", + session->stats.n_bursts); minno = session->stats.minno - 1; if (minno == -1) minno = 30; @@ -760,15 +818,18 @@ struct tm tm; dcb_printf(dcb, "\t\t %6d %8.1f %8.1f %8.1f %8.1f\n", session->stats.minavgs[minno], min5, min10, min15, min30); - dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows); - dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd); - dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb); - dcb_printf(dcb, "\t\tNo. of low water cbs N/A %u\n", session->stats.n_cbna); - dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read); - dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun); - dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]); - dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]); - dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]); + dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows); + dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd); + dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb); + dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read); + +#if DETAILED_DIAG + dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun); + dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]); + dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]); + dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]); +#endif + if ((session->cstate & CS_UPTODATE) == 0) { dcb_printf(dcb, "\t\tSlave is in catchup mode. %s%s\n", @@ -793,7 +854,7 @@ struct tm tm; dcb_printf(dcb, "\tSpinlock statistics (rses_lock):\n"); spinlock_stats(&session->rses_lock, spin_reporter, dcb); #endif - dcb_printf(dcb, "\n"); + dcb_printf(dcb, "\t\t--------------------\n\n"); session = session->next; } spinlock_release(&router_inst->lock); @@ -822,6 +883,24 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; router->stats.lastReply = time(0); } +static char * +extract_message(GWBUF *errpkt) +{ +char *rval; +int len; + + len = EXTRACT24(errpkt->start); + if ((rval = (char *)malloc(len)) == NULL) + return NULL; + memcpy(rval, (char *)(errpkt->start) + 7, 6); + rval[6] = ' '; + memcpy(&rval[7], (char *)(errpkt->start) + 13, len - 8); + rval[len-2] = 0; + return rval; +} + + + /** * Error Reply routine * @@ -841,10 +920,10 @@ errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_ { ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; int error, len; -char msg[85]; +char msg[85], *errmsg; len = sizeof(error); - if (getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0) + if (router->master && getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0 && error != 0) { strerror_r(error, msg, 80); strcat(msg, " "); @@ -852,10 +931,21 @@ char msg[85]; else strcpy(msg, ""); + errmsg = extract_message(message); LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, "Master connection '%s', %sattempting reconnect to master", - message, msg))); + LOGFILE_ERROR, "%s: Master connection error '%s' in state '%s', " + "%sattempting reconnect to master", + router->service->name, errmsg, + blrm_states[router->master_state], msg))); + if (errmsg) + free(errmsg); *succp = true; + LOGIF(LM, (skygw_log_write_flush( + LOGFILE_MESSAGE, + "%s: Master %s disconnected after %ld seconds. " + "%d events read.", + router->service->name, router->master->remote, + time(0) - router->connect_time, router->stats.n_binlogs_ses))); blr_master_reconnect(router); } diff --git a/server/modules/routing/binlog/blr_cache.c b/server/modules/routing/binlog/blr_cache.c index a7213aa8a..590fe55e7 100644 --- a/server/modules/routing/binlog/blr_cache.c +++ b/server/modules/routing/binlog/blr_cache.c @@ -54,6 +54,8 @@ extern int lm_enabled_logfiles_bitmask; +extern size_t log_ses_count[]; +extern __thread log_info_t tls_log_info; /** diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index ada392742..24be12782 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -51,6 +51,8 @@ #include extern int lm_enabled_logfiles_bitmask; +extern size_t log_ses_count[]; +extern __thread log_info_t tls_log_info; static void blr_file_create(ROUTER_INSTANCE *router, char *file); @@ -75,18 +77,27 @@ int root_len, i; DIR *dirp; struct dirent *dp; - strcpy(path, "/usr/local/skysql/MaxScale"); - if ((ptr = getenv("MAXSCALE_HOME")) != NULL) + if (router->binlogdir == NULL) { - strcpy(path, ptr); + strcpy(path, "/usr/local/skysql/MaxScale"); + if ((ptr = getenv("MAXSCALE_HOME")) != NULL) + { + strcpy(path, ptr); + } + strcat(path, "/"); + strcat(path, router->service->name); + + if (access(path, R_OK) == -1) + mkdir(path, 0777); + + router->binlogdir = strdup(path); + } + if (access(router->binlogdir, R_OK) == -1) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "%s: Unable to read the binlog directory %s.", + router->service->name, router->binlogdir))); } - strcat(path, "/"); - strcat(path, router->service->name); - - if (access(path, R_OK) == -1) - mkdir(path, 0777); - - router->binlogdir = strdup(path); /* First try to find a binlog file number by reading the directory */ root_len = strlen(router->fileroot); @@ -353,7 +364,7 @@ struct stat statb; "Short read when reading the header. " "Expected 19 bytes but got %d bytes. " "Binlog file is %s, position %d", - file->binlogname, pos, n))); + n, file->binlogname, pos))); break; } return NULL; @@ -364,6 +375,17 @@ struct stat statb; hdr->event_size = extract_field(&hdbuf[9], 32); hdr->next_pos = EXTRACT32(&hdbuf[13]); hdr->flags = EXTRACT16(&hdbuf[17]); + + if (hdr->event_type > MAX_EVENT_TYPE) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Invalid event type 0x%x. " + "Binlog file is %s, position %d", + hdr->event_type, + file->binlogname, pos))); + return NULL; + } + if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT) { LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index bb43c1a27..db95cf6c5 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -63,6 +63,8 @@ extern int lm_enabled_logfiles_bitmask; +extern size_t log_ses_count[]; +extern __thread log_info_t tls_log_info; static GWBUF *blr_make_query(char *statement); static GWBUF *blr_make_registration(ROUTER_INSTANCE *router); @@ -91,6 +93,18 @@ blr_start_master(ROUTER_INSTANCE *router) DCB *client; GWBUF *buf; + router->stats.n_binlogs_ses = 0; + spinlock_acquire(&router->lock); + if (router->master_state != BLRM_UNCONNECTED) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "%s: Master Connect: Unexpected master state %s\n", + router->service->name, blrm_states[router->master_state]))); + spinlock_release(&router->lock); + return; + } + router->master_state = BLRM_CONNECTING; + spinlock_release(&router->lock); if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL) { LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, @@ -98,6 +112,7 @@ GWBUF *buf; return; } router->client = client; + client->state = DCB_STATE_POLLING; /* Fake the client is reading */ client->data = CreateMySQLAuthData(router->user, router->password, ""); if ((router->session = session_alloc(router->service, client)) == NULL) { @@ -108,17 +123,27 @@ GWBUF *buf; client->session = router->session; if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL) { - char *name = malloc(strlen(router->service->name) + strlen(" Master") + 1); - sprintf(name, "%s Master", router->service->name); - hktask_oneshot(name, blr_start_master, router, - BLR_MASTER_BACKOFF_TIME * router->retry_backoff++); + char *name; + if ((name = malloc(strlen(router->service->name) + + strlen(" Master") + 1)) != NULL) + { + sprintf(name, "%s Master", router->service->name); + hktask_oneshot(name, blr_start_master, router, + BLR_MASTER_BACKOFF_TIME * router->retry_backoff++); + } if (router->retry_backoff > BLR_MAX_BACKOFF) - router->retry_backoff = 1; + router->retry_backoff = BLR_MAX_BACKOFF; LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, "Binlog router: failed to connect to master server '%s'", router->service->databases->unique_name))); return; } + router->master->remote = strdup(router->service->databases->name); + LOGIF(LM,(skygw_log_write( + LOGFILE_MESSAGE, + "%s: atempting to connect to master server %s.", + router->service->name, router->master->remote))); + router->connect_time = time(0); if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive ))) perror("setsockopt"); @@ -129,7 +154,6 @@ perror("setsockopt"); router->master_state = BLRM_TIMESTAMP; router->stats.n_masterstarts++; - router->retry_backoff = 1; } /** @@ -160,7 +184,27 @@ GWBUF *ptr; router->reconnect_pending = 0; router->active_logs = 0; spinlock_release(&router->lock); - blr_start_master(router); + if (router->master_state < BLRM_BINLOGDUMP) + { + char *name; + + router->master_state = BLRM_UNCONNECTED; + + if ((name = malloc(strlen(router->service->name) + + strlen(" Master")+1)) != NULL); + { + sprintf(name, "%s Master", router->service->name); + hktask_oneshot(name, blr_start_master, router, + BLR_MASTER_BACKOFF_TIME * router->retry_backoff++); + } + if (router->retry_backoff > BLR_MAX_BACKOFF) + router->retry_backoff = BLR_MAX_BACKOFF; + } + else + { + router->master_state = BLRM_UNCONNECTED; + blr_start_master(router); + } } /** @@ -225,8 +269,9 @@ char query[128]; if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE) { LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.", - router->master_state))); + LOGFILE_ERROR, + "Invalid master state machine state (%d) for binlog router.", + router->master_state))); gwbuf_consume(buf, gwbuf_length(buf)); spinlock_acquire(&router->lock); if (router->reconnect_pending) @@ -234,6 +279,12 @@ char query[128]; router->active_logs = 0; spinlock_release(&router->lock); atomic_add(&router->handling_threads, -1); + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "%s: Pending reconnect in state %s.", + router->service->name, + blrm_states[router->master_state] + ))); blr_restart_master(router); return; } @@ -247,8 +298,11 @@ char query[128]; { LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, - "Received error: %d, %s from master during %s phase of the master state machine.", - MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state] + "%s: Received error: %d, %s from master during %s phase " + "of the master state machine.", + router->service->name, + MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), + blrm_states[router->master_state] ))); gwbuf_consume(buf, gwbuf_length(buf)); spinlock_acquire(&router->lock); @@ -272,12 +326,17 @@ char query[128]; buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'"); router->master_state = BLRM_SERVERID; router->master->func.write(router->master, buf); + router->retry_backoff = 1; break; case BLRM_SERVERID: // Response to fetch of master's server-id router->saved_master.server_id = buf; // TODO: Extract the value of server-id and place in router->master_id - buf = blr_make_query("SET @master_heartbeat_period = 1799999979520"); + { + char str[80]; + sprintf(str, "SET @master_heartbeat_period = %lu000000000", router->heartbeat); + buf = blr_make_query(str); + } router->master_state = BLRM_HBPERIOD; router->master->func.write(router->master, buf); break; @@ -357,6 +416,12 @@ char query[128]; buf = blr_make_binlog_dump(router); router->master_state = BLRM_BINLOGDUMP; router->master->func.write(router->master, buf); + LOGIF(LM,(skygw_log_write( + LOGFILE_MESSAGE, + "%s: Request binlog records from %s at " + "position %d from master server %s.", + router->service->name, router->binlog_name, + router->binlog_position, router->master->remote))); break; case BLRM_BINLOGDUMP: // Main body, we have received a binlog record from the master @@ -618,133 +683,172 @@ static REP_HEADER phdr; n_bufs = 1; } - blr_extract_header(ptr, &hdr); - - if (hdr.event_size != len - 5) + if (len < BINLOG_EVENT_HDR_LEN) { - LOGIF(LE,(skygw_log_write( - LOGFILE_ERROR, - "Packet length is %d, but event size is %d, " - "binlog file %s position %d" - "reslen is %d and preslen is %d, " - "length of previous event %d. %s", - len, hdr.event_size, - router->binlog_name, - router->binlog_position, - reslen, preslen, prev_length, - (prev_length == -1 ? - (no_residual ? "No residual data from previous call" : "Residual data from previous call") : "") - ))); - blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len); - LOGIF(LE,(skygw_log_write( - LOGFILE_ERROR, - "This event (0x%x) was contained in %d GWBUFs, " - "the previous events was contained in %d GWBUFs", - router->lastEventReceived, n_bufs, pn_bufs))); - if (msg) + char *msg = ""; + + if (ptr[4] == 0xfe) /* EOF Packet */ { - free(msg); - msg = NULL; + msg = "end of file"; } - break; + else if (ptr[4] == 0xff) /* EOF Packet */ + { + msg = "error"; + } + LOGIF(LM,(skygw_log_write( + LOGFILE_MESSAGE, + "Non-event message (%s) from master.", + msg))); } - phdr = hdr; - if (hdr.ok == 0) + else { router->stats.n_binlogs++; + router->stats.n_binlogs_ses++; router->lastEventReceived = hdr.event_type; + blr_extract_header(ptr, &hdr); + + if (hdr.event_size != len - 5) + { + LOGIF(LE,(skygw_log_write( + LOGFILE_ERROR, + "Packet length is %d, but event size is %d, " + "binlog file %s position %d " + "reslen is %d and preslen is %d, " + "length of previous event %d. %s", + len, hdr.event_size, + router->binlog_name, + router->binlog_position, + reslen, preslen, prev_length, + (prev_length == -1 ? + (no_residual ? "No residual data from previous call" : "Residual data from previous call") : "") + ))); + blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len); + LOGIF(LE,(skygw_log_write( + LOGFILE_ERROR, + "This event (0x%x) was contained in %d GWBUFs, " + "the previous events was contained in %d GWBUFs", + router->lastEventReceived, n_bufs, pn_bufs))); + if (msg) + { + free(msg); + msg = NULL; + } + break; + } + phdr = hdr; + if (hdr.ok == 0) + { + router->stats.n_binlogs++; + router->lastEventReceived = hdr.event_type; + // #define SHOW_EVENTS #ifdef SHOW_EVENTS - printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size); + printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size); #endif - if (hdr.event_type >= 0 && hdr.event_type < 0x24) - router->stats.events[hdr.event_type]++; - if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0) - { - // Fake format description message - LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG, - "Replication fake event. " - "Binlog %s @ %d.", - router->binlog_name, - router->binlog_position))); - router->stats.n_fakeevents++; - if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) + if (hdr.event_type >= 0 && hdr.event_type < 0x24) + router->stats.events[hdr.event_type]++; + if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0) { - /* - * We need to save this to replay to new - * slaves that attach later. - */ - if (router->saved_master.fde_event) - free(router->saved_master.fde_event); - router->saved_master.fde_len = hdr.event_size; - router->saved_master.fde_event = malloc(hdr.event_size); - if (router->saved_master.fde_event) - memcpy(router->saved_master.fde_event, - ptr + 5, hdr.event_size); + // Fake format description message + LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG, + "Replication fake event. " + "Binlog %s @ %d.", + router->binlog_name, + router->binlog_position))); + router->stats.n_fakeevents++; + if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) + { + uint8_t *new_fde; + unsigned int new_fde_len; + /* + * We need to save this to replay to new + * slaves that attach later. + */ + new_fde_len = hdr.event_size; + new_fde = malloc(hdr.event_size); + if (new_fde) + { + memcpy(new_fde, ptr + 5, hdr.event_size); + if (router->saved_master.fde_event) + free(router->saved_master.fde_event); + router->saved_master.fde_event = new_fde; + router->saved_master.fde_len = new_fde_len; + } + else + { + LOGIF(LE,(skygw_log_write(LOGFILE_ERROR, + "%s: Received a format description " + "event that MaxScale was unable to " + "record. Event length is %d.", + router->service->name, + hdr.event_size))); + blr_log_packet(LOGFILE_ERROR, + "Format Description Event:", ptr, len); + } + } + } + else + { + if (hdr.event_type == HEARTBEAT_EVENT) + { +#ifdef SHOW_EVENTS + printf("Replication heartbeat\n"); +#endif + LOGIF(LD,(skygw_log_write( + LOGFILE_DEBUG, + "Replication heartbeat. " + "Binlog %s @ %d.", + router->binlog_name, + router->binlog_position))); + router->stats.n_heartbeats++; + } + else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) + { + ptr = ptr + 5; // We don't put the first byte of the payload + // into the binlog file + if (hdr.event_type == ROTATE_EVENT) + router->rotating = 1; + blr_write_binlog_record(router, &hdr, ptr); + if (hdr.event_type == ROTATE_EVENT) + { + blr_rotate_event(router, ptr, &hdr); + } + blr_distribute_binlog_record(router, &hdr, ptr); + } + else + { + router->stats.n_artificial++; + LOGIF(LD,(skygw_log_write( + LOGFILE_DEBUG, + "Artificial event not written " + "to disk or distributed. " + "Type 0x%x, Length %d, Binlog " + "%s @ %d.", + hdr.event_type, + hdr.event_size, + router->binlog_name, + router->binlog_position))); + ptr += 5; + if (hdr.event_type == ROTATE_EVENT) + { + router->rotating = 1; + blr_rotate_event(router, ptr, &hdr); + } + } } } else { - if (hdr.event_type == HEARTBEAT_EVENT) - { -#ifdef SHOW_EVENTS - printf("Replication heartbeat\n"); -#endif - LOGIF(LD,(skygw_log_write( - LOGFILE_DEBUG, - "Replication heartbeat. " - "Binlog %s @ %d.", - router->binlog_name, - router->binlog_position))); - router->stats.n_heartbeats++; - } - else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) - { - ptr = ptr + 5; // We don't put the first byte of the payload - // into the binlog file - if (hdr.event_type == ROTATE_EVENT) - router->rotating = 1; - blr_write_binlog_record(router, &hdr, ptr); - if (hdr.event_type == ROTATE_EVENT) - { - blr_rotate_event(router, ptr, &hdr); - } - blr_distribute_binlog_record(router, &hdr, ptr); - } - else - { - router->stats.n_artificial++; - LOGIF(LD,(skygw_log_write( - LOGFILE_DEBUG, - "Artificial event not written " - "to disk or distributed. " - "Type 0x%x, Length %d, Binlog " - "%s @ %d.", - hdr.event_type, - hdr.event_size, - router->binlog_name, - router->binlog_position))); - ptr += 5; - if (hdr.event_type == ROTATE_EVENT) - { - router->rotating = 1; - blr_rotate_event(router, ptr, &hdr); - } - } + LOGIF(LE,(skygw_log_write(LOGFILE_ERROR, + "Error packet in binlog stream.%s @ %d.", + router->binlog_name, + router->binlog_position))); + blr_log_packet(LOGFILE_ERROR, "Error Packet:", + ptr, len); + router->stats.n_binlog_errors++; } } - else - { - printf("Binlog router error: %s\n", &ptr[7]); - LOGIF(LE,(skygw_log_write(LOGFILE_ERROR, - "Error packet in binlog stream.%s @ %d.", - router->binlog_name, - router->binlog_position))); - blr_log_packet(LOGFILE_ERROR, "Error Packet:", - ptr, len); - router->stats.n_binlog_errors++; - } if (msg) { @@ -968,6 +1072,7 @@ int action; { blr_slave_rotate(slave, ptr); } + slave->stats.n_bytes += gwbuf_length(pkt); slave->dcb->func.write(slave->dcb, pkt); if (hdr->event_type != ROTATE_EVENT) { diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 7a6489e02..685a6e5ae 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -65,8 +65,11 @@ int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large); uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr); int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data); static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +static void blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); extern int lm_enabled_logfiles_bitmask; +extern size_t log_ses_count[]; +extern __thread log_info_t tls_log_info; /** * Process a request packet from the slave server. @@ -544,29 +547,8 @@ uint32_t chksum; rval = slave->dcb->func.write(slave->dcb, resp); /* Send the FORMAT_DESCRIPTION_EVENT */ - if (router->saved_master.fde_event) - { - resp = gwbuf_alloc(router->saved_master.fde_len + 5); - ptr = GWBUF_DATA(resp); - encode_value(ptr, router->saved_master.fde_len + 1, 24); // Payload length - ptr += 3; - *ptr++ = slave->seqno++; - *ptr++ = 0; // OK - memcpy(ptr, router->saved_master.fde_event, router->saved_master.fde_len); - encode_value(ptr, time(0), 32); // Overwrite timestamp - /* - * Since we have changed the timestamp we must recalculate the CRC - * - * Position ptr to the start of the event header, - * calculate a new checksum - * and write it into the header - */ - ptr = GWBUF_DATA(resp) + 5 + router->saved_master.fde_len - 4; - chksum = crc32(0L, NULL, 0); - chksum = crc32(chksum, GWBUF_DATA(resp) + 5, router->saved_master.fde_len - 4); - encode_value(ptr, chksum, 32); - rval = slave->dcb->func.write(slave->dcb, resp); - } + if (slave->binlog_pos != 4) + blr_slave_send_fde(router, slave); slave->dcb->low_water = router->low_water; slave->dcb->high_water = router->high_water; @@ -575,8 +557,9 @@ uint32_t chksum; LOGIF(LM, (skygw_log_write( LOGFILE_MESSAGE, - "%s: New slave %s requested binlog file %s from position %lu", + "%s: New slave %s, server id %d, requested binlog file %s from position %lu", router->service->name, slave->dcb->remote, + slave->serverid, slave->binlogfile, slave->binlog_pos))); if (slave->binlog_pos != router->binlog_position || @@ -782,6 +765,7 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, "blr_open_binlog took %d beats", hkheartbeat - beat1))); } + slave->stats.n_bytes += gwbuf_length(head); written = slave->dcb->func.write(slave->dcb, head); if (written && hdr.event_type != ROTATE_EVENT) { @@ -839,11 +823,23 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write( if (state_change) { - LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, - "%s: Slave %s is up to date %s, %u.", + slave->stats.n_caughtup++; + if (slave->stats.n_caughtup == 1) + { + LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, + "%s: Slave %s is up to date %s, %u.", router->service->name, slave->dcb->remote, slave->binlogfile, slave->binlog_pos))); + } + else if ((slave->stats.n_caughtup % 50) == 0) + { + LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, + "%s: Slave %s is up to date %s, %u.", + router->service->name, + slave->dcb->remote, + slave->binlogfile, slave->binlog_pos))); + } } } else @@ -1031,3 +1027,51 @@ uint32_t chksum; slave->dcb->func.write(slave->dcb, resp); return 1; } + +/** + * Send a "fake" format description event to the newly connected slave + * + * @param router The router instance + * @param slave The slave to send the event to + */ +static void +blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) +{ +BLFILE *file; +REP_HEADER hdr; +GWBUF *record, *head; +uint8_t *ptr; +uint32_t chksum; + + if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL) + return; + if ((record = blr_read_binlog(router, file, 4, &hdr)) == NULL) + { + blr_close_binlog(router, file); + return; + } + blr_close_binlog(router, file); + head = gwbuf_alloc(5); + ptr = GWBUF_DATA(head); + encode_value(ptr, hdr.event_size + 1, 24); // Payload length + ptr += 3; + *ptr++ = slave->seqno++; + *ptr++ = 0; // OK + head = gwbuf_append(head, record); + ptr = GWBUF_DATA(record); + encode_value(ptr, time(0), 32); // Overwrite timestamp + ptr += 13; + encode_value(ptr, 0, 32); // Set next position to 0 + /* + * Since we have changed the timestamp we must recalculate the CRC + * + * Position ptr to the start of the event header, + * calculate a new checksum + * and write it into the header + */ + ptr = GWBUF_DATA(record) + hdr.event_size - 4; + chksum = crc32(0L, NULL, 0); + chksum = crc32(chksum, GWBUF_DATA(record), hdr.event_size - 4); + encode_value(ptr, chksum, 32); + slave->dcb->func.write(slave->dcb, head); +} diff --git a/server/modules/routing/readwritesplit/test/rwsplit.sh b/server/modules/routing/readwritesplit/test/rwsplit.sh index 7c9ec5edb..fa50ae6b7 100755 --- a/server/modules/routing/readwritesplit/test/rwsplit.sh +++ b/server/modules/routing/readwritesplit/test/rwsplit.sh @@ -1,4 +1,4 @@ -#!/bin/sh +#!/bin/bash NARGS=7 TLOG=$1 THOST=$2