Merge branch 'develop' into binlog_server_wait_data

This commit is contained in:
MassimilianoPinto
2016-11-16 15:08:17 +01:00
32 changed files with 1454 additions and 402 deletions

View File

@ -412,8 +412,8 @@ blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint32_t size,
* Fill the gap with a self generated ignorable event
* Binlog file position is incremented by blr_write_special_event()
*/
if (hdr->next_pos && (hdr->next_pos > (file_offset + size)))
if (router->master_event_state == BLR_EVENT_DONE &&
hdr->next_pos && (hdr->next_pos > (file_offset + size)))
{
uint64_t hole_size = hdr->next_pos - file_offset - size;
if (!blr_write_special_event(router, file_offset, hole_size, hdr, BLRM_IGNORABLE))

View File

@ -1229,7 +1229,14 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
}
else
{
/* Terminate replication and exit from main loop */
blr_terminate_master_replication(router, ptr, len);
gwbuf_free(pkt);
pkt = NULL;
pkt_length = 0;
break;
}
if (hdr.ok == 0)

View File

@ -87,6 +87,7 @@
#include <zlib.h>
#include <maxscale/alloc.h>
static char* get_next_token(char *str, const char* delim, char **saveptr);
extern int load_mysql_users(SERV_LISTENER *listener);
extern void blr_master_close(ROUTER_INSTANCE* router);
extern int blr_file_new_binlog(ROUTER_INSTANCE *router, char *file);
@ -884,10 +885,16 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
spinlock_acquire(&router->lock);
/* Set the BLRM_UNCONFIGURED state */
router->master_state = BLRM_UNCONFIGURED;
blr_master_set_empty_config(router);
blr_master_free_config(current_master);
/* Remove any error message and errno */
free(router->m_errmsg);
router->m_errmsg = NULL;
router->m_errno = 0;
spinlock_release(&router->lock);
if (removed_cfg == -1)
@ -1031,11 +1038,19 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
router->master_state = BLRM_SLAVE_STOPPED;
spinlock_release(&router->lock);
}
if (!router->trx_safe)
{
/*
* The binlog server has just been configured
* master.ini file written in router->binlogdir.
* Now create the binlogfile specified in MASTER_LOG_FILE
*/
if (blr_file_new_binlog(router, router->binlog_name))
{
MXS_INFO("%s: 'master.ini' created, binlog file '%s' created", router->service->name, router->binlog_name);
}
blr_master_free_config(current_master);
return blr_slave_send_ok(router, slave);
}
if (router->trx_safe && router->pending_transaction)
@ -1051,17 +1066,24 @@ blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
return blr_slave_send_warning_message(router, slave, message);
}
else
{
blr_master_free_config(current_master);
return blr_slave_send_ok(router, slave);
}
}
}
else
blr_master_free_config(current_master);
/*
* The CHAMGE MASTER command might specify a new binlog file.
* Let's create the binlogfile specified in MASTER_LOG_FILE
*/
if (strlen(router->prevbinlog) && strcmp(router->prevbinlog, router->binlog_name))
{
return blr_slave_send_ok(router, slave);
}
if (blr_file_new_binlog(router, router->binlog_name))
{
MXS_INFO("%s: created new binlog file '%s' by 'CHANGE MASTER TO' command",
router->service->name, router->binlog_name);
}
}
return blr_slave_send_ok(router, slave);
}
}
}
@ -3422,22 +3444,20 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
/* Send warning message to mysql command */
blr_slave_send_warning_message(router, slave, msg);
}
}
/* create new one */
/* No file has beem opened, create a new binlog file */
if (router->binlog_fd == -1)
{
blr_file_new_binlog(router, router->binlog_name);
}
else
{
if (router->binlog_fd == -1)
{
/* create new one */
blr_file_new_binlog(router, router->binlog_name);
}
else
{
/* use existing one */
blr_file_append(router, router->binlog_name);
}
/* A new binlog file has been created by CHANGE MASTER TO
* if no pending transaction is detected.
* use the existing one.
*/
blr_file_append(router, router->binlog_name);
}
/** Initialise SSL: exit on error */
@ -4328,6 +4348,148 @@ blr_set_master_password(ROUTER_INSTANCE *router, char *password)
return 0;
}
/**
* Get next token
*
* Works exactly like strtok_t except that a delim character which appears
* anywhere within quotes is ignored. For instance, if delim is "," then
* a string like "MASTER_USER='maxscale_repl_user',MASTER_PASSWORD='a,a'"
* will be tokenized into the following two tokens:
*
* MASTER_USER='maxscale_repl_user'
* MASTER_PASSWORD='a,a'
*
* @see strtok_r
*/
static char* get_next_token(char *str, const char* delim, char **saveptr)
{
if (str)
{
*saveptr = str;
}
if (!*saveptr)
{
return NULL;
}
bool delim_found = true;
// Skip any delims in the beginning.
while (**saveptr && delim_found)
{
const char* d = delim;
while (*d)
{
if (*d == **saveptr)
{
break;
}
++d;
}
if (*d == 0)
{
delim_found = false;
}
else
{
++*saveptr;
}
}
if (!**saveptr)
{
return NULL;
}
delim_found = false;
char *token = *saveptr;
char *p = *saveptr;
char quote = 0;
while (*p && !delim_found)
{
switch (*p)
{
case '\'':
case '"':
case '`':
if (!quote)
{
quote = *p;
}
else if (quote == *p)
{
quote = 0;
}
break;
default:
if (!quote)
{
const char *d = delim;
while (*d && !delim_found)
{
if (*p == *d)
{
delim_found = true;
*p = 0;
}
else
{
++d;
}
}
}
}
++p;
}
if (*p == 0)
{
*saveptr = NULL;
}
else if (delim_found)
{
*saveptr = p;
delim_found = true;
while (**saveptr && delim_found)
{
const char *d = delim;
while (*d)
{
if (**saveptr == *d)
{
break;
}
else
{
++d;
}
}
if (*d == 0)
{
delim_found = false;
}
else
{
++*saveptr;
}
}
}
return token;
}
/**
* Parse a CHANGE MASTER TO SQL command
*
@ -4342,7 +4504,7 @@ blr_parse_change_master_command(char *input, char *error_string, CHANGE_MASTER_O
char *sep = ",";
char *word, *brkb;
if ((word = strtok_r(input, sep, &brkb)) == NULL)
if ((word = get_next_token(input, sep, &brkb)) == NULL)
{
snprintf(error_string, BINLOG_ERROR_MSG_LEN, "Unable to parse query [%s]", input);
return 1;
@ -4356,7 +4518,7 @@ blr_parse_change_master_command(char *input, char *error_string, CHANGE_MASTER_O
}
}
while ((word = strtok_r(NULL, sep, &brkb)) != NULL)
while ((word = get_next_token(NULL, sep, &brkb)) != NULL)
{
/* parse options key=val */
if (blr_handle_change_master_token(word, error_string, config))
@ -4380,12 +4542,12 @@ static int
blr_handle_change_master_token(char *input, char *error, CHANGE_MASTER_OPTIONS *config)
{
/* space+TAB+= */
char *sep = " =";
char *sep = " \t=";
char *word, *brkb;
char *value = NULL;
char **option_field = NULL;
if ((word = strtok_r(input, sep, &brkb)) == NULL)
if ((word = get_next_token(input, sep, &brkb)) == NULL)
{
snprintf(error, BINLOG_ERROR_MSG_LEN, "error parsing %s", brkb);
return 1;
@ -4424,7 +4586,7 @@ static char *
blr_get_parsed_command_value(char *input)
{
/* space+TAB+= */
char *sep = " =";
char *sep = " \t=";
char *ret = NULL;
char *word;
char *value = NULL;
@ -4438,7 +4600,7 @@ blr_get_parsed_command_value(char *input)
return ret;
}
if ((word = strtok_r(NULL, sep, &input)) != NULL)
if ((word = get_next_token(NULL, sep, &input)) != NULL)
{
char *ptr;

View File

@ -527,8 +527,6 @@ exec_clear(DCB *dcb, MAXINFO_TREE *tree)
MXS_ERROR("%s", errmsg);
}
extern void shutdown_server();
/**
* MaxScale shutdown
* @param dcb Client DCB
@ -536,7 +534,7 @@ extern void shutdown_server();
*/
void exec_shutdown_maxscale(DCB *dcb, MAXINFO_TREE *tree)
{
shutdown_server();
maxscale_shutdown();
maxinfo_send_ok(dcb);
}

View File

@ -327,7 +327,7 @@ static void *newSession(ROUTER *router_inst, SESSION *session)
if (!select_connect_backend_servers(&master_ref, backend_ref, router_nservers,
max_nslaves, max_slave_rlag,
client_rses->rses_config.rw_slave_select_criteria,
session, router))
session, router, false))
{
/**
* Master and at least <min_nslaves> slaves must be found if the router is
@ -458,6 +458,40 @@ static void freeSession(ROUTER *router_instance, void *router_client_session)
return;
}
/**
* @brief Mark a backend reference as failed
*
* @param bref Backend reference to close
* @param fatal Whether the failure was fatal
*/
void close_failed_bref(backend_ref_t *bref, bool fatal)
{
if (BREF_IS_WAITING_RESULT(bref))
{
bref_clear_state(bref, BREF_WAITING_RESULT);
}
bref_clear_state(bref, BREF_QUERY_ACTIVE);
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
if (fatal)
{
bref_set_state(bref, BREF_FATAL_FAILURE);
}
if (sescmd_cursor_is_active(&bref->bref_sescmd_cur))
{
sescmd_cursor_set_active(&bref->bref_sescmd_cur, false);
}
if (bref->bref_pending_cmd)
{
gwbuf_free(bref->bref_pending_cmd);
bref->bref_pending_cmd = NULL;
}
}
/**
* @brief The main routing entry point for a query (API)
*
@ -652,7 +686,8 @@ static void clientReply(ROUTER *instance, void *router_session, GWBUF *writebuf,
router_cli_ses->rses_config.rw_max_slave_replication_lag,
router_cli_ses->rses_config.rw_slave_select_criteria,
router_cli_ses->rses_master_ref->bref_dcb->session,
router_cli_ses->router);
router_cli_ses->router,
true);
}
}
/**
@ -1296,29 +1331,11 @@ static void handleError(ROUTER *instance, void *router_session,
* If master has lost its Master status error can't be
* handled so that session could continue.
*/
if (rses->rses_master_ref && rses->rses_master_ref->bref_dcb == problem_dcb &&
!SERVER_IS_MASTER(rses->rses_master_ref->ref->server))
if (rses->rses_master_ref && rses->rses_master_ref->bref_dcb == problem_dcb)
{
SERVER *srv = rses->rses_master_ref->ref->server;
backend_ref_t *bref;
bref = get_bref_from_dcb(rses, problem_dcb);
if (bref != NULL)
{
CHK_BACKEND_REF(bref);
if (BREF_IS_WAITING_RESULT(bref))
{
bref_clear_state(bref, BREF_WAITING_RESULT);
}
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
}
else
{
MXS_ERROR("server %s:%d lost the "
"master status but could not locate the "
"corresponding backend ref.",
srv->name, srv->port);
}
backend_ref_t *bref = get_bref_from_dcb(rses, problem_dcb);
bool can_continue = false;
if (rses->rses_config.rw_master_failure_mode != RW_FAIL_INSTANTLY &&
(bref == NULL || !BREF_IS_WAITING_RESULT(bref)))
@ -1332,38 +1349,41 @@ static void handleError(ROUTER *instance, void *router_session,
* can't be sure whether it was executed or not. In this
* case the safest thing to do is to close the client
* connection. */
*succp = true;
can_continue = true;
}
else if (!SERVER_IS_MASTER(srv) && !srv->master_err_is_logged)
{
MXS_ERROR("Server %s:%d lost the master status. Readwritesplit "
"service can't locate the master. Client sessions "
"will be closed.", srv->name, srv->port);
srv->master_err_is_logged = true;
}
*succp = can_continue;
if (bref != NULL)
{
CHK_BACKEND_REF(bref);
close_failed_bref(bref, true);
}
else
{
if (!srv->master_err_is_logged)
{
MXS_ERROR("server %s:%d lost the "
"master status. Readwritesplit "
"service can't locate the master. "
"Client sessions will be closed.",
srv->name, srv->port);
srv->master_err_is_logged = true;
}
*succp = false;
}
MXS_ERROR("Server %s:%d lost the master status but could not locate the "
"corresponding backend ref.", srv->name, srv->port);
}
}
else
{
/**
* This is called in hope of getting replacement for
* failed slave(s). This call may free rses.
* failed slave(s).
*/
*succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
}
dcb_close(problem_dcb);
close_dcb = false;
/* Free the lock if rses still exists */
if (rses)
{
rses_end_locked_router_action(rses);
}
rses_end_locked_router_action(rses);
break;
}
@ -1419,13 +1439,7 @@ static void handle_error_reply_client(SESSION *ses, ROUTER_CLIENT_SES *rses,
if (BREF_IS_IN_USE(bref))
{
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
if (BREF_IS_WAITING_RESULT(bref))
{
bref_clear_state(bref, BREF_WAITING_RESULT);
}
close_failed_bref(bref, false);
dcb_close(backend_dcb);
}
}
@ -1499,13 +1513,11 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
*/
if (BREF_IS_WAITING_RESULT(bref))
{
DCB *client_dcb;
client_dcb = ses->client_dcb;
DCB *client_dcb = ses->client_dcb;
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
bref_clear_state(bref, BREF_WAITING_RESULT);
}
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
close_failed_bref(bref, false);
/**
* Error handler is already called for this DCB because
@ -1542,7 +1554,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
myrses->rses_nbackends,
max_nslaves, max_slave_rlag,
myrses->rses_config.rw_slave_select_criteria,
ses, inst);
ses, inst, true);
}
return_succp:

View File

@ -38,7 +38,7 @@ typedef enum bref_state
BREF_WAITING_RESULT = 0x02, /*< for session commands only */
BREF_QUERY_ACTIVE = 0x04, /*< for other queries */
BREF_CLOSED = 0x08,
BREF_SESCMD_FAILED = 0x10 /*< Backend references that should be dropped */
BREF_FATAL_FAILURE = 0x10 /*< Backend references that should be dropped */
} bref_state_t;
#define BREF_IS_NOT_USED(s) ((s)->bref_state & ~BREF_IN_USE)
@ -46,7 +46,7 @@ typedef enum bref_state
#define BREF_IS_WAITING_RESULT(s) ((s)->bref_num_result_wait > 0)
#define BREF_IS_QUERY_ACTIVE(s) ((s)->bref_state & BREF_QUERY_ACTIVE)
#define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED)
#define BREF_HAS_FAILED(s) ((s)->bref_state & BREF_SESCMD_FAILED)
#define BREF_HAS_FAILED(s) ((s)->bref_state & BREF_FATAL_FAILURE)
typedef enum backend_type_t
{

View File

@ -121,10 +121,11 @@ GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
bool select_connect_backend_servers(backend_ref_t **p_master_ref,
backend_ref_t *backend_ref,
int router_nservers, int max_nslaves,
int max_rlag,
int max_slave_rlag,
select_criteria_t select_criteria,
SESSION *session,
ROUTER_INSTANCE *router);
ROUTER_INSTANCE *router,
bool active_session);
/*
* The following are implemented in rwsplit_tmp_table_multi.c
@ -132,13 +133,14 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
void check_drop_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf,
mysql_server_cmd_t packet_type);
qc_query_type_t is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
bool is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf,
qc_query_type_t type);
void check_create_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf, qc_query_type_t type);
bool check_for_multi_stmt(GWBUF *buf, void *protocol, mysql_server_cmd_t packet_type);
qc_query_type_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet);
void close_failed_bref(backend_ref_t *bref, bool fatal);
#ifdef __cplusplus
}

View File

@ -974,9 +974,9 @@ handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
if (rses->have_tmp_tables)
{
check_drop_tmp_table(rses, querybuf, packet_type);
if (is_packet_a_query(packet_type))
if (is_packet_a_query(packet_type) && is_read_tmp_table(rses, querybuf, *qtype))
{
*qtype = is_read_tmp_table(rses, querybuf, *qtype);
*qtype |= QUERY_TYPE_MASTER_READ;
}
}
check_create_tmp_table(rses, querybuf, *qtype);
@ -1116,6 +1116,64 @@ bool handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
}
}
/**
* @brief Log master write failure
*
* @param rses Router session
*/
static void log_master_routing_failure(ROUTER_CLIENT_SES *rses, bool found,
DCB *master_dcb, DCB *curr_master_dcb)
{
char errmsg[MAX_SERVER_NAME_LEN * 2 + 100]; // Extra space for error message
if (!found)
{
sprintf(errmsg, "Could not find a valid master connection");
}
else if (master_dcb && curr_master_dcb)
{
/** We found a master but it's not the same connection */
ss_dassert(master_dcb != curr_master_dcb);
if (master_dcb->server != curr_master_dcb->server)
{
sprintf(errmsg, "Master server changed from '%s' to '%s'",
master_dcb->server->unique_name,
curr_master_dcb->server->unique_name);
}
else
{
ss_dassert(false); // Currently we don't reconnect to the master
sprintf(errmsg, "Connection to master '%s' was recreated",
curr_master_dcb->server->unique_name);
}
}
else if (master_dcb)
{
/** We have an original master connection but we couldn't find it */
sprintf(errmsg, "The connection to master server '%s' is not available",
master_dcb->server->unique_name);
}
else
{
/** We never had a master connection, the session must be in read-only mode */
if (rses->rses_config.rw_master_failure_mode != RW_FAIL_INSTANTLY)
{
sprintf(errmsg, "Session is in read-only mode because it was created "
"when no master was available");
}
else
{
ss_dassert(false); // A session should always have a master reference
sprintf(errmsg, "Was supposed to route to master but couldn't "
"find master in a suitable state");
}
}
MXS_WARNING("[%s] Write query received from %s@%s. %s. Closing client connection.",
rses->router->service->name, rses->client_dcb->user,
rses->client_dcb->remote, errmsg);
}
/**
* @brief Handle master is the target
*
@ -1128,7 +1186,7 @@ bool handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
* @return bool - true if succeeded, false otherwise
*/
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
DCB **target_dcb)
DCB **target_dcb)
{
DCB *master_dcb = rses->rses_master_ref ? rses->rses_master_ref->bref_dcb : NULL;
DCB *curr_master_dcb = NULL;
@ -1141,29 +1199,26 @@ bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
}
else
{
if (succp && master_dcb != curr_master_dcb)
if (succp && master_dcb == curr_master_dcb)
{
MXS_INFO("Was supposed to route to master but master has changed.");
atomic_add(&inst->stats.n_master, 1);
*target_dcb = master_dcb;
}
else
{
MXS_INFO("Was supposed to route to master but couldn't find master"
" in a suitable state.");
}
if (rses->rses_config.rw_master_failure_mode == RW_ERROR_ON_WRITE)
{
/** Old master is no longer available */
succp = send_readonly_error(rses->client_dcb);
}
else
{
MXS_WARNING("[%s] Write query received from %s@%s when no master is "
"available, closing client connection.", inst->service->name,
rses->client_dcb->user, rses->client_dcb->remote);
succp = false;
/** The original master is not available, we can't route the write */
if (rses->rses_config.rw_master_failure_mode == RW_ERROR_ON_WRITE)
{
succp = send_readonly_error(rses->client_dcb);
}
else
{
log_master_routing_failure(rses, succp, master_dcb, curr_master_dcb);
succp = false;
}
}
}
return succp;
}
@ -1369,6 +1424,7 @@ static backend_ref_t *get_root_master_bref(ROUTER_CLIENT_SES *rses)
bref = &rses->rses_backend_ref[i];
if (bref && BREF_IS_IN_USE(bref))
{
ss_dassert(!BREF_IS_CLOSED(bref) && !BREF_HAS_FAILED(bref));
if (bref == rses->rses_master_ref)
{
/** Store master state for better error reporting */
@ -1386,13 +1442,11 @@ static backend_ref_t *get_root_master_bref(ROUTER_CLIENT_SES *rses)
}
}
if (candidate_bref == NULL && rses->rses_config.rw_master_failure_mode == RW_FAIL_INSTANTLY)
if (candidate_bref == NULL && rses->rses_config.rw_master_failure_mode == RW_FAIL_INSTANTLY &&
rses->rses_master_ref && BREF_IS_IN_USE(rses->rses_master_ref))
{
MXS_ERROR("Could not find master among the backend "
"servers. Previous master's state : %s",
rses->rses_master_ref == NULL ? "No master found" :
(!BREF_IS_IN_USE(rses->rses_master_ref) ? "Master is not in use" :
STRSRVSTATUS(&master)));
MXS_ERROR("Could not find master among the backend servers. "
"Previous master's state : %s", STRSRVSTATUS(&master));
}
return candidate_bref;

View File

@ -61,12 +61,71 @@ int (*criteria_cmpfun[LAST_CRITERIA])(const void *, const void *) =
bref_cmp_current_load
};
/*
* The following function is the only one that is called from elsewhere in
* the read write split router. It is not intended for use from outside this
* router. Other functions in this module are internal and are called
* directly or indirectly by this function.
/**
* @brief Check whether it's possible to connect to this server
*
* @param bref Backend reference
* @return True if a connection to this server can be attempted
*/
static bool bref_valid_for_connect(const backend_ref_t *bref)
{
return !BREF_HAS_FAILED(bref) && SERVER_IS_RUNNING(bref->ref->server);
}
/**
* Check whether it's possible to use this server as a slave
*
* @param bref Backend reference
* @param master_host The master server
* @return True if this server is a valid slave candidate
*/
static bool bref_valid_for_slave(const backend_ref_t *bref, const SERVER *master_host)
{
SERVER *server = bref->ref->server;
return (SERVER_IS_SLAVE(server) || SERVER_IS_RELAY_SERVER(server)) &&
(master_host == NULL || (server != master_host));
}
/**
* @brief Find the best slave candidate
*
* This function iterates through @c bref and tries to find the best backend
* reference that is not in use. @c cmpfun will be called to compare the backends.
*
* @param bref Backend reference
* @param n Size of @c bref
* @param master The master server
* @param cmpfun qsort() compatible comparison function
* @return The best slave backend reference or NULL if no candidates could be found
*/
backend_ref_t* get_slave_candidate(backend_ref_t *bref, int n, const SERVER *master,
int (*cmpfun)(const void *, const void *))
{
backend_ref_t *candidate = NULL;
for (int i = 0; i < n; i++)
{
if (!BREF_IS_IN_USE(&bref[i]) &&
bref_valid_for_connect(&bref[i]) &&
bref_valid_for_slave(&bref[i], master))
{
if (candidate)
{
if (cmpfun(candidate, &bref[i]) > 0)
{
candidate = &bref[i];
}
}
else
{
candidate = &bref[i];
}
}
}
return candidate;
}
/**
* @brief Search suitable backend servers from those of router instance
@ -92,7 +151,8 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
int max_slave_rlag,
select_criteria_t select_criteria,
SESSION *session,
ROUTER_INSTANCE *router)
ROUTER_INSTANCE *router,
bool active_session)
{
if (p_master_ref == NULL || backend_ref == NULL)
{
@ -103,31 +163,33 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
}
/* get the root Master */
SERVER_REF *master_host = get_root_master(backend_ref, router_nservers);
SERVER_REF *master_backend = get_root_master(backend_ref, router_nservers);
SERVER *master_host = master_backend ? master_backend->server : NULL;
if (router->rwsplit_config.rw_master_failure_mode == RW_FAIL_INSTANTLY &&
(master_host == NULL || !SERVER_REF_IS_ACTIVE(master_host) ||
SERVER_IS_DOWN(master_host->server)))
(master_host == NULL || SERVER_IS_DOWN(master_host)))
{
MXS_ERROR("Couldn't find suitable Master from %d candidates.", router_nservers);
return false;
}
/**
* Existing session : master is already chosen and connected.
* The function was called because new slave must be selected to replace
* failed one.
* New session:
*
* Connect to both master and slaves
*
* Existing session:
*
* Master is already connected or we don't have a master. The function was
* called because new slaves must be selected to replace failed ones.
*/
bool master_connected = *p_master_ref != NULL;
bool master_connected = active_session || *p_master_ref != NULL;
/** Check slave selection criteria and set compare function */
int (*p)(const void *, const void *) = criteria_cmpfun[select_criteria];
ss_dassert(p);
/** Sort the pointer list to servers according to slave selection criteria.
* The servers that match the criteria the best are at the beginning of
* the list. */
qsort(backend_ref, (size_t) router_nservers, sizeof(backend_ref_t), p);
SERVER *old_master = *p_master_ref ? (*p_master_ref)->ref->server : NULL;
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
@ -139,53 +201,59 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
const int min_nslaves = 0; /*< not configurable at the time */
bool succp = false;
/**
* Choose at least 1+min_nslaves (master and slave) and at most 1+max_nslaves
* servers from the sorted list. First master found is selected.
*/
for (int i = 0; i < router_nservers &&
(slaves_connected < max_nslaves || !master_connected); i++)
if (!master_connected)
{
SERVER *serv = backend_ref[i].ref->server;
if (!BREF_HAS_FAILED(&backend_ref[i]) &&
SERVER_REF_IS_ACTIVE(backend_ref[i].ref) &&
SERVER_IS_RUNNING(serv))
/** Find a master server */
for (int i = 0; i < router_nservers; i++)
{
/* check also for relay servers and don't take the master_host */
if (slaves_found < max_nslaves &&
(max_slave_rlag == MAX_RLAG_UNDEFINED ||
(serv->rlag != MAX_RLAG_NOT_AVAILABLE &&
serv->rlag <= max_slave_rlag)) &&
(SERVER_IS_SLAVE(serv) || SERVER_IS_RELAY_SERVER(serv)) &&
(master_host == NULL || (serv != master_host->server)))
{
slaves_found += 1;
SERVER *serv = backend_ref[i].ref->server;
if (BREF_IS_IN_USE((&backend_ref[i])) ||
connect_server(&backend_ref[i], session, true))
{
slaves_connected += 1;
}
}
/* take the master_host for master */
else if (master_host && (serv == master_host->server))
if (bref_valid_for_connect(&backend_ref[i]) &&
master_host && serv == master_host)
{
/** p_master_ref must be assigned with this backend_ref pointer
* because its original value may have been lost when backend
* references were sorted with qsort. */
*p_master_ref = &backend_ref[i];
if (!master_connected)
if (connect_server(&backend_ref[i], session, false))
{
if (connect_server(&backend_ref[i], session, false))
{
master_connected = true;
}
*p_master_ref = &backend_ref[i];
break;
}
}
}
} /*< for */
}
/** Calculate how many connections we already have */
for (int i = 0; i < router_nservers; i++)
{
if (bref_valid_for_connect(&backend_ref[i]) &&
bref_valid_for_slave(&backend_ref[i], master_host))
{
slaves_found += 1;
if (BREF_IS_IN_USE(&backend_ref[i]))
{
slaves_connected += 1;
}
}
}
ss_dassert(slaves_connected < max_nslaves);
backend_ref_t *bref = get_slave_candidate(backend_ref, router_nservers, master_host, p);
/** Connect to all possible slaves */
while (bref && slaves_connected < max_nslaves)
{
if (connect_server(bref, session, true))
{
slaves_connected += 1;
}
else
{
/** Failed to connect, mark server as failed */
bref_set_state(bref, BREF_FATAL_FAILURE);
}
bref = get_slave_candidate(backend_ref, router_nservers, master_host, p);
}
/**
* Successful cases
@ -218,11 +286,9 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
/** Failure cases */
else
{
if (slaves_connected < min_nslaves)
{
MXS_ERROR("Couldn't establish required amount of "
"slave connections for router session.");
}
MXS_ERROR("Couldn't establish required amount of slave connections for "
"router session. Would need between %d and %d slaves but only have %d.",
min_nslaves, max_nslaves, slaves_connected);
/** Clean up connections */
for (int i = 0; i < router_nservers; i++)
@ -231,8 +297,8 @@ bool select_connect_backend_servers(backend_ref_t **p_master_ref,
{
ss_dassert(backend_ref[i].ref->connections > 0);
/** disconnect opened connections */
bref_clear_state(&backend_ref[i], BREF_IN_USE);
close_failed_bref(&backend_ref[i], true);
/** Decrease backend's connection counter. */
atomic_add(&backend_ref[i].ref->connections, -1);
dcb_close(backend_ref[i].bref_dcb);

View File

@ -169,16 +169,13 @@ GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
/** Set response status received */
bref_clear_state(bref, BREF_WAITING_RESULT);
if (bref->reply_cmd != scmd->reply_cmd)
if (bref->reply_cmd != scmd->reply_cmd && BREF_IS_IN_USE(bref))
{
MXS_ERROR("Slave server '%s': response differs from master's response. "
"Closing connection due to inconsistent session state.",
bref->ref->server->unique_name);
sescmd_cursor_set_active(scur, false);
bref_clear_state(bref, BREF_QUERY_ACTIVE);
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
bref_set_state(bref, BREF_SESCMD_FAILED);
close_failed_bref(bref, true);
if (bref->bref_dcb)
{
dcb_close(bref->bref_dcb);
@ -213,12 +210,11 @@ GWBUF *sescmd_cursor_process_replies(GWBUF *replybuf,
{
/** This backend has already received a response */
if (ses->rses_backend_ref[i].reply_cmd != scmd->reply_cmd &&
!BREF_IS_CLOSED(&ses->rses_backend_ref[i]))
!BREF_IS_CLOSED(&ses->rses_backend_ref[i]) &&
BREF_IS_IN_USE(&ses->rses_backend_ref[i]))
{
bref_clear_state(&ses->rses_backend_ref[i], BREF_QUERY_ACTIVE);
bref_clear_state(&ses->rses_backend_ref[i], BREF_IN_USE);
bref_set_state(&ses->rses_backend_ref[i], BREF_CLOSED);
bref_set_state(bref, BREF_SESCMD_FAILED);
close_failed_bref(&ses->rses_backend_ref[i], true);
if (ses->rses_backend_ref[i].bref_dcb)
{
dcb_close(ses->rses_backend_ref[i].bref_dcb);

View File

@ -109,9 +109,9 @@ void check_drop_tmp_table(ROUTER_CLIENT_SES *router_cli_ses, GWBUF *querybuf,
* @param type The type of the query resolved so far
* @return The type of the query
*/
qc_query_type_t is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
bool is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
GWBUF *querybuf,
qc_query_type_t type)
qc_query_type_t qtype)
{
bool target_tmp_table = false;
@ -120,20 +120,20 @@ qc_query_type_t is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
char *dbname;
char hkey[MYSQL_DATABASE_MAXLEN + MYSQL_TABLE_MAXLEN + 2];
MYSQL_session *data;
qc_query_type_t qtype = type;
bool rval = false;
rses_property_t *rses_prop_tmp;
if (router_cli_ses == NULL || querybuf == NULL)
{
MXS_ERROR("[%s] Error: NULL parameters passed: %p %p", __FUNCTION__,
router_cli_ses, querybuf);
return type;
return false;
}
if (router_cli_ses->client_dcb == NULL)
{
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
return type;
return false;
}
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
@ -142,7 +142,7 @@ qc_query_type_t is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
if (data == NULL)
{
MXS_ERROR("[%s] Error: User data in client DBC is NULL.", __FUNCTION__);
return qtype;
return false;
}
dbname = (char *)data->db;
@ -166,7 +166,7 @@ qc_query_type_t is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
if (hashtable_fetch(rses_prop_tmp->rses_prop_data.temp_tables, hkey))
{
/**Query target is a temporary table*/
qtype = QUERY_TYPE_READ_TMP_TABLE;
rval = true;
MXS_INFO("Query targets a temporary table: %s", hkey);
break;
}
@ -184,7 +184,7 @@ qc_query_type_t is_read_tmp_table(ROUTER_CLIENT_SES *router_cli_ses,
MXS_FREE(tbl);
}
return qtype;
return rval;
}
/**