Merge branch 'develop' into 1.2.1-binlog_router
Conflicts: server/core/server.c server/include/server.h server/modules/include/blr.h server/modules/routing/binlog/blr.c server/modules/routing/binlog/blr_file.c server/modules/routing/binlog/blr_master.c server/modules/routing/binlog/blr_slave.c
This commit is contained in:
@ -2,20 +2,20 @@ if(BUILD_TESTS)
|
||||
add_subdirectory(test)
|
||||
add_library(testroute SHARED testroute.c)
|
||||
target_link_libraries(testroute log_manager utils)
|
||||
install(TARGETS testroute DESTINATION modules)
|
||||
install(TARGETS testroute DESTINATION ${MAXSCALE_LIBDIR})
|
||||
endif()
|
||||
|
||||
add_library(readconnroute SHARED readconnroute.c)
|
||||
target_link_libraries(readconnroute log_manager utils)
|
||||
install(TARGETS readconnroute DESTINATION modules)
|
||||
install(TARGETS readconnroute DESTINATION ${MAXSCALE_LIBDIR})
|
||||
|
||||
add_library(debugcli SHARED debugcli.c debugcmd.c)
|
||||
target_link_libraries(debugcli log_manager utils)
|
||||
install(TARGETS debugcli DESTINATION modules)
|
||||
install(TARGETS debugcli DESTINATION ${MAXSCALE_LIBDIR})
|
||||
|
||||
add_library(cli SHARED cli.c debugcmd.c)
|
||||
target_link_libraries(cli log_manager utils)
|
||||
install(TARGETS cli DESTINATION modules)
|
||||
install(TARGETS cli DESTINATION ${MAXSCALE_LIBDIR})
|
||||
|
||||
add_subdirectory(readwritesplit)
|
||||
add_subdirectory(schemarouter)
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
add_library(binlogrouter SHARED blr.c blr_master.c blr_cache.c blr_slave.c blr_file.c)
|
||||
set_target_properties(binlogrouter PROPERTIES INSTALL_RPATH ${CMAKE_INSTALL_RPATH}:${CMAKE_INSTALL_PREFIX}/lib)
|
||||
set_target_properties(binlogrouter PROPERTIES INSTALL_RPATH ${CMAKE_INSTALL_RPATH}:${MAXSCALE_LIBDIR})
|
||||
target_link_libraries(binlogrouter ssl pthread log_manager)
|
||||
install(TARGETS binlogrouter DESTINATION modules)
|
||||
install(TARGETS binlogrouter DESTINATION ${MAXSCALE_LIBDIR})
|
||||
|
||||
@ -35,6 +35,8 @@
|
||||
* 02/04/2014 Mark Riddoch Initial implementation
|
||||
* 17/02/2015 Massimiliano Pinto Addition of slave port and username in diagnostics
|
||||
* 18/02/2015 Massimiliano Pinto Addition of dcb_close in closeSession
|
||||
* 07/05/2015 Massimiliano Pinto Addition of MariaDB 10 compatibility support
|
||||
* 12/06/2015 Massimiliano Pinto Addition of MariaDB 10 events in diagnostics()
|
||||
* 29/06/2015 Massimiliano Pinto Addition of master.ini for easy startup configuration
|
||||
* If not found router goes into BLRM_UNCONFIGURED state.
|
||||
* Cache dir is 'cache' under router->binlogdir.
|
||||
@ -117,6 +119,9 @@ static void stats_func(void *);
|
||||
|
||||
static bool rses_begin_locked_router_action(ROUTER_SLAVE *);
|
||||
static void rses_end_locked_router_action(ROUTER_SLAVE *);
|
||||
void my_uuid_init(ulong seed1, ulong seed2);
|
||||
void my_uuid(char *guid);
|
||||
GWBUF *blr_cache_read_response(ROUTER_INSTANCE *router, char *response);
|
||||
|
||||
static SPINLOCK instlock;
|
||||
static ROUTER_INSTANCE *instances;
|
||||
@ -184,8 +189,45 @@ char path[PATH_MAX+1] = "";
|
||||
char filename[PATH_MAX+1] = "";
|
||||
int master_info = 0;
|
||||
int rc = 0;
|
||||
char *defuuid;
|
||||
|
||||
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
|
||||
if(service->credentials.name == NULL ||
|
||||
service->credentials.authdata == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"%s: Error: Service is missing user credentials."
|
||||
" Add the missing username or passwd parameter to the service.",
|
||||
service->name);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if(options == NULL || options[0] == NULL)
|
||||
{
|
||||
skygw_log_write(LE,
|
||||
"%s: Error: No router options supplied for binlogrouter",
|
||||
service->name);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* We only support one server behind this router, since the server is
|
||||
* the master from which we replicate binlog records. Therefore check
|
||||
* that only one server has been defined.
|
||||
*
|
||||
* A later improvement will be to define multiple servers and have the
|
||||
* router use the information that is supplied by the monitor to find
|
||||
* which of these servers is currently the master and replicate from
|
||||
* that server.
|
||||
*/
|
||||
if (service->dbref == NULL || service->dbref->next != NULL)
|
||||
{
|
||||
skygw_log_write(LE,
|
||||
"%s: Error : Exactly one database server may be "
|
||||
"for use with the binlog router.",
|
||||
service->name);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -215,6 +257,7 @@ int rc = 0;
|
||||
inst->retry_backoff = 1;
|
||||
inst->binlogdir = NULL;
|
||||
inst->heartbeat = 300; // Default is every 5 minutes
|
||||
inst->mariadb10_compat = false;
|
||||
|
||||
inst->user = strdup(service->credentials.name);
|
||||
inst->password = strdup(service->credentials.authdata);
|
||||
@ -234,22 +277,6 @@ int rc = 0;
|
||||
defuuid[12], defuuid[13], defuuid[14], defuuid[15]);
|
||||
}
|
||||
|
||||
/*
|
||||
* We only support one server behind this router, since the server is
|
||||
* the master from which we replicate binlog records. Therefore check
|
||||
* that only one server has been defined.
|
||||
*
|
||||
*/
|
||||
|
||||
if (service->dbref && service->dbref->next != NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Exactly one database server may be "
|
||||
"for use with the binlog router.")));
|
||||
/* report as error whether a server is defined in the service */
|
||||
}
|
||||
|
||||
/*
|
||||
* Process the options.
|
||||
* We have an array of attribute values passed to us that we must
|
||||
@ -302,6 +329,10 @@ int rc = 0;
|
||||
{
|
||||
inst->masterid = atoi(value);
|
||||
}
|
||||
else if (strcmp(options[i], "mariadb10-compatibility") == 0)
|
||||
{
|
||||
inst->mariadb10_compat = config_truth_value(value);
|
||||
}
|
||||
else if (strcmp(options[i], "filestem") == 0)
|
||||
{
|
||||
inst->fileroot = strdup(value);
|
||||
@ -376,7 +407,7 @@ int rc = 0;
|
||||
else
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR, "%s: No router options supplied for binlogrouter",
|
||||
LOGFILE_ERROR, "%s: Error: No router options supplied for binlogrouter",
|
||||
service->name)));
|
||||
}
|
||||
|
||||
@ -508,6 +539,7 @@ int rc = 0;
|
||||
/*
|
||||
* Read any cached response messages
|
||||
*/
|
||||
|
||||
blr_cache_read_master_data(inst);
|
||||
|
||||
/*
|
||||
@ -627,6 +659,7 @@ ROUTER_SLAVE *slave;
|
||||
strcpy(slave->binlogfile, "unassigned");
|
||||
slave->connect_time = time(0);
|
||||
slave->lastEventTimestamp = 0;
|
||||
slave->mariadb10_compat = false;
|
||||
|
||||
/**
|
||||
* Add this session to the list of active sessions.
|
||||
@ -810,6 +843,15 @@ static char *event_names[] = {
|
||||
"Anonymous GTID Event", "Previous GTIDS Event"
|
||||
};
|
||||
|
||||
/* New MariaDB event numbers starts from 0xa0 */
|
||||
static char *event_names_mariadb10[] = {
|
||||
"Annotate Rows Event",
|
||||
/* New MariaDB 10.x event numbers */
|
||||
"Binlog Checkpoint Event",
|
||||
"GTID Event",
|
||||
"GTID List Event"
|
||||
};
|
||||
|
||||
/**
|
||||
* Display an entry from the spinlock statistics data
|
||||
*
|
||||
@ -929,14 +971,31 @@ struct tm tm;
|
||||
buf);
|
||||
dcb_printf(dcb, "\t (%d seconds ago)\n",
|
||||
time(0) - router_inst->stats.lastReply);
|
||||
dcb_printf(dcb, "\tLast event from master: 0x%x, %s",
|
||||
|
||||
if (!router_inst->mariadb10_compat) {
|
||||
dcb_printf(dcb, "\tLast event from master: 0x%x, %s",
|
||||
router_inst->lastEventReceived,
|
||||
(router_inst->lastEventReceived >= 0 &&
|
||||
router_inst->lastEventReceived < 0x24) ?
|
||||
router_inst->lastEventReceived <= MAX_EVENT_TYPE) ?
|
||||
event_names[router_inst->lastEventReceived] : "unknown");
|
||||
} else {
|
||||
char *ptr = NULL;
|
||||
if (router_inst->lastEventReceived >= 0 && router_inst->lastEventReceived <= MAX_EVENT_TYPE) {
|
||||
ptr = event_names[router_inst->lastEventReceived];
|
||||
} else {
|
||||
/* Check MariaDB 10 new events */
|
||||
if (router_inst->lastEventReceived >= MARIADB_NEW_EVENTS_BEGIN && router_inst->lastEventReceived <= MAX_EVENT_TYPE_MARIADB10) {
|
||||
ptr = event_names_mariadb10[(router_inst->lastEventReceived - MARIADB_NEW_EVENTS_BEGIN)];
|
||||
}
|
||||
}
|
||||
|
||||
dcb_printf(dcb, "\tLast event from master: 0x%x, %s",
|
||||
router_inst->lastEventReceived, (ptr != NULL) ? ptr : "unknown");
|
||||
}
|
||||
|
||||
if (router_inst->lastEventTimestamp)
|
||||
{
|
||||
localtime_r(&router_inst->lastEventTimestamp, &tm);
|
||||
localtime_r((const time_t*)&router_inst->lastEventTimestamp, &tm);
|
||||
asctime_r(&tm, buf);
|
||||
dcb_printf(dcb, "\tLast binlog event timestamp: %ld (%s)\n",
|
||||
router_inst->lastEventTimestamp, buf);
|
||||
@ -946,11 +1005,17 @@ struct tm tm;
|
||||
if (router_inst->reconnect_pending)
|
||||
dcb_printf(dcb, "\tRouter pending reconnect to master\n");
|
||||
dcb_printf(dcb, "\tEvents received:\n");
|
||||
for (i = 0; i < 0x24; i++)
|
||||
for (i = 0; i <= MAX_EVENT_TYPE; i++)
|
||||
{
|
||||
dcb_printf(dcb, "\t\t%-38s %u\n", event_names[i], router_inst->stats.events[i]);
|
||||
}
|
||||
|
||||
if (router_inst->mariadb10_compat) {
|
||||
/* Display MariaDB 10 new events */
|
||||
for (i = MARIADB_NEW_EVENTS_BEGIN; i <= MAX_EVENT_TYPE_MARIADB10; i++)
|
||||
dcb_printf(dcb, "\t\tMariaDB 10 %-38s %u\n", event_names_mariadb10[(i - MARIADB_NEW_EVENTS_BEGIN)], router_inst->stats.events[i]);
|
||||
}
|
||||
|
||||
#if SPINLOCK_PROFILE
|
||||
dcb_printf(dcb, "\tSpinlock statistics (instlock):\n");
|
||||
spinlock_stats(&instlock, spin_reporter, dcb);
|
||||
@ -1058,7 +1123,7 @@ struct tm tm;
|
||||
if (session->lastEventTimestamp
|
||||
&& router_inst->lastEventTimestamp)
|
||||
{
|
||||
localtime_r(&session->lastEventTimestamp, &tm);
|
||||
localtime_r((const time_t*)&session->lastEventTimestamp, &tm);
|
||||
asctime_r(&tm, buf);
|
||||
dcb_printf(dcb, "\t\tLast binlog event timestamp %u, %s", session->lastEventTimestamp, buf);
|
||||
dcb_printf(dcb, "\t\tSeconds behind master %u\n", router_inst->lastEventTimestamp - session->lastEventTimestamp);
|
||||
@ -1156,7 +1221,8 @@ static void
|
||||
errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb, error_action_t action, bool *succp)
|
||||
{
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
|
||||
int error, len;
|
||||
int error;
|
||||
socklen_t len;
|
||||
char msg[85], *errmsg;
|
||||
unsigned long mysql_errno;
|
||||
|
||||
@ -1329,10 +1395,10 @@ unsigned long len;
|
||||
|
||||
snprintf(result, 1000,
|
||||
"Uptime: %u Threads: %u Events: %u Slaves: %u Master State: %s",
|
||||
time(0) - router->connect_time,
|
||||
config_threadcount(),
|
||||
router->stats.n_binlogs_ses,
|
||||
router->stats.n_slaves,
|
||||
(unsigned int)(time(0) - router->connect_time),
|
||||
(unsigned int)config_threadcount(),
|
||||
(unsigned int)router->stats.n_binlogs_ses,
|
||||
(unsigned int)router->stats.n_slaves,
|
||||
blrm_states[router->master_state]);
|
||||
if ((ret = gwbuf_alloc(4 + strlen(result))) == NULL)
|
||||
return 0;
|
||||
@ -1684,5 +1750,24 @@ int mkdir_rval;
|
||||
strncat(path, "/dbusers", PATH_MAX);
|
||||
|
||||
return dbusers_save(service->users, path);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract a numeric field from a packet of the specified number of bits
|
||||
*
|
||||
* @param src The raw packet source
|
||||
* @param birs The number of bits to extract (multiple of 8)
|
||||
*/
|
||||
uint32_t
|
||||
extract_field(uint8_t *src, int bits)
|
||||
{
|
||||
uint32_t rval = 0, shift = 0;
|
||||
|
||||
while (bits > 0)
|
||||
{
|
||||
rval |= (*src++) << shift;
|
||||
shift += 8;
|
||||
bits -= 8;
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
@ -25,6 +25,7 @@
|
||||
*
|
||||
* Date Who Description
|
||||
* 14/04/2014 Mark Riddoch Initial implementation
|
||||
* 07/05/2015 Massimiliano Pinto Added MAX_EVENT_TYPE_MARIADB10
|
||||
* 08/06/2015 Massimiliano Pinto Addition of blr_cache_read_master_data()
|
||||
* 15/06/2015 Massimiliano Pinto Addition of blr_file_get_next_binlogname()
|
||||
* 23/06/2015 Massimiliano Pinto Addition of blr_file_use_binlog, blr_file_create_binlog
|
||||
@ -50,7 +51,7 @@
|
||||
#include <blr.h>
|
||||
#include <dcb.h>
|
||||
#include <spinlock.h>
|
||||
|
||||
#include <gwdirs.h>
|
||||
#include <skygw_types.h>
|
||||
#include <skygw_utils.h>
|
||||
#include <log_manager.h>
|
||||
@ -62,7 +63,6 @@ extern __thread log_info_t tls_log_info;
|
||||
|
||||
static int blr_file_create(ROUTER_INSTANCE *router, char *file);
|
||||
static void blr_file_append(ROUTER_INSTANCE *router, char *file);
|
||||
static uint32_t extract_field(uint8_t *src, int bits);
|
||||
static void blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr);
|
||||
void blr_cache_read_master_data(ROUTER_INSTANCE *router);
|
||||
int blr_file_get_next_binlogname(ROUTER_INSTANCE *router);
|
||||
@ -81,7 +81,7 @@ int blr_file_write_master_config(ROUTER_INSTANCE *router, char *error);
|
||||
int
|
||||
blr_file_init(ROUTER_INSTANCE *router)
|
||||
{
|
||||
char *ptr, path[PATH_MAX], filename[PATH_MAX];
|
||||
char *ptr, path[PATH_MAX+1], filename[PATH_MAX+1];
|
||||
int file_found, n = 1;
|
||||
int root_len, i;
|
||||
DIR *dirp;
|
||||
@ -89,12 +89,8 @@ struct dirent *dp;
|
||||
|
||||
if (router->binlogdir == NULL)
|
||||
{
|
||||
strcpy(path, "/usr/local/mariadb-maxscale");
|
||||
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
|
||||
{
|
||||
strncpy(path, ptr,PATH_MAX);
|
||||
}
|
||||
strncat(path, "/",PATH_MAX);
|
||||
strcpy(path, get_datadir());
|
||||
strncat(path,"/",PATH_MAX);
|
||||
strncat(path, router->service->name,PATH_MAX);
|
||||
|
||||
if (access(path, R_OK) == -1)
|
||||
@ -452,15 +448,26 @@ struct stat statb;
|
||||
hdr->next_pos = EXTRACT32(&hdbuf[13]);
|
||||
hdr->flags = EXTRACT16(&hdbuf[17]);
|
||||
|
||||
if (hdr->event_type > MAX_EVENT_TYPE)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"Invalid event type 0x%x. "
|
||||
if (router->mariadb10_compat) {
|
||||
if (hdr->event_type > MAX_EVENT_TYPE_MARIADB10) {
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"Invalid MariaDB 10 event type 0x%x. "
|
||||
"Binlog file is %s, position %d",
|
||||
hdr->event_type,
|
||||
file->binlogname, pos)));
|
||||
return NULL;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
} else {
|
||||
if (hdr->event_type > MAX_EVENT_TYPE) {
|
||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||
"Invalid event type 0x%x. "
|
||||
"Binlog file is %s, position %d",
|
||||
hdr->event_type,
|
||||
file->binlogname, pos)));
|
||||
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
|
||||
{
|
||||
@ -600,26 +607,6 @@ blr_close_binlog(ROUTER_INSTANCE *router, BLFILE *file)
|
||||
free(file);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract a numeric field from a packet of the specified number of bits
|
||||
*
|
||||
* @param src The raw packet source
|
||||
* @param birs The number of bits to extract (multiple of 8)
|
||||
*/
|
||||
static uint32_t
|
||||
extract_field(uint8_t *src, int bits)
|
||||
{
|
||||
uint32_t rval = 0, shift = 0;
|
||||
|
||||
while (bits > 0)
|
||||
{
|
||||
rval |= (*src++) << shift;
|
||||
shift += 8;
|
||||
bits -= 8;
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Log the event header of binlog event
|
||||
*
|
||||
@ -671,10 +658,10 @@ struct stat statb;
|
||||
void
|
||||
blr_cache_response(ROUTER_INSTANCE *router, char *response, GWBUF *buf)
|
||||
{
|
||||
char path[4097], *ptr;
|
||||
char path[PATH_MAX+1], *ptr;
|
||||
int fd;
|
||||
|
||||
strncpy(path, router->binlogdir, 4096);
|
||||
strncpy(path, router->binlogdir, PATH_MAX);
|
||||
strncat(path, "/cache", 4096);
|
||||
|
||||
if (access(path, R_OK) == -1) {
|
||||
@ -682,8 +669,8 @@ int fd;
|
||||
mkdir_ret = mkdir(path, 0700);
|
||||
}
|
||||
|
||||
strncat(path, "/", 4096);
|
||||
strncat(path, response, 4096);
|
||||
strncat(path, "/", PATH_MAX);
|
||||
strncat(path, response, PATH_MAX);
|
||||
|
||||
if ((fd = open(path, O_WRONLY|O_CREAT|O_TRUNC, 0666)) == -1)
|
||||
return;
|
||||
@ -707,14 +694,14 @@ GWBUF *
|
||||
blr_cache_read_response(ROUTER_INSTANCE *router, char *response)
|
||||
{
|
||||
struct stat statb;
|
||||
char path[4097], *ptr;
|
||||
char path[PATH_MAX+1], *ptr;
|
||||
int fd;
|
||||
GWBUF *buf;
|
||||
|
||||
strncpy(path, router->binlogdir, 4096);
|
||||
strncat(path, "/cache", 4096);
|
||||
strncat(path, "/", 4096);
|
||||
strncat(path, response, 4096);
|
||||
strncpy(path, router->binlogdir, PATH_MAX);
|
||||
strncat(path, "/cache", PATH_MAX);
|
||||
strncat(path, "/", PATH_MAX);
|
||||
strncat(path, response, PATH_MAX);
|
||||
|
||||
if ((fd = open(path, O_RDONLY)) == -1)
|
||||
return NULL;
|
||||
|
||||
@ -32,12 +32,13 @@
|
||||
* Revision History
|
||||
*
|
||||
* Date Who Description
|
||||
* 02/04/2014 Mark Riddoch Initial implementation
|
||||
* 25/05/2015 Massimiliano Pinto Added BLRM_SLAVE_STOPPED state
|
||||
* 08/06/2015 Massimiliano Pinto Added m_errno and m_errmsg
|
||||
* 23/06/2015 Massimiliano Pinto Master communication goes into BLRM_SLAVE_STOPPED state
|
||||
* when an error is encountered in BLRM_BINLOGDUMP state.
|
||||
* Server error code and msg are reported via SHOW SLAVE STATUS
|
||||
* 02/04/2014 Mark Riddoch Initial implementation
|
||||
* 07/05/2015 Massimiliano Pinto Added MariaDB 10 Compatibility
|
||||
* 25/05/2015 Massimiliano Pinto Added BLRM_SLAVE_STOPPED state
|
||||
* 08/06/2015 Massimiliano Pinto Added m_errno and m_errmsg
|
||||
* 23/06/2015 Massimiliano Pinto Master communication goes into BLRM_SLAVE_STOPPED state
|
||||
* when an error is encountered in BLRM_BINLOGDUMP state.
|
||||
* Server error code and msg are reported via SHOW SLAVE STATUS
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -81,11 +82,11 @@ static int blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *
|
||||
void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
|
||||
static void *CreateMySQLAuthData(char *username, char *password, char *database);
|
||||
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
|
||||
inline uint32_t extract_field(uint8_t *src, int bits);
|
||||
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
|
||||
void blr_master_close(ROUTER_INSTANCE *);
|
||||
static char *blr_extract_column(GWBUF *buf, int col);
|
||||
|
||||
void blr_cache_response(ROUTER_INSTANCE *router, char *response, GWBUF *buf);
|
||||
void poll_fake_write_event(DCB *dcb);
|
||||
static int keepalive = 1;
|
||||
|
||||
/**
|
||||
@ -96,8 +97,9 @@ static int keepalive = 1;
|
||||
* @param router The router instance
|
||||
*/
|
||||
void
|
||||
blr_start_master(ROUTER_INSTANCE *router)
|
||||
blr_start_master(void* data)
|
||||
{
|
||||
ROUTER_INSTANCE *router = (ROUTER_INSTANCE*)data;
|
||||
DCB *client;
|
||||
GWBUF *buf;
|
||||
|
||||
@ -488,11 +490,27 @@ char query[128];
|
||||
GWBUF_CONSUME_ALL(router->saved_master.chksum2);
|
||||
router->saved_master.chksum2 = buf;
|
||||
blr_cache_response(router, "chksum2", buf);
|
||||
buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE");
|
||||
router->master_state = BLRM_GTIDMODE;
|
||||
|
||||
if (router->mariadb10_compat) {
|
||||
buf = blr_make_query("SET @mariadb_slave_capability=4");
|
||||
router->master_state = BLRM_MARIADB10;
|
||||
} else {
|
||||
buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE");
|
||||
router->master_state = BLRM_GTIDMODE;
|
||||
}
|
||||
router->master->func.write(router->master, buf);
|
||||
break;
|
||||
}
|
||||
case BLRM_MARIADB10:
|
||||
// Response to the SET @mariadb_slave_capability=4, should be stored
|
||||
if (router->saved_master.mariadb10)
|
||||
GWBUF_CONSUME_ALL(router->saved_master.mariadb10);
|
||||
router->saved_master.mariadb10 = buf;
|
||||
blr_cache_response(router, "mariadb10", buf);
|
||||
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
|
||||
router->master_state = BLRM_MUUID;
|
||||
router->master->func.write(router->master, buf);
|
||||
break;
|
||||
case BLRM_GTIDMODE:
|
||||
// Response to the GTID_MODE, should be stored
|
||||
if (router->saved_master.gtid_mode)
|
||||
@ -763,7 +781,6 @@ int no_residual = 1;
|
||||
int preslen = -1;
|
||||
int prev_length = -1;
|
||||
int n_bufs = -1, pn_bufs = -1;
|
||||
static REP_HEADER phdr;
|
||||
|
||||
/*
|
||||
* Prepend any residual buffer to the buffer chain we have
|
||||
@ -950,7 +967,7 @@ static REP_HEADER phdr;
|
||||
}
|
||||
break;
|
||||
}
|
||||
phdr = hdr;
|
||||
|
||||
if (hdr.ok == 0)
|
||||
{
|
||||
/* set mysql errno to 0 */
|
||||
@ -961,6 +978,8 @@ static REP_HEADER phdr;
|
||||
free(router->m_errmsg);
|
||||
router->m_errmsg = NULL;
|
||||
|
||||
int event_limit;
|
||||
|
||||
/*
|
||||
* First check that the checksum we calculate matches the
|
||||
* checksum in the packet we received.
|
||||
@ -1001,8 +1020,11 @@ static REP_HEADER phdr;
|
||||
#ifdef SHOW_EVENTS
|
||||
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
|
||||
#endif
|
||||
if (hdr.event_type >= 0 && hdr.event_type < 0x24)
|
||||
event_limit = router->mariadb10_compat ? MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
|
||||
|
||||
if (hdr.event_type >= 0 && hdr.event_type <= event_limit)
|
||||
router->stats.events[hdr.event_type]++;
|
||||
|
||||
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
|
||||
{
|
||||
// Fake format description message
|
||||
@ -1224,26 +1246,6 @@ blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr)
|
||||
hdr->flags = EXTRACT16(&ptr[22]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract a numeric field from a packet of the specified number of bits
|
||||
*
|
||||
* @param src The raw packet source
|
||||
* @param bits The number of bits to extract (multiple of 8)
|
||||
*/
|
||||
inline uint32_t
|
||||
extract_field(register uint8_t *src, int bits)
|
||||
{
|
||||
register uint32_t rval = 0, shift = 0;
|
||||
|
||||
while (bits > 0)
|
||||
{
|
||||
rval |= (*src++) << shift;
|
||||
shift += 8;
|
||||
bits -= 8;
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a binlog rotate event.
|
||||
*
|
||||
@ -1313,8 +1315,8 @@ MYSQL_session *auth_info;
|
||||
|
||||
if ((auth_info = calloc(1, sizeof(MYSQL_session))) == NULL)
|
||||
return NULL;
|
||||
strncpy(auth_info->user, username,MYSQL_USER_MAXLEN+1);
|
||||
strncpy(auth_info->db, database,MYSQL_DATABASE_MAXLEN+1);
|
||||
strncpy(auth_info->user, username,MYSQL_USER_MAXLEN);
|
||||
strncpy(auth_info->db, database,MYSQL_DATABASE_MAXLEN);
|
||||
gw_sha1_str((const uint8_t *)password, strlen(password), auth_info->client_sha1);
|
||||
|
||||
return auth_info;
|
||||
|
||||
@ -34,6 +34,8 @@
|
||||
* 18/02/2015 Massimiliano Pinto Addition of DISCONNECT ALL and DISCONNECT SERVER server_id
|
||||
* 18/03/2015 Markus Makela Better detection of CRC32 | NONE checksum
|
||||
* 19/03/2015 Massimiliano Pinto Addition of basic MariaDB 10 compatibility support
|
||||
* 07/05/2015 Massimiliano Pinto Added MariaDB 10 Compatibility
|
||||
* 11/05/2015 Massimiliano Pinto Only MariaDB 10 Slaves can register to binlog router with a MariaDB 10 Master
|
||||
* 25/05/2015 Massimiliano Pinto Addition of BLRM_SLAVE_STOPPED state and blr_start/stop_slave.
|
||||
* New commands STOP SLAVE, START SLAVE added.
|
||||
* 29/05/2015 Massimiliano Pinto Addition of CHANGE MASTER TO ...
|
||||
@ -48,6 +50,7 @@
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <errno.h>
|
||||
@ -115,6 +118,8 @@ static void blr_master_restore_config(ROUTER_INSTANCE *router, MASTER_SERVER_CFG
|
||||
static void blr_master_set_empty_config(ROUTER_INSTANCE *router);
|
||||
static void blr_master_apply_config(ROUTER_INSTANCE *router, MASTER_SERVER_CFG *prev_master);
|
||||
|
||||
void poll_fake_write_event(DCB *dcb);
|
||||
|
||||
extern int lm_enabled_logfiles_bitmask;
|
||||
extern size_t log_ses_count[];
|
||||
extern __thread log_info_t tls_log_info;
|
||||
@ -166,7 +171,28 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
|
||||
slave->dcb->remote)));
|
||||
dcb_close(slave->dcb);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
* If Master is MariaDB10 don't allow registration from
|
||||
* MariaDB/Mysql 5 Slaves
|
||||
*/
|
||||
|
||||
if (router->mariadb10_compat && !slave->mariadb10_compat) {
|
||||
slave->state = BLRS_ERRORED;
|
||||
blr_send_custom_error(slave->dcb, 1, 0,
|
||||
"MariaDB 10 Slave is required for Slave registration");
|
||||
|
||||
LOGIF(LE, (skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"%s: Slave %s: a MariaDB 10 Slave is required for Slave registration",
|
||||
router->service->name,
|
||||
slave->dcb->remote)));
|
||||
|
||||
dcb_close(slave->dcb);
|
||||
return 1;
|
||||
} else {
|
||||
/* Master and Slave version OK: continue with slave registration */
|
||||
return blr_slave_register(router, slave, queue);
|
||||
}
|
||||
break;
|
||||
@ -455,10 +481,17 @@ extern char *strcasestr();
|
||||
free(query_text);
|
||||
return blr_slave_replay(router, slave, router->saved_master.heartbeat);
|
||||
}
|
||||
else if (strcasecmp(word, "@mariadb_slave_capability") == 0)
|
||||
else if (strcasecmp(word, "@mariadb_slave_capability") == 0)
|
||||
{
|
||||
free(query_text);
|
||||
return blr_slave_send_ok(router, slave);
|
||||
/* mariadb10 compatibility is set for the slave */
|
||||
slave->mariadb10_compat=true;
|
||||
|
||||
free(query_text);
|
||||
if (router->mariadb10_compat) {
|
||||
return blr_slave_replay(router, slave, router->saved_master.mariadb10);
|
||||
} else {
|
||||
return blr_slave_send_ok(router, slave);
|
||||
}
|
||||
}
|
||||
else if (strcasecmp(word, "@master_binlog_checksum") == 0)
|
||||
{
|
||||
@ -1404,10 +1437,9 @@ blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
|
||||
{
|
||||
GWBUF *resp;
|
||||
uint8_t *ptr;
|
||||
int len, slen;
|
||||
int slen;
|
||||
|
||||
ptr = GWBUF_DATA(queue);
|
||||
len = extract_field(ptr, 24);
|
||||
ptr += 4; // Skip length and sequence number
|
||||
if (*ptr++ != COM_REGISTER_SLAVE)
|
||||
return 0;
|
||||
@ -1476,7 +1508,7 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
|
||||
{
|
||||
GWBUF *resp;
|
||||
uint8_t *ptr;
|
||||
int len, flags, serverid, rval, binlognamelen;
|
||||
int len, rval, binlognamelen;
|
||||
REP_HEADER hdr;
|
||||
uint32_t chksum;
|
||||
|
||||
@ -1504,9 +1536,7 @@ uint32_t chksum;
|
||||
|
||||
slave->binlog_pos = extract_field(ptr, 32);
|
||||
ptr += 4;
|
||||
flags = extract_field(ptr, 16);
|
||||
ptr += 2;
|
||||
serverid = extract_field(ptr, 32);
|
||||
ptr += 4;
|
||||
strncpy(slave->binlogfile, (char *)ptr, binlognamelen);
|
||||
slave->binlogfile[binlognamelen] = 0;
|
||||
@ -1588,28 +1618,6 @@ uint32_t chksum;
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract a numeric field from a packet of the specified number of bits,
|
||||
* the number of bits must be a multiple of 8.
|
||||
*
|
||||
* @param src The raw packet source
|
||||
* @param bits The number of bits to extract (multiple of 8)
|
||||
* @return The extracted value
|
||||
*/
|
||||
static uint32_t
|
||||
extract_field(uint8_t *src, int bits)
|
||||
{
|
||||
uint32_t rval = 0, shift = 0;
|
||||
|
||||
while (bits > 0)
|
||||
{
|
||||
rval |= (*src++) << shift;
|
||||
shift += 8;
|
||||
bits -= 8;
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Encode a value into a number of bits in a MySQL packet
|
||||
*
|
||||
@ -1971,7 +1979,7 @@ int len = EXTRACT24(ptr + 9); // Extract the event length
|
||||
len = BINLOG_FNAMELEN;
|
||||
ptr += 19; // Skip header
|
||||
slave->binlog_pos = extract_field(ptr, 32);
|
||||
slave->binlog_pos += (extract_field(ptr+4, 32) << 32);
|
||||
slave->binlog_pos += (((uint64_t)extract_field(ptr+4, 32)) << 32);
|
||||
memcpy(slave->binlogfile, ptr + 8, len);
|
||||
slave->binlogfile[len] = 0;
|
||||
}
|
||||
@ -2006,6 +2014,9 @@ uint32_t chksum;
|
||||
|
||||
binlognamelen = strlen(slave->binlogfile);
|
||||
len = 19 + 8 + 4 + binlognamelen;
|
||||
/* no slave crc, remove 4 bytes */
|
||||
if (slave->nocrc)
|
||||
len -= 4;
|
||||
|
||||
/* no slave crc, remove 4 bytes */
|
||||
if (slave->nocrc)
|
||||
@ -2339,7 +2350,6 @@ blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
|
||||
uint8_t *ptr;
|
||||
int len, seqno;
|
||||
GWBUF *pkt;
|
||||
int n = 1;
|
||||
|
||||
/* preparing output result */
|
||||
blr_slave_send_fieldcount(router, slave, 2);
|
||||
@ -2389,7 +2399,7 @@ blr_slave_disconnect_all(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
|
||||
strncpy((char *)ptr, state, strlen(state)); // Result string
|
||||
ptr += strlen(state);
|
||||
|
||||
n = slave->dcb->func.write(slave->dcb, pkt);
|
||||
slave->dcb->func.write(slave->dcb, pkt);
|
||||
|
||||
/* force session close*/
|
||||
router_obj->closeSession(router->service->router_instance, sptr);
|
||||
|
||||
@ -43,6 +43,7 @@
|
||||
* 29/05/14 Mark Riddoch Add Filter support
|
||||
* 16/10/14 Mark Riddoch Add show eventq
|
||||
* 05/03/15 Massimiliano Pinto Added enable/disable feedback
|
||||
* 27/05/15 Martin Brampton Add show persistent [server]
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -154,6 +155,10 @@ struct subcommand showoptions[] = {
|
||||
"Show the monitors that are configured",
|
||||
"Show the monitors that are configured",
|
||||
{0, 0, 0} },
|
||||
{ "persistent", 1, dprintPersistentDCBs,
|
||||
"Show persistent pool for a named server, e.g. show persistent dbnode1",
|
||||
"Show persistent pool for a server, e.g. show persistent 0x485390. The address may also be replaced with the server name from the configuration file",
|
||||
{ARG_TYPE_SERVER, 0, 0} },
|
||||
{ "server", 1, dprintServer,
|
||||
"Show details for a named server, e.g. show server dbnode1",
|
||||
"Show details for a server, e.g. show server 0x485390. The address may also be repalced with the server name from the configuration file",
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
add_library(maxinfo SHARED maxinfo.c maxinfo_parse.c maxinfo_error.c maxinfo_exec.c)
|
||||
set_target_properties(maxinfo PROPERTIES INSTALL_RPATH ${CMAKE_INSTALL_RPATH}:${CMAKE_INSTALL_PREFIX}/lib)
|
||||
set_target_properties(maxinfo PROPERTIES INSTALL_RPATH ${CMAKE_INSTALL_RPATH}:${MAXSCALE_LIBDIR})
|
||||
target_link_libraries(maxinfo pthread log_manager)
|
||||
install(TARGETS maxinfo DESTINATION modules)
|
||||
install(TARGETS maxinfo DESTINATION ${MAXSCALE_LIBDIR})
|
||||
|
||||
@ -113,14 +113,10 @@ MAXINFO_TREE *col, *table;
|
||||
#endif
|
||||
default:
|
||||
*parse_error = PARSE_SYNTAX_ERROR;
|
||||
if (tree)
|
||||
free_tree(tree);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
*parse_error = PARSE_SYNTAX_ERROR;
|
||||
if (tree)
|
||||
free_tree(tree);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
@ -67,6 +67,7 @@
|
||||
* 06/03/2014 Massimiliano Pinto Server connection counter is now updated in closeSession
|
||||
* 24/06/2014 Massimiliano Pinto New rules for selecting the Master server
|
||||
* 27/06/2014 Mark Riddoch Addition of server weighting
|
||||
* 11/06/2015 Martin Brampton Remove decrement n_current (moved to dcb.c)
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -657,7 +658,6 @@ DCB* backend_dcb;
|
||||
if (rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
/* decrease server current connection counter */
|
||||
atomic_add(&router_cli_ses->backend->server->stats.n_current, -1);
|
||||
|
||||
backend_dcb = router_cli_ses->backend_dcb;
|
||||
router_cli_ses->backend_dcb = NULL;
|
||||
@ -723,16 +723,10 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
|
||||
SERVER_IS_DOWN(router_cli_ses->backend->server))
|
||||
{
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
LOGFILE_TRACE|LOGFILE_ERROR,
|
||||
"Error : Failed to route MySQL command %d to backend "
|
||||
"server.",
|
||||
mysql_command)));
|
||||
skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Failed to route MySQL command %d to backend "
|
||||
"server %s.",
|
||||
mysql_command,
|
||||
router_cli_ses->backend->server->unique_name);
|
||||
"server.%s",
|
||||
mysql_command,rses_is_closed ? " Session is closed." : "")));
|
||||
rc = 0;
|
||||
goto return_rc;
|
||||
|
||||
@ -833,12 +827,7 @@ clientReply(
|
||||
GWBUF *queue,
|
||||
DCB *backend_dcb)
|
||||
{
|
||||
DCB *client ;
|
||||
|
||||
client = backend_dcb->session->client;
|
||||
|
||||
ss_dassert(client != NULL);
|
||||
|
||||
ss_dassert(backend_dcb->session->client != NULL);
|
||||
SESSION_ROUTE_REPLY(backend_dcb->session, queue);
|
||||
}
|
||||
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
add_library(readwritesplit SHARED readwritesplit.c)
|
||||
target_link_libraries(readwritesplit ssl pthread log_manager utils query_classifier)
|
||||
install(TARGETS readwritesplit DESTINATION modules)
|
||||
install(TARGETS readwritesplit DESTINATION ${MAXSCALE_LIBDIR})
|
||||
if(BUILD_TESTS)
|
||||
add_subdirectory(test)
|
||||
endif()
|
||||
|
||||
@ -226,7 +226,7 @@ static rses_property_t* mysql_sescmd_get_property(
|
||||
static rses_property_t* rses_property_init(
|
||||
rses_property_type_t prop_type);
|
||||
|
||||
static void rses_property_add(
|
||||
static int rses_property_add(
|
||||
ROUTER_CLIENT_SES* rses,
|
||||
rses_property_t* prop);
|
||||
|
||||
@ -287,7 +287,7 @@ static sescmd_cursor_t* backend_ref_get_sescmd_cursor (backend_ref_t* bref);
|
||||
static int router_handle_state_switch(DCB* dcb, DCB_REASON reason, void* data);
|
||||
static bool handle_error_new_connection(
|
||||
ROUTER_INSTANCE* inst,
|
||||
ROUTER_CLIENT_SES* rses,
|
||||
ROUTER_CLIENT_SES** rses,
|
||||
DCB* backend_dcb,
|
||||
GWBUF* errmsg);
|
||||
static void handle_error_reply_client(
|
||||
@ -1022,7 +1022,6 @@ static void closeSession(
|
||||
*/
|
||||
dcb_close(dcb);
|
||||
/** decrease server current connection counters */
|
||||
atomic_add(&bref->bref_backend->backend_server->stats.n_current, -1);
|
||||
atomic_add(&bref->bref_backend->backend_conn_count, -1);
|
||||
}
|
||||
}
|
||||
@ -1244,7 +1243,8 @@ static bool get_dcb(
|
||||
SERVER_IS_SLAVE(b->backend_server) &&
|
||||
(max_rlag == MAX_RLAG_UNDEFINED ||
|
||||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
b->backend_server->rlag <= max_rlag)))
|
||||
b->backend_server->rlag <= max_rlag)) &&
|
||||
!rses->rses_config.master_reads)
|
||||
{
|
||||
/** found slave */
|
||||
candidate_bref = &backend_ref[i];
|
||||
@ -1636,7 +1636,8 @@ static skygw_query_type_t is_read_tmp_table(
|
||||
bool target_tmp_table = false;
|
||||
int tsize = 0, klen = 0,i;
|
||||
char** tbl = NULL;
|
||||
char *hkey,*dbname;
|
||||
char *dbname;
|
||||
char hkey[MYSQL_DATABASE_MAXLEN+MYSQL_TABLE_MAXLEN+2];
|
||||
MYSQL_session* data;
|
||||
|
||||
DCB* master_dcb = NULL;
|
||||
@ -1664,12 +1665,7 @@ static skygw_query_type_t is_read_tmp_table(
|
||||
/** Query targets at least one table */
|
||||
for(i = 0; i<tsize && !target_tmp_table && tbl[i]; i++)
|
||||
{
|
||||
klen = strlen(dbname) + strlen(tbl[i]) + 2;
|
||||
hkey = calloc(klen,sizeof(char));
|
||||
strcpy(hkey,dbname);
|
||||
strcat(hkey,".");
|
||||
strcat(hkey,tbl[i]);
|
||||
|
||||
sprintf(hkey,"%s.%s",dbname,tbl[i]);
|
||||
if (rses_prop_tmp &&
|
||||
rses_prop_tmp->rses_prop_data.temp_tables)
|
||||
{
|
||||
@ -1684,8 +1680,6 @@ static skygw_query_type_t is_read_tmp_table(
|
||||
"Query targets a temporary table: %s",hkey)));
|
||||
}
|
||||
}
|
||||
|
||||
free(hkey);
|
||||
}
|
||||
|
||||
}
|
||||
@ -2017,8 +2011,9 @@ static bool route_single_stmt(
|
||||
GWBUF* querybuf)
|
||||
{
|
||||
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
|
||||
mysql_server_cmd_t packet_type;
|
||||
mysql_server_cmd_t packet_type = MYSQL_COM_UNDEFINED;
|
||||
uint8_t* packet;
|
||||
size_t packet_len;
|
||||
int ret = 0;
|
||||
DCB* master_dcb = NULL;
|
||||
DCB* target_dcb = NULL;
|
||||
@ -2026,11 +2021,8 @@ static bool route_single_stmt(
|
||||
bool succp = false;
|
||||
int rlag_max = MAX_RLAG_UNDEFINED;
|
||||
backend_type_t btype; /*< target backend type */
|
||||
|
||||
|
||||
|
||||
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
||||
packet = GWBUF_DATA(querybuf);
|
||||
packet_type = packet[4];
|
||||
|
||||
/**
|
||||
* Read stored master DCB pointer. If master is not set, routing must
|
||||
@ -2058,7 +2050,19 @@ static bool route_single_stmt(
|
||||
{
|
||||
querybuf = gwbuf_make_contiguous(querybuf);
|
||||
}
|
||||
|
||||
packet = GWBUF_DATA(querybuf);
|
||||
packet_len = gw_mysql_get_byte3(packet);
|
||||
|
||||
if(packet_len == 0)
|
||||
{
|
||||
route_target = TARGET_MASTER;
|
||||
packet_type = MYSQL_COM_UNDEFINED;
|
||||
}
|
||||
else
|
||||
{
|
||||
packet_type = packet[4];
|
||||
|
||||
switch(packet_type) {
|
||||
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
|
||||
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
|
||||
@ -2273,7 +2277,7 @@ static bool route_single_stmt(
|
||||
}
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
}
|
||||
/** Lock router session */
|
||||
if (!rses_begin_locked_router_action(rses))
|
||||
{
|
||||
@ -2493,8 +2497,8 @@ static bool route_single_stmt(
|
||||
rses_end_locked_router_action(rses);
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1)
|
||||
GWBUF* wbuf = gwbuf_clone(querybuf);
|
||||
if ((ret = target_dcb->func.write(target_dcb, wbuf)) == 1)
|
||||
{
|
||||
backend_ref_t* bref;
|
||||
|
||||
@ -2508,7 +2512,8 @@ static bool route_single_stmt(
|
||||
}
|
||||
else
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
gwbuf_free(wbuf);
|
||||
LOGIF((LE|LT), (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Routing query failed.")));
|
||||
succp = false;
|
||||
@ -2839,8 +2844,8 @@ static void clientReply (
|
||||
/** There is one pending session command to be executed. */
|
||||
if (sescmd_cursor_is_active(scur))
|
||||
{
|
||||
bool succp;
|
||||
|
||||
bool succp;
|
||||
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"Backend %s:%d processed reply and starts to execute "
|
||||
@ -2849,8 +2854,15 @@ static void clientReply (
|
||||
bref->bref_backend->backend_server->port)));
|
||||
|
||||
succp = execute_sescmd_in_backend(bref);
|
||||
|
||||
ss_dassert(succp);
|
||||
ss_dassert(succp);
|
||||
if(!succp)
|
||||
{
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"Backend %s:%d failed to execute session command.",
|
||||
bref->bref_backend->backend_server->name,
|
||||
bref->bref_backend->backend_server->port)));
|
||||
}
|
||||
}
|
||||
else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */
|
||||
{
|
||||
@ -2942,6 +2954,11 @@ static void bref_clear_state(
|
||||
backend_ref_t* bref,
|
||||
bref_state_t state)
|
||||
{
|
||||
if(bref == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return;
|
||||
}
|
||||
if (state != BREF_WAITING_RESULT)
|
||||
{
|
||||
bref->bref_state &= ~state;
|
||||
@ -2963,6 +2980,13 @@ static void bref_clear_state(
|
||||
prev2 = atomic_add(
|
||||
&bref->bref_backend->backend_server->stats.n_current_ops, -1);
|
||||
ss_dassert(prev2 > 0);
|
||||
if(prev2 <= 0)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: negative current operation count in backend %s:%u",
|
||||
__FUNCTION__,
|
||||
&bref->bref_backend->backend_server->name,
|
||||
&bref->bref_backend->backend_server->port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2971,6 +2995,11 @@ static void bref_set_state(
|
||||
backend_ref_t* bref,
|
||||
bref_state_t state)
|
||||
{
|
||||
if(bref == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return;
|
||||
}
|
||||
if (state != BREF_WAITING_RESULT)
|
||||
{
|
||||
bref->bref_state |= state;
|
||||
@ -2983,11 +3012,24 @@ static void bref_set_state(
|
||||
/** Increase waiter count */
|
||||
prev1 = atomic_add(&bref->bref_num_result_wait, 1);
|
||||
ss_dassert(prev1 >= 0);
|
||||
|
||||
if(prev1 < 0)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: negative number of connections waiting for results in backend %s:%u",
|
||||
__FUNCTION__,
|
||||
&bref->bref_backend->backend_server->name,
|
||||
&bref->bref_backend->backend_server->port);
|
||||
}
|
||||
/** Increase global operation count */
|
||||
prev2 = atomic_add(
|
||||
&bref->bref_backend->backend_server->stats.n_current_ops, 1);
|
||||
ss_dassert(prev2 >= 0);
|
||||
ss_dassert(prev2 >= 0);
|
||||
if(prev2 < 0)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: negative current operation count in backend %s:%u",
|
||||
__FUNCTION__,
|
||||
&bref->bref_backend->backend_server->name,
|
||||
&bref->bref_backend->backend_server->port);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -3534,7 +3576,8 @@ static rses_property_t* rses_property_init(
|
||||
prop = (rses_property_t*)calloc(1, sizeof(rses_property_t));
|
||||
if (prop == NULL)
|
||||
{
|
||||
goto return_prop;
|
||||
skygw_log_write(LE,"Error: Malloc returned NULL. (%s:%d)",__FILE__,__LINE__);
|
||||
return NULL;
|
||||
}
|
||||
prop->rses_prop_type = prop_type;
|
||||
#if defined(SS_DEBUG)
|
||||
@ -3542,7 +3585,6 @@ static rses_property_t* rses_property_init(
|
||||
prop->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
|
||||
#endif
|
||||
|
||||
return_prop:
|
||||
CHK_RSES_PROP(prop);
|
||||
return prop;
|
||||
}
|
||||
@ -3553,6 +3595,11 @@ return_prop:
|
||||
static void rses_property_done(
|
||||
rses_property_t* prop)
|
||||
{
|
||||
if(prop == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return;
|
||||
}
|
||||
CHK_RSES_PROP(prop);
|
||||
|
||||
switch (prop->rses_prop_type) {
|
||||
@ -3586,10 +3633,20 @@ static void rses_property_done(
|
||||
*
|
||||
* Router client session must be locked.
|
||||
*/
|
||||
static void rses_property_add(
|
||||
static int rses_property_add(
|
||||
ROUTER_CLIENT_SES* rses,
|
||||
rses_property_t* prop)
|
||||
{
|
||||
if(rses == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"Error: Router client session is NULL. (%s:%d)",__FILE__,__LINE__);
|
||||
return -1;
|
||||
}
|
||||
if(prop == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"Error: Router client session property is NULL. (%s:%d)",__FILE__,__LINE__);
|
||||
return -1;
|
||||
}
|
||||
rses_property_t* p;
|
||||
|
||||
CHK_CLIENT_RSES(rses);
|
||||
@ -3611,6 +3668,7 @@ static void rses_property_add(
|
||||
}
|
||||
p->rses_prop_next = prop;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -3621,7 +3679,13 @@ static mysql_sescmd_t* rses_property_get_sescmd(
|
||||
rses_property_t* prop)
|
||||
{
|
||||
mysql_sescmd_t* sescmd;
|
||||
|
||||
|
||||
if(prop == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
CHK_RSES_PROP(prop);
|
||||
ss_dassert(prop->rses_prop_rsession == NULL ||
|
||||
SPINLOCK_IS_LOCKED(&prop->rses_prop_rsession->rses_lock));
|
||||
@ -3634,22 +3698,6 @@ static mysql_sescmd_t* rses_property_get_sescmd(
|
||||
}
|
||||
return sescmd;
|
||||
}
|
||||
|
||||
/**
|
||||
static void rses_begin_locked_property_action(
|
||||
rses_property_t* prop)
|
||||
{
|
||||
CHK_RSES_PROP(prop);
|
||||
spinlock_acquire(&prop->rses_prop_lock);
|
||||
}
|
||||
|
||||
static void rses_end_locked_property_action(
|
||||
rses_property_t* prop)
|
||||
{
|
||||
CHK_RSES_PROP(prop);
|
||||
spinlock_release(&prop->rses_prop_lock);
|
||||
}
|
||||
*/
|
||||
|
||||
/**
|
||||
* Create session command property.
|
||||
@ -3682,6 +3730,11 @@ static mysql_sescmd_t* mysql_sescmd_init (
|
||||
static void mysql_sescmd_done(
|
||||
mysql_sescmd_t* sescmd)
|
||||
{
|
||||
if(sescmd == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return;
|
||||
}
|
||||
CHK_RSES_PROP(sescmd->my_sescmd_prop);
|
||||
gwbuf_free(sescmd->my_sescmd_buf);
|
||||
memset(sescmd, 0, sizeof(mysql_sescmd_t));
|
||||
@ -3764,7 +3817,7 @@ static GWBUF* sescmd_cursor_process_replies(
|
||||
dcb_close(bref->bref_dcb);
|
||||
*reconnect = true;
|
||||
if(replybuf)
|
||||
gwbuf_consume(replybuf,gwbuf_length(replybuf));
|
||||
while((replybuf = gwbuf_consume(replybuf,gwbuf_length(replybuf))));
|
||||
}
|
||||
}
|
||||
/** This is a response from the master and it is the "right" one.
|
||||
@ -3777,7 +3830,7 @@ static GWBUF* sescmd_cursor_process_replies(
|
||||
/** Mark the rest session commands as replied */
|
||||
scmd->my_sescmd_is_replied = true;
|
||||
scmd->reply_cmd = *((unsigned char*)replybuf->start + 4);
|
||||
skygw_log_write(LOGFILE_DEBUG,"Master '%s' responded to a session command.",
|
||||
skygw_log_write(LT,"Master '%s' responded to a session command.",
|
||||
bref->bref_backend->backend_server->unique_name);
|
||||
int i;
|
||||
|
||||
@ -3797,6 +3850,11 @@ static GWBUF* sescmd_cursor_process_replies(
|
||||
if(ses->rses_backend_ref[i].bref_dcb)
|
||||
dcb_close(ses->rses_backend_ref[i].bref_dcb);
|
||||
*reconnect = true;
|
||||
skygw_log_write(LT,"Disabling slave %s:%d, result differs from master's result. Master: %d Slave: %d",
|
||||
ses->rses_backend_ref[i].bref_backend->backend_server->name,
|
||||
ses->rses_backend_ref[i].bref_backend->backend_server->port,
|
||||
bref->reply_cmd,
|
||||
ses->rses_backend_ref[i].reply_cmd);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -3804,11 +3862,17 @@ static GWBUF* sescmd_cursor_process_replies(
|
||||
}
|
||||
else
|
||||
{
|
||||
skygw_log_write(LOGFILE_DEBUG,"Slave '%s' responded faster to a session command.",
|
||||
bref->bref_backend->backend_server->unique_name);
|
||||
skygw_log_write(LT,"Slave '%s' responded before master to a session command. Result: %d",
|
||||
bref->bref_backend->backend_server->unique_name,
|
||||
(int)bref->reply_cmd);
|
||||
if(bref->reply_cmd == 0xff)
|
||||
{
|
||||
SERVER* serv = bref->bref_backend->backend_server;
|
||||
skygw_log_write(LE,"Error: Slave '%s' (%s:%u) failed to execute session command.",
|
||||
serv->unique_name,serv->name,serv->port);
|
||||
}
|
||||
if(replybuf)
|
||||
gwbuf_free(replybuf);
|
||||
return NULL;
|
||||
while((replybuf = gwbuf_consume(replybuf,gwbuf_length(replybuf))));
|
||||
}
|
||||
|
||||
|
||||
@ -3854,6 +3918,12 @@ static bool sescmd_cursor_is_active(
|
||||
sescmd_cursor_t* sescmd_cursor)
|
||||
{
|
||||
bool succp;
|
||||
|
||||
if(sescmd_cursor == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return false;
|
||||
}
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock));
|
||||
|
||||
succp = sescmd_cursor->scmd_cur_active;
|
||||
@ -3879,9 +3949,14 @@ static GWBUF* sescmd_cursor_clone_querybuf(
|
||||
sescmd_cursor_t* scur)
|
||||
{
|
||||
GWBUF* buf;
|
||||
if(scur == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return NULL;
|
||||
}
|
||||
ss_dassert(scur->scmd_cur_cmd != NULL);
|
||||
|
||||
buf = gwbuf_clone(scur->scmd_cur_cmd->my_sescmd_buf);
|
||||
buf = gwbuf_clone_all(scur->scmd_cur_cmd->my_sescmd_buf);
|
||||
|
||||
CHK_GWBUF(buf);
|
||||
return buf;
|
||||
@ -3891,7 +3966,12 @@ static bool sescmd_cursor_history_empty(
|
||||
sescmd_cursor_t* scur)
|
||||
{
|
||||
bool succp;
|
||||
|
||||
|
||||
if(scur == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return true;
|
||||
}
|
||||
CHK_SESCMD_CUR(scur);
|
||||
|
||||
if (scur->scmd_cur_rses->rses_properties[RSES_PROP_TYPE_SESCMD] == NULL)
|
||||
@ -3911,6 +3991,11 @@ static void sescmd_cursor_reset(
|
||||
sescmd_cursor_t* scur)
|
||||
{
|
||||
ROUTER_CLIENT_SES* rses;
|
||||
if(scur == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return;
|
||||
}
|
||||
CHK_SESCMD_CUR(scur);
|
||||
CHK_CLIENT_RSES(scur->scmd_cur_rses);
|
||||
rses = scur->scmd_cur_rses;
|
||||
@ -3927,6 +4012,11 @@ static bool execute_sescmd_history(
|
||||
{
|
||||
bool succp;
|
||||
sescmd_cursor_t* scur;
|
||||
if(bref == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return false;
|
||||
}
|
||||
CHK_BACKEND_REF(bref);
|
||||
|
||||
scur = &bref->bref_sescmd_cur;
|
||||
@ -3962,7 +4052,12 @@ static bool execute_sescmd_in_backend(
|
||||
bool succp;
|
||||
int rc = 0;
|
||||
sescmd_cursor_t* scur;
|
||||
|
||||
GWBUF* buf;
|
||||
if(backend_ref == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return false;
|
||||
}
|
||||
if (BREF_IS_CLOSED(backend_ref))
|
||||
{
|
||||
succp = false;
|
||||
@ -3994,36 +4089,17 @@ static bool execute_sescmd_in_backend(
|
||||
/** Cursor is left active when function returns. */
|
||||
sescmd_cursor_set_active(scur, true);
|
||||
}
|
||||
#if defined(SS_DEBUG)
|
||||
LOGIF(LT, tracelog_routed_query(scur->scmd_cur_rses,
|
||||
"execute_sescmd_in_backend",
|
||||
backend_ref,
|
||||
sescmd_cursor_clone_querybuf(scur)));
|
||||
|
||||
{
|
||||
GWBUF* tmpbuf = sescmd_cursor_clone_querybuf(scur);
|
||||
uint8_t* ptr = GWBUF_DATA(tmpbuf);
|
||||
unsigned char cmd = MYSQL_GET_COMMAND(ptr);
|
||||
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [execute_sescmd_in_backend] Just before write, fd "
|
||||
"%d : cmd %s.",
|
||||
pthread_self(),
|
||||
dcb->fd,
|
||||
STRPACKETTYPE(cmd))));
|
||||
gwbuf_free(tmpbuf);
|
||||
}
|
||||
#endif /*< SS_DEBUG */
|
||||
switch (scur->scmd_cur_cmd->my_sescmd_packet_type) {
|
||||
case MYSQL_COM_CHANGE_USER:
|
||||
/** This makes it possible to handle replies correctly */
|
||||
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
|
||||
buf = sescmd_cursor_clone_querybuf(scur);
|
||||
rc = dcb->func.auth(
|
||||
dcb,
|
||||
NULL,
|
||||
dcb->session,
|
||||
sescmd_cursor_clone_querybuf(scur));
|
||||
buf);
|
||||
break;
|
||||
|
||||
case MYSQL_COM_INIT_DB:
|
||||
@ -4049,10 +4125,12 @@ static bool execute_sescmd_in_backend(
|
||||
* Mark session command buffer, it triggers writing
|
||||
* MySQL command to protocol
|
||||
*/
|
||||
|
||||
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
|
||||
buf = sescmd_cursor_clone_querybuf(scur);
|
||||
rc = dcb->func.write(
|
||||
dcb,
|
||||
sescmd_cursor_clone_querybuf(scur));
|
||||
buf);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -4062,6 +4140,7 @@ static bool execute_sescmd_in_backend(
|
||||
}
|
||||
else
|
||||
{
|
||||
while((buf = GWBUF_CONSUME_ALL(buf)) != NULL);
|
||||
succp = false;
|
||||
}
|
||||
return_succp:
|
||||
@ -4083,6 +4162,12 @@ static bool sescmd_cursor_next(
|
||||
rses_property_t* prop_curr;
|
||||
rses_property_t* prop_next;
|
||||
|
||||
if(scur == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return false;
|
||||
}
|
||||
|
||||
ss_dassert(scur != NULL);
|
||||
ss_dassert(*(scur->scmd_cur_ptr_property) != NULL);
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(
|
||||
@ -4409,11 +4494,21 @@ static bool route_session_write(
|
||||
* prevent it from being released before properties
|
||||
* are cleaned up as a part of router sessionclean-up.
|
||||
*/
|
||||
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
|
||||
if((prop = rses_property_init(RSES_PROP_TYPE_SESCMD)) == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"Error: Router session property initialization failed");
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
return false;
|
||||
}
|
||||
mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
|
||||
|
||||
/** Add sescmd property to router client session */
|
||||
rses_property_add(router_cli_ses, prop);
|
||||
if(rses_property_add(router_cli_ses, prop) != 0)
|
||||
{
|
||||
skygw_log_write(LE,"Error: Session property addition failed.");
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
return false;
|
||||
}
|
||||
|
||||
for (i=0; i<router_cli_ses->rses_nbackends; i++)
|
||||
{
|
||||
@ -4536,7 +4631,10 @@ static void rwsplit_process_router_options(
|
||||
int i;
|
||||
char* value;
|
||||
select_criteria_t c;
|
||||
|
||||
|
||||
if(options == NULL)
|
||||
return;
|
||||
|
||||
for (i = 0; options[i]; i++)
|
||||
{
|
||||
if ((value = strchr(options[i], '=')) == NULL)
|
||||
@ -4589,6 +4687,10 @@ static void rwsplit_process_router_options(
|
||||
{
|
||||
router->rwsplit_config.disable_slave_recovery = config_truth_value(value);
|
||||
}
|
||||
else if(strcmp(options[i],"master_accept_reads") == 0)
|
||||
{
|
||||
router->rwsplit_config.master_reads = config_truth_value(value);
|
||||
}
|
||||
}
|
||||
} /*< for */
|
||||
}
|
||||
@ -4620,7 +4722,7 @@ static void handleError (
|
||||
SESSION* session;
|
||||
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
|
||||
|
||||
|
||||
CHK_DCB(backend_dcb);
|
||||
|
||||
/** Reset error handle flag from a given DCB */
|
||||
@ -4687,14 +4789,15 @@ static void handleError (
|
||||
{
|
||||
/**
|
||||
* This is called in hope of getting replacement for
|
||||
* failed slave(s).
|
||||
* failed slave(s). This call may free rses.
|
||||
*/
|
||||
*succp = handle_error_new_connection(inst,
|
||||
rses,
|
||||
&rses,
|
||||
backend_dcb,
|
||||
errmsgbuf);
|
||||
}
|
||||
rses_end_locked_router_action(rses);
|
||||
/* Free the lock if rses still exists */
|
||||
if (rses) rses_end_locked_router_action(rses);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -4763,10 +4866,11 @@ static void handle_error_reply_client(
|
||||
*/
|
||||
static bool handle_error_new_connection(
|
||||
ROUTER_INSTANCE* inst,
|
||||
ROUTER_CLIENT_SES* rses,
|
||||
ROUTER_CLIENT_SES** rses,
|
||||
DCB* backend_dcb,
|
||||
GWBUF* errmsg)
|
||||
{
|
||||
ROUTER_CLIENT_SES* myrses;
|
||||
SESSION* ses;
|
||||
int router_nservers;
|
||||
int max_nslaves;
|
||||
@ -4774,7 +4878,8 @@ static bool handle_error_new_connection(
|
||||
backend_ref_t* bref;
|
||||
bool succp;
|
||||
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
|
||||
myrses = *rses;
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&myrses->rses_lock));
|
||||
|
||||
ses = backend_dcb->session;
|
||||
CHK_SESSION(ses);
|
||||
@ -4782,7 +4887,7 @@ static bool handle_error_new_connection(
|
||||
/**
|
||||
* If bref == NULL it has been replaced already with another one.
|
||||
*/
|
||||
if ((bref = get_bref_from_dcb(rses, backend_dcb)) == NULL)
|
||||
if ((bref = get_bref_from_dcb(myrses, backend_dcb)) == NULL)
|
||||
{
|
||||
succp = true;
|
||||
goto return_succp;
|
||||
@ -4825,25 +4930,25 @@ static bool handle_error_new_connection(
|
||||
(void *)bref);
|
||||
|
||||
router_nservers = router_get_servercount(inst);
|
||||
max_nslaves = rses_get_max_slavecount(rses, router_nservers);
|
||||
max_slave_rlag = rses_get_max_replication_lag(rses);
|
||||
max_nslaves = rses_get_max_slavecount(myrses, router_nservers);
|
||||
max_slave_rlag = rses_get_max_replication_lag(myrses);
|
||||
/**
|
||||
* Try to get replacement slave or at least the minimum
|
||||
* number of slave connections for router session.
|
||||
*/
|
||||
if(inst->rwsplit_config.disable_slave_recovery)
|
||||
{
|
||||
succp = have_enough_servers(&rses,1,router_nservers,inst) ? true : false;
|
||||
succp = have_enough_servers(&myrses,1,router_nservers,inst) ? true : false;
|
||||
}
|
||||
else
|
||||
{
|
||||
succp = select_connect_backend_servers(
|
||||
&rses->rses_master_ref,
|
||||
rses->rses_backend_ref,
|
||||
&myrses->rses_master_ref,
|
||||
myrses->rses_backend_ref,
|
||||
router_nservers,
|
||||
max_nslaves,
|
||||
max_slave_rlag,
|
||||
rses->rses_config.rw_slave_select_criteria,
|
||||
myrses->rses_config.rw_slave_select_criteria,
|
||||
ses,
|
||||
inst);
|
||||
}
|
||||
@ -5080,10 +5185,9 @@ static int router_handle_state_switch(
|
||||
{
|
||||
backend_ref_t* bref;
|
||||
int rc = 1;
|
||||
ROUTER_CLIENT_SES* rses;
|
||||
SESSION* ses;
|
||||
SERVER* srv;
|
||||
|
||||
ROUTER_CLIENT_SES* rses;
|
||||
SESSION* ses;
|
||||
CHK_DCB(dcb);
|
||||
bref = (backend_ref_t *)data;
|
||||
CHK_BACKEND_REF(bref);
|
||||
@ -5102,11 +5206,8 @@ static int router_handle_state_switch(
|
||||
srv->name,
|
||||
srv->port,
|
||||
STRSRVSTATUS(srv))));
|
||||
ses = dcb->session;
|
||||
CHK_SESSION(ses);
|
||||
|
||||
rses = (ROUTER_CLIENT_SES *)dcb->session->router_session;
|
||||
CHK_CLIENT_RSES(rses);
|
||||
CHK_SESSION(((SESSION*)dcb->session));
|
||||
CHK_CLIENT_RSES(((ROUTER_CLIENT_SES *)dcb->session->router_session));
|
||||
|
||||
switch (reason) {
|
||||
case DCB_REASON_NOT_RESPONDING:
|
||||
|
||||
@ -1,10 +1,10 @@
|
||||
add_library(schemarouter SHARED schemarouter.c sharding_common.c)
|
||||
target_link_libraries(schemarouter log_manager utils query_classifier)
|
||||
install(TARGETS schemarouter DESTINATION modules)
|
||||
install(TARGETS schemarouter DESTINATION ${MAXSCALE_LIBDIR})
|
||||
|
||||
add_library(shardrouter SHARED shardrouter.c svcconn.c sharding_common.c)
|
||||
target_link_libraries(shardrouter log_manager utils query_classifier)
|
||||
install(TARGETS shardrouter DESTINATION modules)
|
||||
install(TARGETS shardrouter DESTINATION ${MAXSCALE_LIBDIR})
|
||||
|
||||
if(BUILD_TESTS)
|
||||
add_subdirectory(test)
|
||||
|
||||
@ -1153,7 +1153,6 @@ static void closeSession(
|
||||
*/
|
||||
dcb_close(dcb);
|
||||
/** decrease server current connection counters */
|
||||
atomic_add(&bref->bref_backend->backend_server->stats.n_current, -1);
|
||||
atomic_add(&bref->bref_backend->backend_conn_count, -1);
|
||||
}
|
||||
}
|
||||
@ -2854,11 +2853,16 @@ int bref_cmp_current_load(
|
||||
return ((1000 * s1->stats.n_current_ops) - b1->weight)
|
||||
- ((1000 * s2->stats.n_current_ops) - b2->weight);
|
||||
}
|
||||
|
||||
|
||||
static void bref_clear_state(
|
||||
backend_ref_t* bref,
|
||||
bref_state_t state)
|
||||
{
|
||||
if(bref == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return;
|
||||
}
|
||||
if (state != BREF_WAITING_RESULT)
|
||||
{
|
||||
bref->bref_state &= ~state;
|
||||
@ -2867,10 +2871,10 @@ static void bref_clear_state(
|
||||
{
|
||||
int prev1;
|
||||
int prev2;
|
||||
|
||||
|
||||
/** Decrease waiter count */
|
||||
prev1 = atomic_add(&bref->bref_num_result_wait, -1);
|
||||
|
||||
|
||||
if (prev1 <= 0) {
|
||||
atomic_add(&bref->bref_num_result_wait, 1);
|
||||
}
|
||||
@ -2880,14 +2884,26 @@ static void bref_clear_state(
|
||||
prev2 = atomic_add(
|
||||
&bref->bref_backend->backend_server->stats.n_current_ops, -1);
|
||||
ss_dassert(prev2 > 0);
|
||||
}
|
||||
if(prev2 <= 0)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: negative current operation count in backend %s:%u",
|
||||
__FUNCTION__,
|
||||
&bref->bref_backend->backend_server->name,
|
||||
&bref->bref_backend->backend_server->port);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void bref_set_state(
|
||||
static void bref_set_state(
|
||||
backend_ref_t* bref,
|
||||
bref_state_t state)
|
||||
{
|
||||
if(bref == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: NULL parameter.",__FUNCTION__);
|
||||
return;
|
||||
}
|
||||
if (state != BREF_WAITING_RESULT)
|
||||
{
|
||||
bref->bref_state |= state;
|
||||
@ -2896,15 +2912,28 @@ static void bref_set_state(
|
||||
{
|
||||
int prev1;
|
||||
int prev2;
|
||||
|
||||
|
||||
/** Increase waiter count */
|
||||
prev1 = atomic_add(&bref->bref_num_result_wait, 1);
|
||||
ss_dassert(prev1 >= 0);
|
||||
|
||||
if(prev1 < 0)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: negative number of connections waiting for results in backend %s:%u",
|
||||
__FUNCTION__,
|
||||
&bref->bref_backend->backend_server->name,
|
||||
&bref->bref_backend->backend_server->port);
|
||||
}
|
||||
/** Increase global operation count */
|
||||
prev2 = atomic_add(
|
||||
&bref->bref_backend->backend_server->stats.n_current_ops, 1);
|
||||
ss_dassert(prev2 >= 0);
|
||||
ss_dassert(prev2 >= 0);
|
||||
if(prev2 < 0)
|
||||
{
|
||||
skygw_log_write(LE,"[%s] Error: negative current operation count in backend %s:%u",
|
||||
__FUNCTION__,
|
||||
&bref->bref_backend->backend_server->name,
|
||||
&bref->bref_backend->backend_server->port);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -46,7 +46,7 @@ bool extract_database(GWBUF* buf, char* str)
|
||||
tok = strtok_r(query," ;",&saved);
|
||||
if(tok == NULL || strcasecmp(tok,"use") != 0)
|
||||
{
|
||||
skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet.");
|
||||
skygw_log_write(LOGFILE_ERROR,"extract_database: Malformed chage database packet.");
|
||||
succp = false;
|
||||
goto retblock;
|
||||
}
|
||||
@ -54,7 +54,7 @@ bool extract_database(GWBUF* buf, char* str)
|
||||
tok = strtok_r(NULL," ;",&saved);
|
||||
if(tok == NULL)
|
||||
{
|
||||
skygw_log_write(LOGFILE_ERROR,"Schemarouter: Malformed chage database packet.");
|
||||
skygw_log_write(LOGFILE_ERROR,"extract_database: Malformed chage database packet.");
|
||||
succp = false;
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
@ -319,7 +319,7 @@ parse_mapping_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
|
||||
if(PTR_IS_RESULTSET(((unsigned char*)buf->start)) &&
|
||||
modutil_count_signal_packets(buf,0,0,&more) == 2)
|
||||
{
|
||||
ptr = (char*)buf->start;
|
||||
ptr = (unsigned char*)buf->start;
|
||||
|
||||
if(ptr[5] != 1)
|
||||
{
|
||||
@ -1701,7 +1701,7 @@ routeQuery(ROUTER* instance,
|
||||
querybuf)))
|
||||
{
|
||||
extract_database(querybuf,db);
|
||||
snprintf(errbuf,"Unknown database: %s",db);
|
||||
snprintf(errbuf,25+MYSQL_DATABASE_MAXLEN,"Unknown database: %s",db);
|
||||
create_error_reply(errbuf,router_cli_ses->replydcb);
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
|
||||
Reference in New Issue
Block a user