diff --git a/client/depend.mk b/client/depend.mk index e69de29bb..a5b4abeb3 100644 --- a/client/depend.mk +++ b/client/depend.mk @@ -0,0 +1,63 @@ +maxadmin.o: maxadmin.c /usr/include/stdc-predef.h /usr/include/stdio.h \ + /usr/include/features.h /usr/include/x86_64-linux-gnu/sys/cdefs.h \ + /usr/include/x86_64-linux-gnu/bits/wordsize.h \ + /usr/include/x86_64-linux-gnu/gnu/stubs.h \ + /usr/include/x86_64-linux-gnu/gnu/stubs-64.h \ + /usr/lib/gcc/x86_64-linux-gnu/4.8/include/stddef.h \ + /usr/include/x86_64-linux-gnu/bits/types.h \ + /usr/include/x86_64-linux-gnu/bits/typesizes.h /usr/include/libio.h \ + /usr/include/_G_config.h /usr/include/wchar.h \ + /usr/lib/gcc/x86_64-linux-gnu/4.8/include/stdarg.h \ + /usr/include/x86_64-linux-gnu/bits/stdio_lim.h \ + /usr/include/x86_64-linux-gnu/bits/sys_errlist.h /usr/include/string.h \ + /usr/include/xlocale.h /usr/include/signal.h \ + /usr/include/x86_64-linux-gnu/bits/sigset.h \ + /usr/include/x86_64-linux-gnu/bits/signum.h /usr/include/time.h \ + /usr/include/x86_64-linux-gnu/bits/siginfo.h \ + /usr/include/x86_64-linux-gnu/bits/sigaction.h \ + /usr/include/x86_64-linux-gnu/bits/sigcontext.h \ + /usr/include/x86_64-linux-gnu/bits/sigstack.h \ + /usr/include/x86_64-linux-gnu/sys/ucontext.h \ + /usr/include/x86_64-linux-gnu/bits/pthreadtypes.h \ + /usr/include/x86_64-linux-gnu/bits/sigthread.h \ + /usr/include/x86_64-linux-gnu/sys/wait.h \ + /usr/include/x86_64-linux-gnu/bits/waitflags.h \ + /usr/include/x86_64-linux-gnu/bits/waitstatus.h /usr/include/endian.h \ + /usr/include/x86_64-linux-gnu/bits/endian.h \ + /usr/include/x86_64-linux-gnu/bits/byteswap.h \ + /usr/include/x86_64-linux-gnu/bits/byteswap-16.h \ + /usr/include/x86_64-linux-gnu/sys/types.h \ + /usr/include/x86_64-linux-gnu/sys/select.h \ + /usr/include/x86_64-linux-gnu/bits/select.h \ + /usr/include/x86_64-linux-gnu/bits/time.h \ + /usr/include/x86_64-linux-gnu/sys/sysmacros.h \ + /usr/include/x86_64-linux-gnu/sys/socket.h \ + 
/usr/include/x86_64-linux-gnu/sys/uio.h \ + /usr/include/x86_64-linux-gnu/bits/uio.h \ + /usr/include/x86_64-linux-gnu/bits/socket.h \ + /usr/include/x86_64-linux-gnu/bits/socket_type.h \ + /usr/include/x86_64-linux-gnu/bits/sockaddr.h \ + /usr/include/x86_64-linux-gnu/asm/socket.h \ + /usr/include/asm-generic/socket.h \ + /usr/include/x86_64-linux-gnu/asm/sockios.h \ + /usr/include/asm-generic/sockios.h /usr/include/netinet/in.h \ + /usr/lib/gcc/x86_64-linux-gnu/4.8/include/stdint.h /usr/include/stdint.h \ + /usr/include/x86_64-linux-gnu/bits/wchar.h \ + /usr/include/x86_64-linux-gnu/bits/in.h /usr/include/arpa/inet.h \ + /usr/include/netdb.h /usr/include/rpc/netdb.h \ + /usr/include/x86_64-linux-gnu/bits/netdb.h /usr/include/ctype.h \ + /usr/include/stdlib.h /usr/include/alloca.h \ + /usr/include/x86_64-linux-gnu/bits/stdlib-float.h /usr/include/termios.h \ + /usr/include/x86_64-linux-gnu/bits/termios.h \ + /usr/include/x86_64-linux-gnu/sys/ttydefaults.h /usr/include/unistd.h \ + /usr/include/x86_64-linux-gnu/bits/posix_opt.h \ + /usr/include/x86_64-linux-gnu/bits/environments.h \ + /usr/include/x86_64-linux-gnu/bits/confname.h /usr/include/getopt.h \ + /usr/include/dirent.h /usr/include/x86_64-linux-gnu/bits/dirent.h \ + /usr/include/x86_64-linux-gnu/bits/posix1_lim.h \ + /usr/include/x86_64-linux-gnu/bits/local_lim.h \ + /usr/include/linux/limits.h /usr/include/locale.h \ + /usr/include/x86_64-linux-gnu/bits/locale.h /usr/include/errno.h \ + /usr/include/x86_64-linux-gnu/bits/errno.h /usr/include/linux/errno.h \ + /usr/include/x86_64-linux-gnu/asm/errno.h \ + /usr/include/asm-generic/errno.h /usr/include/asm-generic/errno-base.h diff --git a/debian/changelog b/debian/changelog new file mode 100644 index 000000000..eeffe8a63 --- /dev/null +++ b/debian/changelog @@ -0,0 +1,5 @@ +maxscale (0.7-1) UNRELEASED; urgency=low + + * Initial release. 
(Closes: #XXXXXX) + + -- Timofey Turenko Tue, 11 Mar 2014 22:59:35 +0200 diff --git a/debian/compat b/debian/compat new file mode 100644 index 000000000..45a4fb75d --- /dev/null +++ b/debian/compat @@ -0,0 +1 @@ +8 diff --git a/debian/control b/debian/control new file mode 100644 index 000000000..be0a062ec --- /dev/null +++ b/debian/control @@ -0,0 +1,15 @@ +Source: maxscale +Maintainer: Timofey Turenko +Section: misc +Priority: optional +Standards-Version: 3.9.2 +Build-Depends: debhelper (>= 8), gcc, g++, ncurses-dev, bison, build-essential, libssl-dev, libaio-dev, libmariadbclient-dev, libmariadbd-dev, mariadb-server, cmake, perl, make, libtool, + +Package: maxscale +Architecture: any +Depends: ${shlibs:Depends}, ${misc:Depends} +Description: MaxScale + The SkySQL MaxScale is an intelligent proxy that allows forwarding of + database statements to one or more database servers using complex rules, + a semantic understanding of the database statements and the roles of + the various servers within the backend cluster of databases. 
diff --git a/debian/files b/debian/files new file mode 100644 index 000000000..cdd732fd3 --- /dev/null +++ b/debian/files @@ -0,0 +1 @@ +maxscale_0.7-1_amd64.deb misc optional diff --git a/debian/install b/debian/install new file mode 100644 index 000000000..a9af76865 --- /dev/null +++ b/debian/install @@ -0,0 +1,3 @@ +maxscale.conf etc/ld.so.conf.d/ +etc/init.d/maxscale etc/init.d/ +binaries/* usr/local/sbin/ diff --git a/debian/postinst b/debian/postinst new file mode 100644 index 000000000..59b18bb73 --- /dev/null +++ b/debian/postinst @@ -0,0 +1,4 @@ +#!/bin/bash + +ln -s /lib64/libaio.so.1 /lib64/libaio.so +/sbin/ldconfig diff --git a/debian/rules b/debian/rules new file mode 100755 index 000000000..fd6b52622 --- /dev/null +++ b/debian/rules @@ -0,0 +1,11 @@ +#!/usr/bin/make -f +%: + $(MAKE) ROOT_PATH=$(shell pwd) HOME="" clean + $(MAKE) ROOT_PATH=$(shell pwd) HOME="" depend + $(MAKE) ROOT_PATH=$(shell pwd) HOME="" + $(MAKE) DEST="$(shell pwd)/binaries" ROOT_PATH=$(shell pwd) HOME="" ERRMSG="/usr/share/mysql/english" EMBEDDED_LIB="/usr/lib/x86_64-linux-gnu/" install + dh $@ +override_dh_usrlocal: +override_dh_auto_clean: +override_dh_auto_build: +override_dh_auto_install: diff --git a/server/core/dcb.c b/server/core/dcb.c index f78edd003..26eac3415 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -1878,7 +1878,6 @@ static DCB* dcb_get_next ( } void dcb_call_foreach ( - SERVER* srv, DCB_REASON reason) { switch (reason) { diff --git a/server/core/server.c b/server/core/server.c index 8d16de41b..c699aac7a 100644 --- a/server/core/server.c +++ b/server/core/server.c @@ -28,6 +28,7 @@ * 20/05/14 Massimiliano Pinto Addition of server_string * 21/05/14 Massimiliano Pinto Addition of node_id * 28/05/14 Massimiliano Pinto Addition of rlagd and node_ts fields + * 20/06/14 Massimiliano Pinto Addition of master_id, depth, slaves fields * 26/06/14 Mark Riddoch Addition of server parameters * * @endverbatim @@ -78,6 +79,9 @@ SERVER *server; server->rlag = -1; 
server->node_ts = 0; server->parameters = NULL; + server->master_id = -1; + server->depth = -1; + server->slaves = NULL; spinlock_acquire(&server_spin); server->next = allServers; @@ -252,7 +256,21 @@ char *stat; if (ptr->server_string) dcb_printf(dcb, "\tServer Version:\t\t%s\n", ptr->server_string); dcb_printf(dcb, "\tNode Id: %d\n", ptr->node_id); - if (SERVER_IS_SLAVE(ptr)) { + dcb_printf(dcb, "\tMaster Id: %li\n", ptr->master_id); /* master_id is a long: %li, not %d */ + if (ptr->slaves) { + int i; + dcb_printf(dcb, "\tSlave Ids: "); + for (i = 0; ptr->slaves[i]; i++) + { + if (i == 0) + dcb_printf(dcb, "%li", ptr->slaves[i]); + else + dcb_printf(dcb, ", %li ", ptr->slaves[i]); + } + dcb_printf(dcb, "\n"); + } + dcb_printf(dcb, "\tRepl Depth: %d\n", ptr->depth); + if (SERVER_IS_SLAVE(ptr) || SERVER_IS_RELAY_SERVER(ptr)) { if (ptr->rlag >= 0) { dcb_printf(dcb, "\tSlave delay:\t\t%d\n", ptr->rlag); } @@ -289,7 +307,21 @@ SERVER_PARAM *param; if (server->server_string) dcb_printf(dcb, "\tServer Version:\t\t%s\n", server->server_string); dcb_printf(dcb, "\tNode Id: %d\n", server->node_id); - if (SERVER_IS_SLAVE(server)) { + dcb_printf(dcb, "\tMaster Id: %li\n", server->master_id); /* master_id is a long: %li, not %d */ + if (server->slaves) { + int i; + dcb_printf(dcb, "\tSlave Ids: "); + for (i = 0; server->slaves[i]; i++) + { + if (i == 0) + dcb_printf(dcb, "%li", server->slaves[i]); + else + dcb_printf(dcb, ", %li ", server->slaves[i]); + } + dcb_printf(dcb, "\n"); + } + dcb_printf(dcb, "\tRepl Depth: %d\n", server->depth); + if (SERVER_IS_SLAVE(server) || SERVER_IS_RELAY_SERVER(server)) { if (server->rlag >= 0) { dcb_printf(dcb, "\tSlave delay:\t\t%d\n", server->rlag); } diff --git a/server/include/dcb.h b/server/include/dcb.h index 6d91b94b1..7072c21dd 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -298,6 +298,8 @@ bool dcb_set_state( dcb_state_t new_state, dcb_state_t* old_state); +void dcb_call_foreach ( + DCB_REASON reason); /* DCB flags values */ #define DCBF_CLONE 0x0001 /* DCB is a clone */ diff --git 
a/server/include/server.h b/server/include/server.h index 3768d98f0..fb382e6f8 100644 --- a/server/include/server.h +++ b/server/include/server.h @@ -36,6 +36,7 @@ * 20/05/14 Massimiliano Pinto Addition of node_id field * 23/05/14 Massimiliano Pinto Addition of rlag and node_ts fields * 03/06/14 Mark Riddoch Addition of maintainance mode + * 20/06/14 Massimiliano Pinto Addition of master_id, depth, slaves fields * 26/06/14 Mark Riddoch Adidtion of server parameters * * @endverbatim @@ -83,6 +84,9 @@ typedef struct server { int rlag; /**< Replication Lag for Master / Slave replication */ unsigned long node_ts; /**< Last timestamp set from M/S monitor module */ SERVER_PARAM *parameters; /**< Parameters of a server that may be used to weight routing decisions */ + long master_id; /**< Master server id of this node */ + int depth; /**< Replication level in the tree */ + long *slaves; /**< Slaves of this node */ } SERVER; /** @@ -90,11 +94,12 @@ typedef struct server { * * These are a bitmap of attributes that may be applied to a server */ -#define SERVER_RUNNING 0x0001 /**<< The server is up and running */ -#define SERVER_MASTER 0x0002 /**<< The server is a master, i.e. can handle writes */ -#define SERVER_SLAVE 0x0004 /**<< The server is a slave, i.e. can handle reads */ -#define SERVER_JOINED 0x0008 /**<< The server is joined in a Galera cluster */ -#define SERVER_MAINT 0x1000 /**<< Server is in maintenance mode */ +#define SERVER_RUNNING 0x0001 /**<< The server is up and running */ +#define SERVER_MASTER 0x0002 /**<< The server is a master, i.e. can handle writes */ +#define SERVER_SLAVE 0x0004 /**<< The server is a slave, i.e. 
can handle reads */ +#define SERVER_JOINED 0x0008 /**<< The server is joined in a Galera cluster */ +#define SERVER_MAINT 0x1000 /**<< Server is in maintenance mode */ +#define SERVER_SLAVE_OF_EXTERNAL_MASTER 0x0010 /**<< Server is slave of a Master outside the provided replication topology; a single bit of its own so that setting/clearing it never touches SERVER_MASTER or SERVER_SLAVE */ /** * Is the server running - the macro returns true if the server is marked as running @@ -135,6 +140,9 @@ typedef struct server { #define SERVER_IS_IN_CLUSTER(s) (((s)->status & (SERVER_MASTER|SERVER_SLAVE|SERVER_JOINED)) != 0) +#define SERVER_IS_RELAY_SERVER(server) \ + (((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE|SERVER_MAINT)) == (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE)) + extern SERVER *server_alloc(char *, char *, unsigned short); extern int server_free(SERVER *); extern SERVER *server_find_by_unique_name(char *); diff --git a/server/modules/monitor/galera_mon.c b/server/modules/monitor/galera_mon.c index a9f242756..c9c3aef01 100644 --- a/server/modules/monitor/galera_mon.c +++ b/server/modules/monitor/galera_mon.c @@ -29,6 +29,7 @@ * 23/05/14 Massimiliano Pinto Added 1 configuration option (setInterval). * Interval is printed in diagnostics. * 03/06/14 Mark Riddoch Add support for maintenance mode + * 24/06/14 Massimiliano Pinto Added depth level 0 for each node * * @endverbatim */ @@ -434,6 +435,7 @@ long master_id; /* set master_id to the lowest value of ptr->server->node_id */ if ((! 
SERVER_IN_MAINT(ptr->server)) && ptr->server->node_id >= 0 && SERVER_IS_JOINED(ptr->server)) { + ptr->server->depth = 0; if (ptr->server->node_id < master_id && master_id >= 0) { master_id = ptr->server->node_id; } else { @@ -445,6 +447,7 @@ long master_id; /* clear M/S status */ server_clear_status(ptr->server, SERVER_SLAVE); server_clear_status(ptr->server, SERVER_MASTER); + ptr->server->depth = -1; } if (ptr->server->status != prev_status || SERVER_IS_DOWN(ptr->server)) diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index 38625a6ed..80dc75091 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -33,6 +33,13 @@ * 28/05/14 Massimiliano Pinto Added set Id and configuration options (setInverval) * Parameters are now printed in diagnostics * 03/06/14 Mark Ridoch Add support for maintenance mode + * 17/06/14 Massimiliano Pinto Addition of getServerByNodeId routine + * and first implementation for depth of replication for nodes. + * 23/06/14 Massimiliano Pinto Added replication consistency after replication tree computation + * 27/06/14 Massimiliano Pinto Added replication pending status in monitored server, storing there + * the status to update in server status field before + * starting the replication consistency check. 
+ * This will also give routers a consistent "status" of all servers * * @endverbatim */ @@ -75,6 +82,14 @@ static void defaultId(void *, unsigned long); static void replicationHeartbeat(void *, int); static bool mon_status_changed(MONITOR_SERVERS* mon_srv); static bool mon_print_fail_status(MONITOR_SERVERS* mon_srv); +static MONITOR_SERVERS *getServerByNodeId(MONITOR_SERVERS *, long); +static MONITOR_SERVERS *getSlaveOfNodeId(MONITOR_SERVERS *, long); +static MONITOR_SERVERS *get_replication_tree(MYSQL_MONITOR *, int); +static void set_master_heartbeat(MYSQL_MONITOR *, MONITOR_SERVERS *); +static void set_slave_heartbeat(MYSQL_MONITOR *, MONITOR_SERVERS *); +static int add_slave_to_master(long *, int, long); +static void monitor_set_pending_status(MONITOR_SERVERS *, int); +static void monitor_clear_pending_status(MONITOR_SERVERS *, int); static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics, setInterval, defaultId, replicationHeartbeat }; @@ -145,6 +160,7 @@ MYSQL_MONITOR *handle; handle->id = MONITOR_DEFAULT_ID; handle->interval = MONITOR_INTERVAL; handle->replicationHeartbeat = 0; + handle->master = NULL; spinlock_init(&handle->lock); } handle->tid = (THREAD)thread_start(monitorMain, handle); @@ -185,6 +201,9 @@ MONITOR_SERVERS *ptr, *db; db->next = NULL; db->mon_err_count = 0; db->mon_prev_status = 0; + /* pending status is updated by get_replication_tree */ + db->pending_status = 0; + spinlock_acquire(&handle->lock); if (handle->databases == NULL) @@ -316,15 +335,11 @@ monitorDatabase(MYSQL_MONITOR *handle, MONITOR_SERVERS *database) MYSQL_ROW row; MYSQL_RES *result; int num_fields; -int ismaster = 0; int isslave = 0; char *uname = handle->defaultUser; char *passwd = handle->defaultPasswd; unsigned long int server_version = 0; char *server_string; -unsigned long id = handle->id; -int replication_heartbeat = handle->replicationHeartbeat; -static int conn_err_count; if (database->server->monuser 
!= NULL) { @@ -373,15 +388,28 @@ static int conn_err_count; database->server->port, mysql_error(database->con)))); } - /** Store current status */ + + /* The current server is not running + * + * Store server NOT running in server and monitor server pending struct + * + */ server_clear_status(database->server, SERVER_RUNNING); - + monitor_clear_pending_status(database, SERVER_RUNNING); + + /* Also clear M/S state in both server and monitor server pending struct */ + server_clear_status(database->server, SERVER_SLAVE); + server_clear_status(database->server, SERVER_MASTER); + monitor_clear_pending_status(database, SERVER_SLAVE); + monitor_clear_pending_status(database, SERVER_MASTER); + return; } free(dpwd); - } - /** Store current status */ - server_set_status(database->server, SERVER_RUNNING); + } + /* Store current status in both server and monitor server pending struct */ + server_set_status(database->server, SERVER_RUNNING); + monitor_set_pending_status(database, SERVER_RUNNING); /* get server version from current server */ server_version = mysql_get_server_version(database->con); @@ -411,121 +439,6 @@ static int conn_err_count; mysql_free_result(result); } - /* Check SHOW SLAVE HOSTS - if we get rows then we are a master */ - if (mysql_query(database->con, "SHOW SLAVE HOSTS")) - { - if (mysql_errno(database->con) == ER_SPECIFIC_ACCESS_DENIED_ERROR) - { - /* Log lack of permission */ - } - - database->server->rlag = -1; - } else if ((result = mysql_store_result(database->con)) != NULL) { - num_fields = mysql_num_fields(result); - while ((row = mysql_fetch_row(result))) - { - ismaster = 1; - } - mysql_free_result(result); - - if (ismaster && replication_heartbeat == 1) { - time_t heartbeat; - time_t purge_time; - char heartbeat_insert_query[128]=""; - char heartbeat_purge_query[128]=""; - - handle->master_id = database->server->node_id; - - /* create the maxscale_schema database */ - if (mysql_query(database->con, "CREATE DATABASE IF NOT EXISTS 
maxscale_schema")) { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "[mysql_mon]: Error creating maxscale_schema database in Master server" - ": %s", mysql_error(database->con)))); - - database->server->rlag = -1; - } - - /* create repl_heartbeat table in maxscale_schema database */ - if (mysql_query(database->con, "CREATE TABLE IF NOT EXISTS " - "maxscale_schema.replication_heartbeat " - "(maxscale_id INT NOT NULL, " - "master_server_id INT NOT NULL, " - "master_timestamp INT UNSIGNED NOT NULL, " - "PRIMARY KEY ( master_server_id, maxscale_id ) ) " - "ENGINE=MYISAM DEFAULT CHARSET=latin1")) { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "[mysql_mon]: Error creating maxscale_schema.replication_heartbeat table in Master server" - ": %s", mysql_error(database->con)))); - - database->server->rlag = -1; - } - - /* auto purge old values after 48 hours*/ - purge_time = time(0) - (3600 * 48); - - sprintf(heartbeat_purge_query, "DELETE FROM maxscale_schema.replication_heartbeat WHERE master_timestamp < %lu", purge_time); - - if (mysql_query(database->con, heartbeat_purge_query)) { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "[mysql_mon]: Error deleting from maxscale_schema.replication_heartbeat table: [%s], %s", - heartbeat_purge_query, - mysql_error(database->con)))); - } - - heartbeat = time(0); - - /* set node_ts for master as time(0) */ - database->server->node_ts = heartbeat; - - sprintf(heartbeat_insert_query, "UPDATE maxscale_schema.replication_heartbeat SET master_timestamp = %lu WHERE master_server_id = %i AND maxscale_id = %lu", heartbeat, handle->master_id, id); - - /* Try to insert MaxScale timestamp into master */ - if (mysql_query(database->con, heartbeat_insert_query)) { - - database->server->rlag = -1; - - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "[mysql_mon]: Error updating maxscale_schema.replication_heartbeat table: [%s], %s", - heartbeat_insert_query, - mysql_error(database->con)))); - } else { - if 
(mysql_affected_rows(database->con) == 0) { - heartbeat = time(0); - sprintf(heartbeat_insert_query, "REPLACE INTO maxscale_schema.replication_heartbeat (master_server_id, maxscale_id, master_timestamp ) VALUES ( %i, %lu, %lu)", handle->master_id, id, heartbeat); - - if (mysql_query(database->con, heartbeat_insert_query)) { - - database->server->rlag = -1; - - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "[mysql_mon]: Error inserting into maxscale_schema.replication_heartbeat table: [%s], %s", - heartbeat_insert_query, - mysql_error(database->con)))); - } else { - /* Set replication lag to 0 for the master */ - database->server->rlag = 0; - - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "[mysql_mon]: heartbeat table inserted data for %s:%i", database->server->name, database->server->port))); - } - } else { - /* Set replication lag as 0 for the master */ - database->server->rlag = 0; - - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "[mysql_mon]: heartbeat table updated for %s:%i", database->server->name, database->server->port))); - } - } - } - } - /* Check if the Slave_SQL_Running and Slave_IO_Running status is * set to Yes */ @@ -537,17 +450,28 @@ static int conn_err_count; && (result = mysql_store_result(database->con)) != NULL) { int i = 0; + long master_id = -1; num_fields = mysql_num_fields(result); while ((row = mysql_fetch_row(result))) { + /* get Slave_IO_Running and Slave_SQL_Running values*/ if (strncmp(row[12], "Yes", 3) == 0 && strncmp(row[13], "Yes", 3) == 0) { isslave += 1; + + /* get Master_Server_Id values */ + master_id = atol(row[41]); + if (master_id == 0) + master_id = -1; } i++; } + /* store master_id of current node */ + memcpy(&database->server->master_id, &master_id, sizeof(long)); + mysql_free_result(result); + /* If all configured slaves are running set this node as slave */ if (isslave > 0 && isslave == i) isslave = 1; else @@ -557,105 +481,45 @@ static int conn_err_count; if (mysql_query(database->con, "SHOW SLAVE 
STATUS") == 0 && (result = mysql_store_result(database->con)) != NULL) { + long master_id = -1; num_fields = mysql_num_fields(result); while ((row = mysql_fetch_row(result))) { + /* get Slave_IO_Running and Slave_SQL_Running values*/ if (strncmp(row[10], "Yes", 3) == 0 - && strncmp(row[11], "Yes", 3) == 0) + && strncmp(row[11], "Yes", 3) == 0) { isslave = 1; + + /* get Master_Server_Id values */ + master_id = atol(row[39]); + if (master_id == 0) + master_id = -1; + } } + /* store master_id of current node */ + memcpy(&database->server->master_id, &master_id, sizeof(long)); + mysql_free_result(result); } } - /* Get the master_timestamp value from maxscale_schema.replication_heartbeat table */ - if (isslave && replication_heartbeat == 1) { - time_t heartbeat; - char select_heartbeat_query[256] = ""; + /* Remove addition info */ + monitor_clear_pending_status(database, SERVER_SLAVE_OF_EXTERNAL_MASTER); - sprintf(select_heartbeat_query, "SELECT master_timestamp " - "FROM maxscale_schema.replication_heartbeat " - "WHERE maxscale_id = %lu AND master_server_id = %i", - id, handle->master_id); + /* Please note, the MASTER status and SERVER_SLAVE_OF_EXTERNAL_MASTER + * will be assigned in the monitorMain() via get_replication_tree() routine + */ - /* if there is a master then send the query to the slave with master_id*/ - if (handle->master_id >= 0 && (mysql_query(database->con, select_heartbeat_query) == 0 - && (result = mysql_store_result(database->con)) != NULL)) { - num_fields = mysql_num_fields(result); - - while ((row = mysql_fetch_row(result))) { - int rlag = -1; - time_t slave_read; - - heartbeat = time(0); - slave_read = strtoul(row[0], NULL, 10); - - if ((errno == ERANGE && (slave_read == LONG_MAX || slave_read == LONG_MIN)) || (errno != 0 && slave_read == 0)) { - slave_read = 0; - } - - if (slave_read) { - /* set the replication lag */ - rlag = heartbeat - slave_read; - } - - /* set this node_ts as master_timestamp read from replication_heartbeat table */ - 
database->server->node_ts = slave_read; - - if (rlag >= 0) { - /* store rlag only if greater than monitor sampling interval */ - database->server->rlag = (rlag > (handle->interval / 1000)) ? rlag : 0; - } else { - database->server->rlag = -1; - } - - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "[mysql_mon]: replication heartbeat: " - "server %s:%i is %i seconds behind master", - database->server->name, - database->server->port, - database->server->rlag))); - } - mysql_free_result(result); - } else { - database->server->rlag = -1; - database->server->node_ts = 0; - - if (handle->master_id < 0) { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "[mysql_mon]: error: replication heartbeat: " - "master_server_id NOT available for %s:%i", - database->server->name, - database->server->port))); - } else { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "[mysql_mon]: error: replication heartbeat: " - "failed selecting from hearthbeat table of %s:%i : [%s], %s", - database->server->name, - database->server->port, - select_heartbeat_query, - mysql_error(database->con)))); - } - } - } - /** Store current status */ - if (ismaster) + /* Set the Slave Role */ + if (isslave) { - server_set_status(database->server, SERVER_MASTER); - server_clear_status(database->server, SERVER_SLAVE); - } - else if (isslave) - { - server_set_status(database->server, SERVER_SLAVE); - server_clear_status(database->server, SERVER_MASTER); - } - if (ismaster == 0 && isslave == 0) - { - server_clear_status(database->server, SERVER_SLAVE); - server_clear_status(database->server, SERVER_MASTER); + monitor_set_pending_status(database, SERVER_SLAVE); + /* Avoid any possible stale Master state */ + monitor_clear_pending_status(database, SERVER_MASTER); + } else { + /* Avoid any possible Master/Slave stale state */ + monitor_clear_pending_status(database, SERVER_SLAVE); + monitor_clear_pending_status(database, SERVER_MASTER); } } @@ -669,6 +533,9 @@ monitorMain(void *arg) { MYSQL_MONITOR 
*handle = (MYSQL_MONITOR *)arg; MONITOR_SERVERS *ptr; +int replication_heartbeat = handle->replicationHeartbeat; +int num_servers=0; +MONITOR_SERVERS *root_master; if (mysql_thread_init()) { @@ -691,11 +558,24 @@ MONITOR_SERVERS *ptr; ptr = handle->databases; while (ptr) { + /* copy server status into monitor pending_status */ + ptr->pending_status = ptr->server->status; + + /* monitor current node */ monitorDatabase(handle, ptr); + /* reset the slave list of current node */ + if (ptr->server->slaves) { + free(ptr->server->slaves); + } + /* create a new slave list */ + ptr->server->slaves = (long *) calloc(MONITOR_MAX_NUM_SLAVES, sizeof(long)); + + num_servers++; + if (mon_status_changed(ptr)) { - dcb_call_foreach(ptr->server, DCB_REASON_NOT_RESPONDING); + dcb_call_foreach(DCB_REASON_NOT_RESPONDING); } if (mon_status_changed(ptr) || @@ -718,8 +598,40 @@ MONITOR_SERVERS *ptr; /** Reset this server's error count */ ptr->mon_err_count = 0; } + ptr = ptr->next; } + + /* Compute the replication tree */ + root_master = get_replication_tree(handle, num_servers); + + /* Update server status from monitor pending status on that server*/ + + ptr = handle->databases; + while (ptr) + { + if (! SERVER_IN_MAINT(ptr->server)) { + ptr->server->status = ptr->pending_status; + } + ptr = ptr->next; + } + + /* Do now the heartbeat replication set/get for MySQL Replication Consistency */ + if (replication_heartbeat && root_master && (SERVER_IS_MASTER(root_master->server) || SERVER_IS_RELAY_SERVER(root_master->server))) { + set_master_heartbeat(handle, root_master); + ptr = handle->databases; + while (ptr) { + if( (! 
SERVER_IN_MAINT(ptr->server)) && SERVER_IS_RUNNING(ptr->server)) + { + if (ptr->server->node_id != root_master->server->node_id && (SERVER_IS_SLAVE(ptr->server) || SERVER_IS_RELAY_SERVER(ptr->server))) { + set_slave_heartbeat(handle, ptr); + } + } + ptr = ptr->next; + } + } + + /* wait for the configured interval */ thread_millisleep(handle->interval); } } @@ -798,3 +710,371 @@ static bool mon_print_fail_status( } return succp; } + +/** + * Fetch a MySQL node by node_id + * + * @param ptr The list of servers to monitor + * @param node_id The MySQL server_id to fetch + * @return The first monitored server whose node_id matches, or NULL if none does + */ +static MONITOR_SERVERS * +getServerByNodeId(MONITOR_SERVERS *ptr, long node_id) { + SERVER *current; + while (ptr) + { + current = ptr->server; + if (current->node_id == node_id) { + return ptr; + } + ptr = ptr->next; + } + return NULL; +} + +/** + * Fetch a MySQL slave node from a node_id + * + * @param ptr The list of servers to monitor + * @param node_id The MySQL server_id of the master to look up + * @return The first monitored server whose master_id equals node_id, or NULL if none does + */ +static MONITOR_SERVERS * +getSlaveOfNodeId(MONITOR_SERVERS *ptr, long node_id) { + SERVER *current; + while (ptr) + { + current = ptr->server; + if (current->master_id == node_id) { + return ptr; + } + ptr = ptr->next; + } + return NULL; +} + +/******* + * This function sets the replication heartbeat + * into the maxscale_schema.replication_heartbeat table in the current master. + * The inserted values will be seen from all slaves replicating from this master. 
+ * + * @param handle The monitor handle + * @param database The monitored server (the master) whose connection receives the heartbeat + */ +static void set_master_heartbeat(MYSQL_MONITOR *handle, MONITOR_SERVERS *database) { + unsigned long id = handle->id; + time_t heartbeat; + time_t purge_time; + char heartbeat_insert_query[512]=""; /* the UPDATE/REPLACE statements built below do not fit in 128 bytes */ + char heartbeat_purge_query[128]=""; + + /* create the maxscale_schema database */ + if (mysql_query(database->con, "CREATE DATABASE IF NOT EXISTS maxscale_schema")) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error creating maxscale_schema database in Master server" + ": %s", mysql_error(database->con)))); + + database->server->rlag = -1; + } + + /* create repl_heartbeat table in maxscale_schema database */ + if (mysql_query(database->con, "CREATE TABLE IF NOT EXISTS " + "maxscale_schema.replication_heartbeat " + "(maxscale_id INT NOT NULL, " + "master_server_id INT NOT NULL, " + "master_timestamp INT UNSIGNED NOT NULL, " + "PRIMARY KEY ( master_server_id, maxscale_id ) ) " + "ENGINE=MYISAM DEFAULT CHARSET=latin1")) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error creating maxscale_schema.replication_heartbeat table in Master server" + ": %s", mysql_error(database->con)))); + + database->server->rlag = -1; + } + + /* auto purge old values after 48 hours*/ + purge_time = time(0) - (3600 * 48); + + snprintf(heartbeat_purge_query, sizeof(heartbeat_purge_query), "DELETE FROM maxscale_schema.replication_heartbeat WHERE master_timestamp < %lu", (unsigned long)purge_time); + + if (mysql_query(database->con, heartbeat_purge_query)) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error deleting from maxscale_schema.replication_heartbeat table: [%s], %s", + heartbeat_purge_query, + mysql_error(database->con)))); + } + + heartbeat = time(0); + + /* set node_ts for master as time(0) */ + database->server->node_ts = heartbeat; + + snprintf(heartbeat_insert_query, sizeof(heartbeat_insert_query), "UPDATE maxscale_schema.replication_heartbeat SET master_timestamp = %lu WHERE master_server_id = 
%li AND maxscale_id = %lu", (unsigned long)heartbeat, handle->master->server->node_id, id); + + /* Try to insert MaxScale timestamp into master */ + if (mysql_query(database->con, heartbeat_insert_query)) { + + database->server->rlag = -1; + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error updating maxscale_schema.replication_heartbeat table: [%s], %s", + heartbeat_insert_query, + mysql_error(database->con)))); + } else { + if (mysql_affected_rows(database->con) == 0) { + heartbeat = time(0); + snprintf(heartbeat_insert_query, sizeof(heartbeat_insert_query), "REPLACE INTO maxscale_schema.replication_heartbeat (master_server_id, maxscale_id, master_timestamp ) VALUES ( %li, %lu, %lu)", handle->master->server->node_id, id, (unsigned long)heartbeat); + + if (mysql_query(database->con, heartbeat_insert_query)) { + + database->server->rlag = -1; + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: Error inserting into maxscale_schema.replication_heartbeat table: [%s], %s", + heartbeat_insert_query, + mysql_error(database->con)))); + } else { + /* Set replication lag to 0 for the master */ + database->server->rlag = 0; + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "[mysql_mon]: heartbeat table inserted data for %s:%i", database->server->name, database->server->port))); + } + } else { + /* Set replication lag as 0 for the master */ + database->server->rlag = 0; + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "[mysql_mon]: heartbeat table updated for Master %s:%i", database->server->name, database->server->port))); + } + } +} + +/******* + * This function gets the replication heartbeat + * from the maxscale_schema.replication_heartbeat table in the current slave + * and stores the timestamp and replication lag in the slave server struct + * + * @param handle The monitor handle + * @param database The monitored server (a slave) whose connection is queried + */ +static void set_slave_heartbeat(MYSQL_MONITOR *handle, MONITOR_SERVERS *database) { + unsigned long id = handle->id; + time_t heartbeat; + char 
select_heartbeat_query[256] = ""; + MYSQL_ROW row; + MYSQL_RES *result; + int num_fields; + + /* Get the master_timestamp value from maxscale_schema.replication_heartbeat table */ + + sprintf(select_heartbeat_query, "SELECT master_timestamp " + "FROM maxscale_schema.replication_heartbeat " + "WHERE maxscale_id = %lu AND master_server_id = %li", + id, handle->master->server->node_id); + + /* if there is a master then send the query to the slave with master_id */ + if (handle->master !=NULL && (mysql_query(database->con, select_heartbeat_query) == 0 + && (result = mysql_store_result(database->con)) != NULL)) { + num_fields = mysql_num_fields(result); + + while ((row = mysql_fetch_row(result))) { + int rlag = -1; + time_t slave_read; + + heartbeat = time(0); + slave_read = strtoul(row[0], NULL, 10); + + if ((errno == ERANGE && (slave_read == LONG_MAX || slave_read == LONG_MIN)) || (errno != 0 && slave_read == 0)) { + slave_read = 0; + } + + if (slave_read) { + /* set the replication lag */ + rlag = heartbeat - slave_read; + } + + /* set this node_ts as master_timestamp read from replication_heartbeat table */ + database->server->node_ts = slave_read; + + if (rlag >= 0) { + /* store rlag only if greater than monitor sampling interval */ + database->server->rlag = (rlag > (handle->interval / 1000)) ? 
rlag : 0; + } else { + database->server->rlag = -1; + } + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "[mysql_mon]: replication heartbeat: " + "Slave %s:%i is %i seconds lag", + database->server->name, + database->server->port, + database->server->rlag))); + } + mysql_free_result(result); + } else { + database->server->rlag = -1; + database->server->node_ts = 0; + + if (handle->master->server->node_id < 0) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: error: replication heartbeat: " + "master_server_id NOT available for %s:%i", + database->server->name, + database->server->port))); + } else { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "[mysql_mon]: error: replication heartbeat: " + "failed selecting from hearthbeat table of %s:%i : [%s], %s", + database->server->name, + database->server->port, + select_heartbeat_query, + mysql_error(database->con)))); + } + } +} + +/******* + * This function computes the replication tree + * from a set of MySQL Master/Slave monitored servers + * and returns the root server with SERVER_MASTER bit + * + * @param handle The monitor handle + * @param num_servers The number of servers monitored + * @return The server at root level with SERVER_MASTER bit + */ + +static MONITOR_SERVERS *get_replication_tree(MYSQL_MONITOR *handle, int num_servers) { + MONITOR_SERVERS *ptr; + MONITOR_SERVERS *backend; + SERVER *current; + int depth=0; + long node_id; + int root_level; + + ptr = handle->databases; + root_level = num_servers; + + while (ptr) + { + if (SERVER_IN_MAINT(ptr->server) || SERVER_IS_DOWN(ptr->server)) { + ptr = ptr->next; + + continue; + } + depth = 0; + current = ptr->server; + + node_id = current->master_id; + if (node_id < 1) { + MONITOR_SERVERS *find_slave; + find_slave = getSlaveOfNodeId(handle->databases, current->node_id); + + if (find_slave == NULL) { + current->depth = -1; + ptr = ptr->next; + + continue; + } else { + current->depth = 0; + } + } else { + depth++; + } + + 
while(depth <= num_servers) { + /* set the root master at lowest depth level */ + if (current->depth > -1 && current->depth < root_level) { + root_level = current->depth; + handle->master = ptr; + } + backend = getServerByNodeId(handle->databases, node_id); + + if (backend) { + node_id = backend->server->master_id; + } else { + node_id = -1; + } + + if (node_id > 0) { + current->depth = depth + 1; + depth++; + + } else { + MONITOR_SERVERS *master; + current->depth = depth; + + master = getServerByNodeId(handle->databases, current->master_id); + if (master && master->server && master->server->node_id > 0) { + add_slave_to_master(master->server->slaves, MONITOR_MAX_NUM_SLAVES, current->node_id); + master->server->depth = current->depth -1; + monitor_set_pending_status(master, SERVER_MASTER); + } else { + if (current->master_id > 0) { + monitor_set_pending_status(ptr, SERVER_SLAVE_OF_EXTERNAL_MASTER); + } + } + break; + } + + } + + ptr = ptr->next; + } + + return handle->master; +} + +/******* + * This function add a slave id into the slaves server field + * of its master server + * + * @param slaves_list The slave list array of the master server + * @param list_size The size of the slave list + * @param node_id The node_id of the slave to be inserted + * @return 1 for inserted value and 0 otherwise + */ +static int add_slave_to_master(long *slaves_list, int list_size, long node_id) { + int i; + for (i = 0; i< list_size; i++) { + if (slaves_list[i] == 0) { + memcpy(&slaves_list[i], &node_id, sizeof(long)); + return 1; + } + } + return 0; +} + +/** + * Set a pending status bit in the monior server + * + * @param server The server to update + * @param bit The bit to clear for the server + */ +static void +monitor_set_pending_status(MONITOR_SERVERS *ptr, int bit) +{ + ptr->pending_status |= bit; +} + +/** + * Clear a pending status bit in the monior server + * + * @param server The server to update + * @param bit The bit to clear for the server + */ +static void 
+monitor_clear_pending_status(MONITOR_SERVERS *ptr, int bit) +{ + ptr->pending_status &= ~bit; +} diff --git a/server/modules/monitor/mysqlmon.h b/server/modules/monitor/mysqlmon.h index 5b5c7d04a..0e06db6e4 100644 --- a/server/modules/monitor/mysqlmon.h +++ b/server/modules/monitor/mysqlmon.h @@ -31,6 +31,7 @@ * 08/07/13 Mark Riddoch Initial implementation * 26/05/14 Massimiliano Pinto Default values for MONITOR_INTERVAL * 28/05/14 Massimiliano Pinto Addition of new fields in MYSQL_MONITOR struct + * 24/06/14 Massimiliano Pinto Addition of master field in MYSQL_MONITOR struct and MONITOR_MAX_NUM_SLAVES * * @endverbatim */ @@ -44,6 +45,7 @@ typedef struct monitor_servers { MYSQL *con; /**< The MySQL connection */ int mon_err_count; unsigned int mon_prev_status; + unsigned int pending_status; /**< Pending Status flag bitmap */ struct monitor_servers *next; /**< The next server in the list */ } MONITOR_SERVERS; @@ -52,17 +54,17 @@ typedef struct monitor_servers { * The handle for an instance of a MySQL Monitor module */ typedef struct { - SPINLOCK lock; /**< The monitor spinlock */ - pthread_t tid; /**< id of monitor thread */ - int shutdown; /**< Flag to shutdown the monitor thread */ - int status; /**< Monitor status */ - char *defaultUser; /**< Default username for monitoring */ - char *defaultPasswd; /**< Default password for monitoring */ - unsigned long interval; /**< Monitor sampling interval */ - unsigned long id; /**< Monitor ID */ + SPINLOCK lock; /**< The monitor spinlock */ + pthread_t tid; /**< id of monitor thread */ + int shutdown; /**< Flag to shutdown the monitor thread */ + int status; /**< Monitor status */ + char *defaultUser; /**< Default username for monitoring */ + char *defaultPasswd; /**< Default password for monitoring */ + unsigned long interval; /**< Monitor sampling interval */ + unsigned long id; /**< Monitor ID */ int replicationHeartbeat; /**< Monitor flag for MySQL replication heartbeat */ - int master_id; /**< Master server-id for 
MySQL Master/Slave replication */ - MONITOR_SERVERS *databases; /**< Linked list of servers to monitor */ + MONITOR_SERVERS *master; /**< Master server for MySQL Master/Slave replication */ + MONITOR_SERVERS *databases; /**< Linked list of servers to monitor */ } MYSQL_MONITOR; #define MONITOR_RUNNING 1 @@ -71,5 +73,6 @@ typedef struct { #define MONITOR_INTERVAL 10000 // in milliseconds #define MONITOR_DEFAULT_ID 1UL // unsigned long value +#define MONITOR_MAX_NUM_SLAVES 20 //number of MySQL slave servers associated to a MySQL master server #endif diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index fa2ac7477..0c4a4e678 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -65,6 +65,7 @@ * or take different actions such as open a new backend connection * 20/02/2014 Massimiliano Pinto If router_options=slave, route traffic to master if no slaves available * 06/03/2014 Massimiliano Pinto Server connection counter is now updated in closeSession + * 24/06/2014 Massimiliano Pinto New rules for selecting the Master server * 27/06/2014 Mark Riddoch Addition of server weighting * * @endverbatim @@ -140,6 +141,9 @@ static bool rses_begin_locked_router_action( static void rses_end_locked_router_action( ROUTER_CLIENT_SES* rses); +static BACKEND *get_root_master( + BACKEND **servers); + static SPINLOCK instlock; static ROUTER_INSTANCE *instances; @@ -345,7 +349,7 @@ ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES *client_rses; BACKEND *candidate = NULL; int i; -int master_host = -1; +BACKEND *master_host = NULL; LOGIF(LD, (skygw_log_write_flush( LOGFILE_DEBUG, @@ -367,6 +371,11 @@ int master_host = -1; client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; #endif + /** + * Find the Master host from available servers + */ + master_host = get_root_master(inst->servers); + /** * Find a backend server to connect to. 
This is the extent of the * load balancing algorithm we need to implement for this simple @@ -402,21 +411,32 @@ int master_host = -1; if (SERVER_IN_MAINT(inst->servers[i]->server)) continue; - /* - * If router_options=slave, get the running master - * It will be used if there are no running slaves at all - */ - if (inst->bitvalue == SERVER_SLAVE) { - if (master_host < 0 && (SERVER_IS_MASTER(inst->servers[i]->server))) { - master_host = i; - } - } - + /* Check server status bits against bitvalue from router_options */ if (inst->servers[i] && - SERVER_IS_RUNNING(inst->servers[i]->server) && - (inst->servers[i]->server->status & inst->bitmask) == - inst->bitvalue) + SERVER_IS_RUNNING(inst->servers[i]->server) && + (inst->servers[i]->server->status & inst->bitmask & inst->bitvalue)) { + if (master_host) { + if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_SLAVE)) { + /* skip root Master here, as it could also be slave of an external server + * that is not in the configuration. + * Intermediate masters (Relay Servers) are also slave and will be selected + * as Slave(s) + */ + + continue; + } + if (inst->servers[i] == master_host && (inst->bitvalue & SERVER_MASTER)) { + /* If option is "master" return only the root Master as there + * could be intermediate masters (Relay Servers) + * and they must not be selected. 
+ */ + + candidate = master_host; + break; + } + } + /* If no candidate set, set first running server as our initial candidate server */ if (candidate == NULL) @@ -453,8 +473,8 @@ int master_host = -1; * Otherwise, just clean up and return NULL */ if (!candidate) { - if (master_host >= 0) { - candidate = inst->servers[master_host]; + if (master_host) { + candidate = master_host; } else { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, @@ -855,3 +875,50 @@ static uint8_t getCapabilities( { return 0; } + +/******************************** + * This routine return the root master server from MySQL replication tree + * Get the root Master rule: + * + * (1) find server(s) with lowest replication depth level + * (2) check for SERVER_MASTER bitvalue in those servers + * + * @param servers The list of servers + * @return The Master found + * + */ + +static BACKEND *get_root_master(BACKEND **servers) { + int i = 0; + BACKEND * master_host = NULL; + + /* (1) find root server(s) with lowest replication depth level */ + for (i = 0; servers[i]; i++) { + if (servers[i] && SERVER_IS_RUNNING(servers[i]->server)) { + if (master_host && servers[i]->server->depth < master_host->server->depth) { + master_host = servers[i]; + } else { + if (master_host == NULL) { + master_host = servers[i]; + } + } + } + } + + /* (2) get the status of server(s) with lowest replication level and check it against SERVER_MASTER bitvalue */ + if (master_host) { + int found = 0; + for (i = 0; servers[i]; i++) { + if (servers[i] && SERVER_IS_RUNNING(servers[i]->server) && (servers[i]->server->depth == master_host->server->depth)) { + if (servers[i]->server->status & SERVER_MASTER) { + master_host = servers[i]; + found = 1; + } + } + } + if (!found) + master_host = NULL; + } + + return master_host; +} diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index d00d8d7df..f7a7fc583 100644 --- 
a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -244,6 +244,10 @@ static bool handle_error_new_connection( GWBUF* errmsg); static bool handle_error_reply_client(SESSION* ses, GWBUF* errmsg); +static BACKEND *get_root_master( + backend_ref_t *servers, + int router_nservers); + static SPINLOCK instlock; static ROUTER_INSTANCE* instances; @@ -619,7 +623,8 @@ static void* newSession( } /** Copy backend pointers to router session. */ client_rses->rses_master_ref = master_ref; - ss_dassert(SERVER_IS_MASTER(master_ref->bref_backend->backend_server)); + /* assert with master_host */ + ss_dassert(master_ref && (master_ref->bref_backend->backend_server && SERVER_MASTER)); client_rses->rses_backend_ref = backend_ref; client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; @@ -800,6 +805,7 @@ static bool get_dcb( int smallest_nconn = -1; int i; bool succp = false; + BACKEND *master_host = NULL; CHK_CLIENT_RSES(rses); ss_dassert(p_dcb != NULL && *(p_dcb) == NULL); @@ -810,13 +816,18 @@ static bool get_dcb( } backend_ref = rses->rses_backend_ref; + /* get root master from available servers */ + master_host = get_root_master(backend_ref, rses->rses_nbackends); + if (btype == BE_SLAVE) { for (i=0; i<rses->rses_nbackends; i++) { BACKEND* b = backend_ref[i].bref_backend; + /* check slave bit, also for relay servers (Master & Servers) */ if (BREF_IS_IN_USE((&backend_ref[i])) && - SERVER_IS_SLAVE(b->backend_server) && + (SERVER_IS_SLAVE(b->backend_server) || SERVER_IS_RELAY_SERVER(b->backend_server)) && + (master_host != NULL && b->backend_server != master_host->backend_server) && (smallest_nconn == -1 || b->backend_conn_count < smallest_nconn)) { @@ -835,11 +846,12 @@ static bool get_dcb( { *p_dcb = backend_ref->bref_dcb; succp = true; + ss_dassert(backend_ref->bref_dcb->state != DCB_STATE_ZOMBIE); ss_dassert( - 
SERVER_IS_MASTER(backend_ref->bref_backend->backend_server) && - smallest_nconn == -1); + (master_host && (backend_ref->bref_backend->backend_server == master_host->backend_server)) && + smallest_nconn == -1); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, @@ -858,8 +870,9 @@ static bool get_dcb( { BACKEND* b = backend_ref[i].bref_backend; + /* removed SERVER_IS_MASTER and use master_host */ if (BREF_IS_IN_USE((&backend_ref[i])) && - (SERVER_IS_MASTER(b->backend_server))) + (master_host && (b->backend_server == master_host->backend_server))) { *p_dcb = backend_ref[i].bref_dcb; succp = true; @@ -1550,6 +1563,7 @@ static bool select_connect_backend_servers( const int min_nslaves = 0; /*< not configurable at the time */ bool is_synced_master; int (*p)(const void *, const void *); + BACKEND *master_host = NULL; if (p_master_ref == NULL || backend_ref == NULL) { @@ -1557,7 +1571,10 @@ static bool select_connect_backend_servers( succp = false; goto return_succp; } - + + /* get the root Master */ + master_host = get_root_master(backend_ref, router_nservers); + /** Master is already chosen and connected. 
This is slave failure case */ if (*p_master_ref != NULL && BREF_IS_IN_USE((*p_master_ref))) @@ -1571,7 +1588,8 @@ static bool select_connect_backend_servers( master_found = true; master_connected = true; - ss_dassert(SERVER_IS_MASTER((*p_master_ref)->bref_backend->backend_server)); + /* assert with master_host */ + ss_dassert(master_host && ((*p_master_ref)->bref_backend->backend_server == master_host->backend_server) && SERVER_MASTER); } /** New session or master failure case */ else @@ -1622,8 +1640,9 @@ static bool select_connect_backend_servers( b->backend_conn_count))); } #endif + /* assert with master_host */ ss_dassert(!master_connected || - SERVER_IS_MASTER((*p_master_ref)->bref_backend->backend_server)); + (master_host && ((*p_master_ref)->bref_backend->backend_server == master_host->backend_server) && SERVER_MASTER)); /** * Sort the pointer list to servers according to connection counts. As * a consequence those backends having least connections are in the @@ -1688,13 +1707,15 @@ static bool select_connect_backend_servers( STRSRVSTATUS(b->backend_server), b->backend_conn_count, router->bitmask))); - + if (SERVER_IS_RUNNING(b->backend_server) && ((b->backend_server->status & router->bitmask) == router->bitvalue)) { + /* check also for relay servers and don't take the master_host */ if (slaves_found < max_nslaves && - SERVER_IS_SLAVE(b->backend_server)) + (SERVER_IS_SLAVE(b->backend_server) || SERVER_IS_RELAY_SERVER(b->backend_server)) && + (master_host != NULL && (b->backend_server != master_host->backend_server))) { slaves_found += 1; @@ -1752,7 +1773,8 @@ static bool select_connect_backend_servers( } } } - else if (SERVER_IS_MASTER(b->backend_server)) + /* take the master_host for master */ + else if (master_host && (b->backend_server == master_host->backend_server)) { *p_master_ref = &backend_ref[i]; @@ -1819,8 +1841,9 @@ static bool select_connect_backend_servers( b->backend_server->port, b->backend_conn_count))); } + /* assert with master_host */ 
ss_dassert(!master_connected || - SERVER_IS_MASTER((*p_master_ref)->bref_backend->backend_server)); + (master_host && ((*p_master_ref)->bref_backend->backend_server == master_host->backend_server) && SERVER_MASTER)); #endif /** @@ -3175,7 +3198,6 @@ return_rc: return rc; } - static sescmd_cursor_t* backend_ref_get_sescmd_cursor ( backend_ref_t* bref) { @@ -3242,3 +3264,44 @@ static bool prep_stmt_drop( return true; } #endif /*< PREP_STMT_CACHING */ + + +static BACKEND *get_root_master(backend_ref_t *servers, int router_nservers) { + int i = 0; + BACKEND * master_host = NULL; + + /* (1) find root server(s) with lowest replication depth level */ + for (i = 0; i< router_nservers; i++) { + BACKEND* b = NULL; + b = servers[i].bref_backend; + if (b && SERVER_IS_RUNNING(b->backend_server)) { + if (master_host && b->backend_server->depth < master_host->backend_server->depth) { + master_host = b; + } else { + if (master_host == NULL) { + master_host = b; + } + } + } + } + + /* (2) get the status of server(s) with lowest replication level and check it against SERVER_MASTER bitvalue */ + if (master_host) { + int found = 0; + for (i = 0; i<router_nservers; i++) { + BACKEND* b = NULL; + b = servers[i].bref_backend; + if (b && SERVER_IS_RUNNING(b->backend_server) && (b->backend_server->depth == master_host->backend_server->depth)) { + if (b->backend_server->status & SERVER_MASTER) { + master_host = b; + found = 1; + } + } + } + if (!found) + master_host = NULL; + } + + return master_host; +} +