Rename readwritesplit variables
Renamed variables to better represent the types of variables they represent. Reordered some of the functions so that the functions don't need to be declared before they are used.
This commit is contained in:
parent
aa61c8a30b
commit
37b6cf250d
@ -157,7 +157,7 @@ int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses)
|
||||
*
|
||||
* @return backend reference pointer if succeed or NULL
|
||||
*/
|
||||
SRWBackend get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb)
|
||||
SRWBackend get_backend_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb)
|
||||
{
|
||||
ss_dassert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
|
||||
CHK_DCB(dcb);
|
||||
@ -327,9 +327,9 @@ static void handle_error_reply_client(MXS_SESSION *ses, ROUTER_CLIENT_SES *rses,
|
||||
mxs_session_state_t sesstate = ses->state;
|
||||
DCB *client_dcb = ses->client_dcb;
|
||||
|
||||
SRWBackend bref = get_bref_from_dcb(rses, backend_dcb);
|
||||
SRWBackend backend = get_backend_from_dcb(rses, backend_dcb);
|
||||
|
||||
bref->close();
|
||||
backend->close();
|
||||
|
||||
if (sesstate == SESSION_STATE_ROUTER_READY)
|
||||
{
|
||||
@ -351,19 +351,19 @@ static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, const SRWBackend&
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SRWBackend& bref = *it;
|
||||
SRWBackend& backend = *it;
|
||||
|
||||
if (bref->in_use() && bref != old &&
|
||||
!SERVER_IS_MASTER(bref->server()) &&
|
||||
SERVER_IS_SLAVE(bref->server()))
|
||||
if (backend->in_use() && backend != old &&
|
||||
!SERVER_IS_MASTER(backend->server()) &&
|
||||
SERVER_IS_SLAVE(backend->server()))
|
||||
{
|
||||
/** Found a valid candidate; a non-master slave that's in use */
|
||||
if (bref->write(stored))
|
||||
if (backend->write(stored))
|
||||
{
|
||||
MXS_INFO("Retrying failed read at '%s'.", bref->server()->unique_name);
|
||||
ss_dassert(bref->get_reply_state() == REPLY_STATE_DONE);
|
||||
LOG_RS(bref, REPLY_STATE_START);
|
||||
bref->set_reply_state(REPLY_STATE_START);
|
||||
MXS_INFO("Retrying failed read at '%s'.", backend->server()->unique_name);
|
||||
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
|
||||
LOG_RS(backend, REPLY_STATE_START);
|
||||
backend->set_reply_state(REPLY_STATE_START);
|
||||
rses->expected_responses++;
|
||||
success = true;
|
||||
break;
|
||||
@ -412,12 +412,12 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
|
||||
DCB *backend_dcb, GWBUF *errmsg)
|
||||
{
|
||||
ROUTER_CLIENT_SES *myrses = *rses;
|
||||
SRWBackend bref = get_bref_from_dcb(myrses, backend_dcb);
|
||||
SRWBackend backend = get_backend_from_dcb(myrses, backend_dcb);
|
||||
|
||||
MXS_SESSION* ses = backend_dcb->session;
|
||||
CHK_SESSION(ses);
|
||||
|
||||
if (bref->is_waiting_result())
|
||||
if (backend->is_waiting_result())
|
||||
{
|
||||
/**
|
||||
* A query was sent through the backend and it is waiting for a reply.
|
||||
@ -428,8 +428,8 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
|
||||
const SERVER *target;
|
||||
|
||||
if (!session_take_stmt(backend_dcb->session, &stored, &target) ||
|
||||
target != bref->backend()->server ||
|
||||
!reroute_stored_statement(*rses, bref, stored))
|
||||
target != backend->backend()->server ||
|
||||
!reroute_stored_statement(*rses, backend, stored))
|
||||
{
|
||||
/**
|
||||
* We failed to route the stored statement or no statement was
|
||||
@ -439,7 +439,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
|
||||
gwbuf_free(stored);
|
||||
myrses->expected_responses--;
|
||||
|
||||
if (bref->session_command_count())
|
||||
if (backend->session_command_count())
|
||||
{
|
||||
/**
|
||||
* The backend was executing a command that requires a reply.
|
||||
@ -462,7 +462,7 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
|
||||
}
|
||||
|
||||
/** Close the current connection */
|
||||
bref->close();
|
||||
backend->close();
|
||||
|
||||
int max_nslaves = rses_get_max_slavecount(myrses);
|
||||
bool succp;
|
||||
@ -584,60 +584,60 @@ bool route_stored_query(ROUTER_CLIENT_SES *rses)
|
||||
/**
|
||||
* @brief Check if we have received a complete reply from the backend
|
||||
*
|
||||
* @param bref Backend reference
|
||||
* @param buffer Buffer containing the response
|
||||
* @param backend Backend reference
|
||||
* @param buffer Buffer containing the response
|
||||
*
|
||||
* @return True if the complete response has been received
|
||||
*/
|
||||
bool reply_is_complete(SRWBackend bref, GWBUF *buffer)
|
||||
bool reply_is_complete(SRWBackend backend, GWBUF *buffer)
|
||||
{
|
||||
mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->dcb()->session);
|
||||
mysql_server_cmd_t cmd = mxs_mysql_current_command(backend->dcb()->session);
|
||||
|
||||
if (bref->get_reply_state() == REPLY_STATE_START && !mxs_mysql_is_result_set(buffer))
|
||||
if (backend->get_reply_state() == REPLY_STATE_START && !mxs_mysql_is_result_set(buffer))
|
||||
{
|
||||
if (cmd == MYSQL_COM_STMT_PREPARE || !mxs_mysql_more_results_after_ok(buffer))
|
||||
{
|
||||
/** Not a result set, we have the complete response */
|
||||
LOG_RS(bref, REPLY_STATE_DONE);
|
||||
bref->set_reply_state(REPLY_STATE_DONE);
|
||||
LOG_RS(backend, REPLY_STATE_DONE);
|
||||
backend->set_reply_state(REPLY_STATE_DONE);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
bool more = false;
|
||||
int old_eof = bref->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
|
||||
int old_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
|
||||
int n_eof = modutil_count_signal_packets(buffer, old_eof, &more);
|
||||
|
||||
if (n_eof == 0)
|
||||
{
|
||||
/** Waiting for the EOF packet after the column definitions */
|
||||
LOG_RS(bref, REPLY_STATE_RSET_COLDEF);
|
||||
bref->set_reply_state(REPLY_STATE_RSET_COLDEF);
|
||||
LOG_RS(backend, REPLY_STATE_RSET_COLDEF);
|
||||
backend->set_reply_state(REPLY_STATE_RSET_COLDEF);
|
||||
}
|
||||
else if (n_eof == 1 && cmd != MYSQL_COM_FIELD_LIST)
|
||||
{
|
||||
/** Waiting for the EOF packet after the rows */
|
||||
LOG_RS(bref, REPLY_STATE_RSET_ROWS);
|
||||
bref->set_reply_state(REPLY_STATE_RSET_ROWS);
|
||||
LOG_RS(backend, REPLY_STATE_RSET_ROWS);
|
||||
backend->set_reply_state(REPLY_STATE_RSET_ROWS);
|
||||
}
|
||||
else
|
||||
{
|
||||
/** We either have a complete result set or a response to
|
||||
* a COM_FIELD_LIST command */
|
||||
ss_dassert(n_eof == 2 || (n_eof == 1 && cmd == MYSQL_COM_FIELD_LIST));
|
||||
LOG_RS(bref, REPLY_STATE_DONE);
|
||||
bref->set_reply_state(REPLY_STATE_DONE);
|
||||
LOG_RS(backend, REPLY_STATE_DONE);
|
||||
backend->set_reply_state(REPLY_STATE_DONE);
|
||||
|
||||
if (more)
|
||||
{
|
||||
/** The server will send more resultsets */
|
||||
LOG_RS(bref, REPLY_STATE_START);
|
||||
bref->set_reply_state(REPLY_STATE_START);
|
||||
LOG_RS(backend, REPLY_STATE_START);
|
||||
backend->set_reply_state(REPLY_STATE_START);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return bref->get_reply_state() == REPLY_STATE_DONE;
|
||||
return backend->get_reply_state() == REPLY_STATE_DONE;
|
||||
}
|
||||
|
||||
void close_all_connections(ROUTER_CLIENT_SES* rses)
|
||||
@ -645,11 +645,11 @@ void close_all_connections(ROUTER_CLIENT_SES* rses)
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SRWBackend& bref = *it;
|
||||
SRWBackend& backend = *it;
|
||||
|
||||
if (bref->in_use())
|
||||
if (backend->in_use())
|
||||
{
|
||||
bref->close();
|
||||
backend->close();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1108,19 +1108,19 @@ static void clientReply(MXS_ROUTER *instance,
|
||||
* and
|
||||
*/
|
||||
|
||||
SRWBackend bref = get_bref_from_dcb(router_cli_ses, backend_dcb);
|
||||
SRWBackend backend = get_backend_from_dcb(router_cli_ses, backend_dcb);
|
||||
|
||||
/** Statement was successfully executed, free the stored statement */
|
||||
session_clear_stmt(backend_dcb->session);
|
||||
ss_dassert(bref->get_reply_state() != REPLY_STATE_DONE);
|
||||
ss_dassert(backend->get_reply_state() != REPLY_STATE_DONE);
|
||||
|
||||
if (reply_is_complete(bref, writebuf))
|
||||
if (reply_is_complete(backend, writebuf))
|
||||
{
|
||||
/** Got a complete reply, acknowledge the write decrement expected response count */
|
||||
bref->ack_write();
|
||||
backend->ack_write();
|
||||
router_cli_ses->expected_responses--;
|
||||
ss_dassert(router_cli_ses->expected_responses >= 0);
|
||||
ss_dassert(bref->get_reply_state() == REPLY_STATE_DONE);
|
||||
ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1131,13 +1131,13 @@ static void clientReply(MXS_ROUTER *instance,
|
||||
* Active cursor means that reply is from session command
|
||||
* execution.
|
||||
*/
|
||||
if (bref->session_command_count())
|
||||
if (backend->session_command_count())
|
||||
{
|
||||
check_session_command_reply(writebuf, bref);
|
||||
check_session_command_reply(writebuf, backend);
|
||||
|
||||
/** This discards all responses that have already been sent to the client */
|
||||
bool rconn = false;
|
||||
process_sescmd_response(router_cli_ses, bref, &writebuf, &rconn);
|
||||
process_sescmd_response(router_cli_ses, backend, &writebuf, &rconn);
|
||||
|
||||
if (rconn && !router_inst->rwsplit_config.disable_sescmd_history)
|
||||
{
|
||||
@ -1176,12 +1176,12 @@ static void clientReply(MXS_ROUTER *instance,
|
||||
MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
|
||||
}
|
||||
/** Check pending session commands */
|
||||
else if (!queue_routed && bref->session_command_count())
|
||||
else if (!queue_routed && backend->session_command_count())
|
||||
{
|
||||
MXS_INFO("Backend [%s]:%d processed reply and starts to execute active cursor.",
|
||||
bref->server()->name, bref->server()->port);
|
||||
backend->server()->name, backend->server()->port);
|
||||
|
||||
if (bref->execute_session_command())
|
||||
if (backend->execute_session_command())
|
||||
{
|
||||
router_cli_ses->expected_responses++;
|
||||
}
|
||||
@ -1250,7 +1250,7 @@ static void handleError(MXS_ROUTER *instance,
|
||||
MXS_SESSION *session = problem_dcb->session;
|
||||
ss_dassert(session);
|
||||
|
||||
SRWBackend bref = get_bref_from_dcb(rses, problem_dcb);
|
||||
SRWBackend backend = get_backend_from_dcb(rses, problem_dcb);
|
||||
|
||||
switch (action)
|
||||
{
|
||||
@ -1266,7 +1266,7 @@ static void handleError(MXS_ROUTER *instance,
|
||||
bool can_continue = false;
|
||||
|
||||
if (rses->rses_config.master_failure_mode != RW_FAIL_INSTANTLY &&
|
||||
(!bref || !bref->is_waiting_result()))
|
||||
(!backend || !backend->is_waiting_result()))
|
||||
{
|
||||
/** The failure of a master is not considered a critical
|
||||
* failure as partial functionality still remains. Reads
|
||||
@ -1289,9 +1289,9 @@ static void handleError(MXS_ROUTER *instance,
|
||||
|
||||
*succp = can_continue;
|
||||
|
||||
if (bref)
|
||||
if (backend)
|
||||
{
|
||||
bref->close(mxs::Backend::CLOSE_FATAL);
|
||||
backend->close(mxs::Backend::CLOSE_FATAL);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1299,7 +1299,7 @@ static void handleError(MXS_ROUTER *instance,
|
||||
"corresponding backend ref.", srv->name, srv->port);
|
||||
}
|
||||
}
|
||||
else if (bref)
|
||||
else if (backend)
|
||||
{
|
||||
/** Check whether problem_dcb is same as dcb of rses->target_node
|
||||
* and within READ ONLY transaction:
|
||||
@ -1324,14 +1324,14 @@ static void handleError(MXS_ROUTER *instance,
|
||||
*succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
|
||||
}
|
||||
|
||||
if (bref)
|
||||
if (backend)
|
||||
{
|
||||
/** This is a valid DCB for a backend ref */
|
||||
if (bref->in_use() && bref->dcb() == problem_dcb)
|
||||
if (backend->in_use() && backend->dcb() == problem_dcb)
|
||||
{
|
||||
ss_dassert(false);
|
||||
MXS_ERROR("Backend '%s' is still in use and points to the problem DCB.",
|
||||
bref->server()->unique_name);
|
||||
backend->server()->unique_name);
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -51,7 +51,7 @@ bool send_readonly_error(DCB *dcb);
|
||||
* The following are implemented in readwritesplit.c
|
||||
*/
|
||||
int router_handle_state_switch(DCB *dcb, DCB_REASON reason, void *data);
|
||||
SRWBackend get_bref_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb);
|
||||
SRWBackend get_backend_from_dcb(ROUTER_CLIENT_SES *rses, DCB *dcb);
|
||||
int rses_get_max_slavecount(ROUTER_CLIENT_SES *rses);
|
||||
int rses_get_max_replication_lag(ROUTER_CLIENT_SES *rses);
|
||||
|
||||
|
@ -275,33 +275,34 @@ void closed_session_reply(GWBUF *querybuf)
|
||||
/**
|
||||
* @brief Check the reply from a backend server to a session command
|
||||
*
|
||||
* If the reply is an error, a message may be logged.
|
||||
* If the reply is an error, a message is logged.
|
||||
*
|
||||
* @param writebuf Query buffer containing reply data
|
||||
* @param scur Session cursor
|
||||
* @param bref Router session data for a backend server
|
||||
* @param buffer Query buffer containing reply data
|
||||
* @param backend Router session data for a backend server
|
||||
*/
|
||||
void check_session_command_reply(GWBUF *writebuf, SRWBackend bref)
|
||||
void check_session_command_reply(GWBUF *buffer, SRWBackend backend)
|
||||
{
|
||||
if (MYSQL_IS_ERROR_PACKET(((uint8_t *)GWBUF_DATA(writebuf))))
|
||||
if (MYSQL_IS_ERROR_PACKET(((uint8_t *)GWBUF_DATA(buffer))))
|
||||
{
|
||||
size_t replylen = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(writebuf));
|
||||
size_t replylen = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer));
|
||||
char replybuf[replylen];
|
||||
gwbuf_copy_data(writebuf, 0, gwbuf_length(writebuf), (uint8_t*)replybuf);
|
||||
gwbuf_copy_data(buffer, 0, gwbuf_length(buffer), (uint8_t*)replybuf);
|
||||
std::string err;
|
||||
std::string msg;
|
||||
err.append(replybuf + 8, 5);
|
||||
msg.append(replybuf + 13, replylen - 4 - 5);
|
||||
|
||||
MXS_ERROR("Failed to execute session command in [%s]:%d. Error was: %s %s",
|
||||
bref->server()->name, bref->server()->port,
|
||||
backend->server()->name, backend->server()->port,
|
||||
err.c_str(), msg.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an error message to the client telling that the server is in read only mode
|
||||
* @brief Send an error message to the client telling that the server is in read only mode
|
||||
*
|
||||
* @param dcb Client DCB
|
||||
*
|
||||
* @return True if sending the message was successful, false if an error occurred
|
||||
*/
|
||||
bool send_readonly_error(DCB *dcb)
|
||||
|
@ -31,7 +31,7 @@
|
||||
|
||||
extern int (*criteria_cmpfun[LAST_CRITERIA])(const void *, const void *);
|
||||
|
||||
static SRWBackend get_root_master_bref(ROUTER_CLIENT_SES *rses);
|
||||
static SRWBackend get_root_master_backend(ROUTER_CLIENT_SES *rses);
|
||||
|
||||
/**
|
||||
* Find out which of the two backend servers has smaller value for select
|
||||
@ -74,18 +74,18 @@ void handle_connection_keepalive(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SRWBackend bref = *it;
|
||||
SRWBackend backend = *it;
|
||||
|
||||
if (bref->in_use() && bref != target && !bref->is_waiting_result())
|
||||
if (backend->in_use() && backend != target && !backend->is_waiting_result())
|
||||
{
|
||||
ss_debug(nserv++);
|
||||
int diff = hkheartbeat - bref->dcb()->last_read;
|
||||
int diff = hkheartbeat - backend->dcb()->last_read;
|
||||
|
||||
if (diff > keepalive)
|
||||
{
|
||||
MXS_INFO("Pinging %s, idle for %d seconds",
|
||||
bref->server()->unique_name, diff / 10);
|
||||
modutil_ignorable_ping(bref->dcb());
|
||||
backend->server()->unique_name, diff / 10);
|
||||
modutil_ignorable_ping(backend->dcb());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -246,20 +246,20 @@ bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t comma
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SRWBackend& bref = *it;
|
||||
SRWBackend& backend = *it;
|
||||
|
||||
if (bref->in_use())
|
||||
if (backend->in_use())
|
||||
{
|
||||
bref->append_session_command(sescmd);
|
||||
backend->append_session_command(sescmd);
|
||||
|
||||
uint64_t current_pos = bref->next_session_command()->get_position();
|
||||
uint64_t current_pos = backend->next_session_command()->get_position();
|
||||
|
||||
if (current_pos < lowest_pos)
|
||||
{
|
||||
lowest_pos = current_pos;
|
||||
}
|
||||
|
||||
if (bref->execute_session_command())
|
||||
if (backend->execute_session_command())
|
||||
{
|
||||
nsucc += 1;
|
||||
|
||||
@ -269,13 +269,13 @@ bool route_session_write(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t comma
|
||||
}
|
||||
|
||||
MXS_INFO("Route query to %s \t[%s]:%d",
|
||||
SERVER_IS_MASTER(bref->server()) ? "master" : "slave",
|
||||
bref->server()->name, bref->server()->port);
|
||||
SERVER_IS_MASTER(backend->server()) ? "master" : "slave",
|
||||
backend->server()->name, backend->server()->port);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Failed to execute session command in [%s]:%d",
|
||||
bref->server()->name, bref->server()->port);
|
||||
backend->server()->name, backend->server()->port);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -339,10 +339,8 @@ SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
|
||||
return rses->target_node;
|
||||
}
|
||||
|
||||
bool succp = false;
|
||||
|
||||
/** get root master from available servers */
|
||||
SRWBackend master_bref = get_root_master_bref(rses);
|
||||
SRWBackend master = get_root_master_backend(rses);
|
||||
|
||||
if (name) /*< Choose backend by name from a hint */
|
||||
{
|
||||
@ -351,17 +349,17 @@ SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SRWBackend& bref = *it;
|
||||
SRWBackend& backend = *it;
|
||||
|
||||
/** The server must be a valid slave, relay server, or master */
|
||||
|
||||
if (bref->in_use() && bref->is_active() &&
|
||||
(strcasecmp(name, bref->server()->unique_name) == 0) &&
|
||||
(SERVER_IS_SLAVE(bref->server()) ||
|
||||
SERVER_IS_RELAY_SERVER(bref->server()) ||
|
||||
SERVER_IS_MASTER(bref->server())))
|
||||
if (backend->in_use() && backend->is_active() &&
|
||||
(strcasecmp(name, backend->server()->unique_name) == 0) &&
|
||||
(SERVER_IS_SLAVE(backend->server()) ||
|
||||
SERVER_IS_RELAY_SERVER(backend->server()) ||
|
||||
SERVER_IS_MASTER(backend->server())))
|
||||
{
|
||||
return bref;
|
||||
return backend;
|
||||
}
|
||||
}
|
||||
|
||||
@ -376,14 +374,14 @@ SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SRWBackend& bref = *it;
|
||||
SRWBackend& backend = *it;
|
||||
|
||||
/**
|
||||
* Unused backend or backend which is not master nor
|
||||
* slave can't be used
|
||||
*/
|
||||
if (!bref->in_use() || !bref->is_active() ||
|
||||
(!SERVER_IS_MASTER(bref->server()) && !SERVER_IS_SLAVE(bref->server())))
|
||||
if (!backend->in_use() || !backend->is_active() ||
|
||||
(!SERVER_IS_MASTER(backend->server()) && !SERVER_IS_SLAVE(backend->server())))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
@ -397,10 +395,10 @@ SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
|
||||
* Ensure that master has not changed during
|
||||
* session and abort if it has.
|
||||
*/
|
||||
if (SERVER_IS_MASTER(bref->server()) && bref == rses->current_master)
|
||||
if (SERVER_IS_MASTER(backend->server()) && backend == rses->current_master)
|
||||
{
|
||||
/** found master */
|
||||
rval = bref;
|
||||
rval = backend;
|
||||
}
|
||||
/**
|
||||
* Ensure that max replication lag is not set
|
||||
@ -408,11 +406,11 @@ SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
|
||||
* maximum allowed replication lag.
|
||||
*/
|
||||
else if (max_rlag == MAX_RLAG_UNDEFINED ||
|
||||
(bref->server()->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
bref->server()->rlag <= max_rlag))
|
||||
(backend->server()->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
backend->server()->rlag <= max_rlag))
|
||||
{
|
||||
/** found slave */
|
||||
rval = bref;
|
||||
rval = backend;
|
||||
}
|
||||
}
|
||||
/**
|
||||
@ -420,36 +418,36 @@ SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
|
||||
* replication lag limits replaces it.
|
||||
*/
|
||||
else if (SERVER_IS_MASTER(rval->server()) &&
|
||||
SERVER_IS_SLAVE(bref->server()) &&
|
||||
SERVER_IS_SLAVE(backend->server()) &&
|
||||
(max_rlag == MAX_RLAG_UNDEFINED ||
|
||||
(bref->server()->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
bref->server()->rlag <= max_rlag)) &&
|
||||
(backend->server()->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
backend->server()->rlag <= max_rlag)) &&
|
||||
!rses->rses_config.master_accept_reads)
|
||||
{
|
||||
/** found slave */
|
||||
rval = bref;
|
||||
rval = backend;
|
||||
}
|
||||
/**
|
||||
* When candidate exists, compare it against the current
|
||||
* backend and update assign it to new candidate if
|
||||
* necessary.
|
||||
*/
|
||||
else if (SERVER_IS_SLAVE(bref->server()) ||
|
||||
else if (SERVER_IS_SLAVE(backend->server()) ||
|
||||
(rses->rses_config.master_accept_reads &&
|
||||
SERVER_IS_MASTER(bref->server())))
|
||||
SERVER_IS_MASTER(backend->server())))
|
||||
{
|
||||
if (max_rlag == MAX_RLAG_UNDEFINED ||
|
||||
(bref->server()->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
bref->server()->rlag <= max_rlag))
|
||||
(backend->server()->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
backend->server()->rlag <= max_rlag))
|
||||
{
|
||||
rval = check_candidate_bref(rval, bref, rses->rses_config.slave_selection_criteria);
|
||||
rval = compare_backends(rval, backend, rses->rses_config.slave_selection_criteria);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_INFO("Server [%s]:%d is too much behind the master "
|
||||
"(%d seconds) and can't be chosen",
|
||||
bref->server()->name, bref->server()->port,
|
||||
bref->server()->rlag);
|
||||
backend->server()->name, backend->server()->port,
|
||||
backend->server()->rlag);
|
||||
}
|
||||
}
|
||||
} /*< for */
|
||||
@ -460,32 +458,32 @@ SRWBackend get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
|
||||
*/
|
||||
else if (btype == BE_MASTER)
|
||||
{
|
||||
if (master_bref && master_bref->is_active())
|
||||
if (master && master->is_active())
|
||||
{
|
||||
/** It is possible for the server status to change at any point in time
|
||||
* so copying it locally will make possible error messages
|
||||
* easier to understand */
|
||||
SERVER server;
|
||||
server.status = master_bref->server()->status;
|
||||
server.status = master->server()->status;
|
||||
|
||||
if (master_bref->in_use())
|
||||
if (master->in_use())
|
||||
{
|
||||
if (SERVER_IS_MASTER(&server))
|
||||
{
|
||||
rval = master_bref;
|
||||
rval = master;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Server '%s' should be master but is %s instead "
|
||||
"and can't be chosen as the master.",
|
||||
master_bref->server()->unique_name,
|
||||
master->server()->unique_name,
|
||||
STRSRVSTATUS(&server));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Server '%s' is not in use and can't be chosen as the master.",
|
||||
master_bref->server()->unique_name);
|
||||
master->server()->unique_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1019,8 +1017,8 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
GWBUF *querybuf, SRWBackend& target, bool store)
|
||||
{
|
||||
/**
|
||||
* If the transaction is READ ONLY set forced_node to bref
|
||||
* That SLAVE backend will be used until COMMIT is seen
|
||||
* If the transaction is READ ONLY set forced_node to this backend.
|
||||
* This SLAVE backend will be used until the COMMIT is seen.
|
||||
*/
|
||||
if (!rses->target_node &&
|
||||
session_trx_is_read_only(rses->client_dcb->session))
|
||||
@ -1116,7 +1114,7 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
* @return pointer to backend reference of the root master or NULL
|
||||
*
|
||||
*/
|
||||
static SRWBackend get_root_master_bref(ROUTER_CLIENT_SES *rses)
|
||||
static SRWBackend get_root_master_backend(ROUTER_CLIENT_SES *rses)
|
||||
{
|
||||
SRWBackend candidate;
|
||||
SERVER master = {};
|
||||
@ -1124,21 +1122,21 @@ static SRWBackend get_root_master_bref(ROUTER_CLIENT_SES *rses)
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SRWBackend& bref = *it;
|
||||
if (bref->in_use())
|
||||
SRWBackend& backend = *it;
|
||||
if (backend->in_use())
|
||||
{
|
||||
if (bref == rses->current_master)
|
||||
if (backend == rses->current_master)
|
||||
{
|
||||
/** Store master state for better error reporting */
|
||||
master.status = bref->server()->status;
|
||||
master.status = backend->server()->status;
|
||||
}
|
||||
|
||||
if (SERVER_IS_MASTER(bref->server()))
|
||||
if (SERVER_IS_MASTER(backend->server()))
|
||||
{
|
||||
if (!candidate ||
|
||||
(bref->server()->depth < candidate->server()->depth))
|
||||
(backend->server()->depth < candidate->server()->depth))
|
||||
{
|
||||
candidate = bref;
|
||||
candidate = backend;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -28,55 +28,30 @@
|
||||
* not intended to be called from elsewhere.
|
||||
*/
|
||||
|
||||
static void log_server_connections(select_criteria_t select_criteria,
|
||||
ROUTER_CLIENT_SES* rses);
|
||||
|
||||
static SERVER_REF *get_root_master(ROUTER_CLIENT_SES* rses);
|
||||
|
||||
static int bref_cmp_global_conn(const void *bref1, const void *bref2);
|
||||
|
||||
static int bref_cmp_router_conn(const void *bref1, const void *bref2);
|
||||
|
||||
static int bref_cmp_behind_master(const void *bref1, const void *bref2);
|
||||
|
||||
static int bref_cmp_current_load(const void *bref1, const void *bref2);
|
||||
|
||||
/**
|
||||
* The order of functions _must_ match with the order the select criteria are
|
||||
* listed in select_criteria_t definition in readwritesplit.h
|
||||
*/
|
||||
int (*criteria_cmpfun[LAST_CRITERIA])(const void *, const void *) =
|
||||
{
|
||||
NULL,
|
||||
bref_cmp_global_conn,
|
||||
bref_cmp_router_conn,
|
||||
bref_cmp_behind_master,
|
||||
bref_cmp_current_load
|
||||
};
|
||||
|
||||
/**
|
||||
* Check whether it's possible to use this server as a slave
|
||||
*
|
||||
* @param bref Backend reference
|
||||
* @param master_host The master server
|
||||
* @param server The slave candidate
|
||||
* @param master The master server or NULL if no master is available
|
||||
*
|
||||
* @return True if this server is a valid slave candidate
|
||||
*/
|
||||
static bool valid_for_slave(const SERVER *server, const SERVER *master_host)
|
||||
static bool valid_for_slave(const SERVER *server, const SERVER *master)
|
||||
{
|
||||
return (SERVER_IS_SLAVE(server) || SERVER_IS_RELAY_SERVER(server)) &&
|
||||
(master_host == NULL || (server != master_host));
|
||||
(master == NULL || (server != master));
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Find the best slave candidate
|
||||
*
|
||||
* This function iterates through @c bref and tries to find the best backend
|
||||
* This function iterates through @c backend and tries to find the best backend
|
||||
* reference that is not in use. @c cmpfun will be called to compare the backends.
|
||||
*
|
||||
* @param bref Backend reference
|
||||
* @param n Size of @c bref
|
||||
* @param rses Router client session
|
||||
* @param master The master server
|
||||
* @param cmpfun qsort() compatible comparison function
|
||||
*
|
||||
* @return The best slave backend reference or NULL if no candidates could be found
|
||||
*/
|
||||
SRWBackend get_slave_candidate(ROUTER_CLIENT_SES* rses, const SERVER *master,
|
||||
@ -87,21 +62,21 @@ SRWBackend get_slave_candidate(ROUTER_CLIENT_SES* rses, const SERVER *master,
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SRWBackend& bref = *it;
|
||||
SRWBackend& backend = *it;
|
||||
|
||||
if (!bref->in_use() && bref->can_connect() &&
|
||||
valid_for_slave(bref->server(), master))
|
||||
if (!backend->in_use() && backend->can_connect() &&
|
||||
valid_for_slave(backend->server(), master))
|
||||
{
|
||||
if (candidate)
|
||||
{
|
||||
if (cmpfun(&candidate, &bref) > 0)
|
||||
if (cmpfun(&candidate, &backend) > 0)
|
||||
{
|
||||
candidate = bref;
|
||||
candidate = backend;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
candidate = bref;
|
||||
candidate = backend;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -109,6 +84,198 @@ SRWBackend get_slave_candidate(ROUTER_CLIENT_SES* rses, const SERVER *master,
|
||||
return candidate;
|
||||
}
|
||||
|
||||
/** Compare number of connections from this router in backend servers */
|
||||
static int backend_cmp_router_conn(const void *v1, const void *v2)
|
||||
{
|
||||
const SRWBackend& a = *reinterpret_cast<const SRWBackend*>(v1);
|
||||
const SRWBackend& b = *reinterpret_cast<const SRWBackend*>(v2);
|
||||
SERVER_REF *first = a->backend();
|
||||
SERVER_REF *second = b->backend();
|
||||
|
||||
if (first->weight == 0 && second->weight == 0)
|
||||
{
|
||||
return first->connections - second->connections;
|
||||
}
|
||||
else if (first->weight == 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else if (second->weight == 0)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
return ((1000 + 1000 * first->connections) / first->weight) -
|
||||
((1000 + 1000 * second->connections) / second->weight);
|
||||
}
|
||||
|
||||
/** Compare number of global connections in backend servers */
|
||||
static int backend_cmp_global_conn(const void *v1, const void *v2)
|
||||
{
|
||||
const SRWBackend& a = *reinterpret_cast<const SRWBackend*>(v1);
|
||||
const SRWBackend& b = *reinterpret_cast<const SRWBackend*>(v2);
|
||||
SERVER_REF *first = a->backend();
|
||||
SERVER_REF *second = b->backend();
|
||||
|
||||
if (first->weight == 0 && second->weight == 0)
|
||||
{
|
||||
return first->server->stats.n_current -
|
||||
second->server->stats.n_current;
|
||||
}
|
||||
else if (first->weight == 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else if (second->weight == 0)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
return ((1000 + 1000 * first->server->stats.n_current) / first->weight) -
|
||||
((1000 + 1000 * second->server->stats.n_current) / second->weight);
|
||||
}
|
||||
|
||||
/** Compare replication lag between backend servers */
|
||||
static int backend_cmp_behind_master(const void *v1, const void *v2)
|
||||
{
|
||||
const SRWBackend& a = *reinterpret_cast<const SRWBackend*>(v1);
|
||||
const SRWBackend& b = *reinterpret_cast<const SRWBackend*>(v2);
|
||||
SERVER_REF *first = a->backend();
|
||||
SERVER_REF *second = b->backend();
|
||||
|
||||
if (first->weight == 0 && second->weight == 0)
|
||||
{
|
||||
return first->server->rlag -
|
||||
second->server->rlag;
|
||||
}
|
||||
else if (first->weight == 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else if (second->weight == 0)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
return ((1000 + 1000 * first->server->rlag) / first->weight) -
|
||||
((1000 + 1000 * second->server->rlag) / second->weight);
|
||||
}
|
||||
|
||||
/** Compare number of current operations in backend servers */
|
||||
static int backend_cmp_current_load(const void *v1, const void *v2)
|
||||
{
|
||||
const SRWBackend& a = *reinterpret_cast<const SRWBackend*>(v1);
|
||||
const SRWBackend& b = *reinterpret_cast<const SRWBackend*>(v2);
|
||||
SERVER_REF *first = a->backend();
|
||||
SERVER_REF *second = b->backend();
|
||||
|
||||
if (first->weight == 0 && second->weight == 0)
|
||||
{
|
||||
return first->server->stats.n_current_ops - second->server->stats.n_current_ops;
|
||||
}
|
||||
else if (first->weight == 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else if (second->weight == 0)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
return ((1000 + 1000 * first->server->stats.n_current_ops) / first->weight) -
|
||||
((1000 + 1000 * second->server->stats.n_current_ops) / second->weight);
|
||||
}
|
||||
|
||||
/**
|
||||
* The order of functions _must_ match with the order the select criteria are
|
||||
* listed in select_criteria_t definition in readwritesplit.h
|
||||
*/
|
||||
int (*criteria_cmpfun[LAST_CRITERIA])(const void *, const void *) =
|
||||
{
|
||||
NULL,
|
||||
backend_cmp_global_conn,
|
||||
backend_cmp_router_conn,
|
||||
backend_cmp_behind_master,
|
||||
backend_cmp_current_load
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Log server connections
|
||||
*
|
||||
* @param criteria Slave selection criteria
|
||||
* @param rses Router client session
|
||||
*/
|
||||
static void log_server_connections(select_criteria_t criteria,
|
||||
ROUTER_CLIENT_SES* rses)
|
||||
{
|
||||
MXS_INFO("Servers and %s connection counts:",
|
||||
criteria == LEAST_GLOBAL_CONNECTIONS ? "all MaxScale" : "router");
|
||||
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SERVER_REF* b = (*it)->backend();
|
||||
|
||||
switch (criteria)
|
||||
{
|
||||
case LEAST_GLOBAL_CONNECTIONS:
|
||||
MXS_INFO("MaxScale connections : %d in \t[%s]:%d %s",
|
||||
b->server->stats.n_current, b->server->name,
|
||||
b->server->port, STRSRVSTATUS(b->server));
|
||||
break;
|
||||
|
||||
case LEAST_ROUTER_CONNECTIONS:
|
||||
MXS_INFO("RWSplit connections : %d in \t[%s]:%d %s",
|
||||
b->connections, b->server->name,
|
||||
b->server->port, STRSRVSTATUS(b->server));
|
||||
break;
|
||||
|
||||
case LEAST_CURRENT_OPERATIONS:
|
||||
MXS_INFO("current operations : %d in \t[%s]:%d %s",
|
||||
b->server->stats.n_current_ops,
|
||||
b->server->name, b->server->port,
|
||||
STRSRVSTATUS(b->server));
|
||||
break;
|
||||
|
||||
case LEAST_BEHIND_MASTER:
|
||||
MXS_INFO("replication lag : %d in \t[%s]:%d %s",
|
||||
b->server->rlag, b->server->name,
|
||||
b->server->port, STRSRVSTATUS(b->server));
|
||||
default:
|
||||
ss_dassert(!true);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @brief Find the master server that is at the root of the replication tree
|
||||
*
|
||||
* @param rses Router client session
|
||||
*
|
||||
* @return The root master reference or NULL if no master is found
|
||||
*/
|
||||
static SERVER_REF* get_root_master(ROUTER_CLIENT_SES* rses)
|
||||
{
|
||||
SERVER_REF *master_host = NULL;
|
||||
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SERVER_REF* b = (*it)->backend();
|
||||
|
||||
if (SERVER_IS_MASTER(b->server))
|
||||
{
|
||||
if (master_host == NULL ||
|
||||
(b->server->depth < master_host->server->depth))
|
||||
{
|
||||
master_host = b;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return master_host;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Search suitable backend servers from those of router instance
|
||||
*
|
||||
@ -117,15 +284,15 @@ SRWBackend get_slave_candidate(ROUTER_CLIENT_SES* rses, const SERVER *master,
|
||||
* backend references than connected backends because only those in correct state
|
||||
* are connected to.
|
||||
*
|
||||
* @param p_master_ref Pointer to location where master's backend reference is to be stored
|
||||
* @param backend_ref Pointer to backend server reference object array
|
||||
* @param router_nservers Number of backend server pointers pointed to by @p backend_ref
|
||||
* @param max_nslaves Upper limit for the number of slaves
|
||||
* @param max_slave_rlag Maximum allowed replication lag for any slave
|
||||
* @param router_nservers Number of backend servers
|
||||
* @param max_nslaves Upper limit for the number of slaves
|
||||
* @param select_criteria Slave selection criteria
|
||||
* @param session Client session
|
||||
* @param router Router instance
|
||||
* @return true, if at least one master and one slave was found.
|
||||
* @param session Client session
|
||||
* @param router Router instance
|
||||
* @param rses Router client session
|
||||
* @param type Connection type, ALL for all types, SLAVE for slaves only
|
||||
*
|
||||
* @return True if at least one master and one slave was found
|
||||
*/
|
||||
bool select_connect_backend_servers(int router_nservers,
|
||||
int max_nslaves,
|
||||
@ -178,13 +345,13 @@ bool select_connect_backend_servers(int router_nservers,
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SRWBackend& bref = *it;
|
||||
SRWBackend& backend = *it;
|
||||
|
||||
if (bref->can_connect() && master_host && bref->server() == master_host)
|
||||
if (backend->can_connect() && master_host && backend->server() == master_host)
|
||||
{
|
||||
if (bref->connect(session))
|
||||
if (backend->connect(session))
|
||||
{
|
||||
rses->current_master = bref;
|
||||
rses->current_master = backend;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -194,13 +361,13 @@ bool select_connect_backend_servers(int router_nservers,
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SRWBackend& bref = *it;
|
||||
SRWBackend& backend = *it;
|
||||
|
||||
if (bref->can_connect() && valid_for_slave(bref->server(), master_host))
|
||||
if (backend->can_connect() && valid_for_slave(backend->server(), master_host))
|
||||
{
|
||||
slaves_found += 1;
|
||||
|
||||
if (bref->in_use())
|
||||
if (backend->in_use())
|
||||
{
|
||||
slaves_connected += 1;
|
||||
}
|
||||
@ -210,17 +377,17 @@ bool select_connect_backend_servers(int router_nservers,
|
||||
ss_dassert(slaves_connected < max_nslaves || max_nslaves == 0);
|
||||
|
||||
/** Connect to all possible slaves */
|
||||
for (SRWBackend bref(get_slave_candidate(rses, master_host, cmpfun));
|
||||
bref && slaves_connected < max_nslaves;
|
||||
bref = get_slave_candidate(rses, master_host, cmpfun))
|
||||
for (SRWBackend backend(get_slave_candidate(rses, master_host, cmpfun));
|
||||
backend && slaves_connected < max_nslaves;
|
||||
backend = get_slave_candidate(rses, master_host, cmpfun))
|
||||
{
|
||||
if (bref->can_connect() && bref->connect(session))
|
||||
if (backend->can_connect() && backend->connect(session))
|
||||
{
|
||||
if (rses->sescmd_list.size())
|
||||
{
|
||||
bref->append_session_command(rses->sescmd_list);
|
||||
backend->append_session_command(rses->sescmd_list);
|
||||
|
||||
if (bref->execute_session_command())
|
||||
if (backend->execute_session_command())
|
||||
{
|
||||
rses->expected_responses++;
|
||||
slaves_connected++;
|
||||
@ -249,11 +416,11 @@ bool select_connect_backend_servers(int router_nservers,
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SRWBackend& bref = *it;
|
||||
if (bref->in_use())
|
||||
SRWBackend& backend = *it;
|
||||
if (backend->in_use())
|
||||
{
|
||||
MXS_INFO("Selected %s in \t[%s]:%d", STRSRVSTATUS(bref->server()),
|
||||
bref->server()->name, bref->server()->port);
|
||||
MXS_INFO("Selected %s in \t[%s]:%d", STRSRVSTATUS(backend->server()),
|
||||
backend->server()->name, backend->server()->port);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -268,190 +435,3 @@ bool select_connect_backend_servers(int router_nservers,
|
||||
|
||||
return succp;
|
||||
}
|
||||
|
||||
/** Compare number of connections from this router in backend servers */
|
||||
static int bref_cmp_router_conn(const void *bref1, const void *bref2)
|
||||
{
|
||||
const SRWBackend& a = *reinterpret_cast<const SRWBackend*>(bref1);
|
||||
const SRWBackend& b = *reinterpret_cast<const SRWBackend*>(bref2);
|
||||
SERVER_REF *b1 = a->backend();
|
||||
SERVER_REF *b2 = b->backend();
|
||||
|
||||
if (b1->weight == 0 && b2->weight == 0)
|
||||
{
|
||||
return b1->connections - b2->connections;
|
||||
}
|
||||
else if (b1->weight == 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else if (b2->weight == 0)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
return ((1000 + 1000 * b1->connections) / b1->weight) -
|
||||
((1000 + 1000 * b2->connections) / b2->weight);
|
||||
}
|
||||
|
||||
/** Compare number of global connections in backend servers */
|
||||
static int bref_cmp_global_conn(const void *bref1, const void *bref2)
|
||||
{
|
||||
const SRWBackend& a = *reinterpret_cast<const SRWBackend*>(bref1);
|
||||
const SRWBackend& b = *reinterpret_cast<const SRWBackend*>(bref2);
|
||||
SERVER_REF *b1 = a->backend();
|
||||
SERVER_REF *b2 = b->backend();
|
||||
|
||||
if (b1->weight == 0 && b2->weight == 0)
|
||||
{
|
||||
return b1->server->stats.n_current -
|
||||
b2->server->stats.n_current;
|
||||
}
|
||||
else if (b1->weight == 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else if (b2->weight == 0)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
return ((1000 + 1000 * b1->server->stats.n_current) / b1->weight) -
|
||||
((1000 + 1000 * b2->server->stats.n_current) / b2->weight);
|
||||
}
|
||||
|
||||
/** Compare replication lag between backend servers */
|
||||
static int bref_cmp_behind_master(const void *bref1, const void *bref2)
|
||||
{
|
||||
const SRWBackend& a = *reinterpret_cast<const SRWBackend*>(bref1);
|
||||
const SRWBackend& b = *reinterpret_cast<const SRWBackend*>(bref2);
|
||||
SERVER_REF *b1 = a->backend();
|
||||
SERVER_REF *b2 = b->backend();
|
||||
|
||||
if (b1->weight == 0 && b2->weight == 0)
|
||||
{
|
||||
return b1->server->rlag -
|
||||
b2->server->rlag;
|
||||
}
|
||||
else if (b1->weight == 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else if (b2->weight == 0)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
return ((1000 + 1000 * b1->server->rlag) / b1->weight) -
|
||||
((1000 + 1000 * b2->server->rlag) / b2->weight);
|
||||
}
|
||||
|
||||
/** Compare number of current operations in backend servers */
|
||||
static int bref_cmp_current_load(const void *bref1, const void *bref2)
|
||||
{
|
||||
const SRWBackend& a = *reinterpret_cast<const SRWBackend*>(bref1);
|
||||
const SRWBackend& b = *reinterpret_cast<const SRWBackend*>(bref2);
|
||||
SERVER_REF *b1 = a->backend();
|
||||
SERVER_REF *b2 = b->backend();
|
||||
|
||||
if (b1->weight == 0 && b2->weight == 0)
|
||||
{
|
||||
return b1->server->stats.n_current_ops - b2->server->stats.n_current_ops;
|
||||
}
|
||||
else if (b1->weight == 0)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else if (b2->weight == 0)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
return ((1000 + 1000 * b1->server->stats.n_current_ops) / b1->weight) -
|
||||
((1000 + 1000 * b2->server->stats.n_current_ops) / b2->weight);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Log server connections
|
||||
*
|
||||
* @param select_criteria Slave selection criteria
|
||||
* @param backend_ref Backend reference array
|
||||
* @param router_nservers Number of backends in @p backend_ref
|
||||
*/
|
||||
static void log_server_connections(select_criteria_t select_criteria,
|
||||
ROUTER_CLIENT_SES* rses)
|
||||
{
|
||||
MXS_INFO("Servers and %s connection counts:",
|
||||
select_criteria == LEAST_GLOBAL_CONNECTIONS ? "all MaxScale"
|
||||
: "router");
|
||||
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SERVER_REF* b = (*it)->backend();
|
||||
|
||||
switch (select_criteria)
|
||||
{
|
||||
case LEAST_GLOBAL_CONNECTIONS:
|
||||
MXS_INFO("MaxScale connections : %d in \t[%s]:%d %s",
|
||||
b->server->stats.n_current, b->server->name,
|
||||
b->server->port, STRSRVSTATUS(b->server));
|
||||
break;
|
||||
|
||||
case LEAST_ROUTER_CONNECTIONS:
|
||||
MXS_INFO("RWSplit connections : %d in \t[%s]:%d %s",
|
||||
b->connections, b->server->name,
|
||||
b->server->port, STRSRVSTATUS(b->server));
|
||||
break;
|
||||
|
||||
case LEAST_CURRENT_OPERATIONS:
|
||||
MXS_INFO("current operations : %d in \t[%s]:%d %s",
|
||||
b->server->stats.n_current_ops,
|
||||
b->server->name, b->server->port,
|
||||
STRSRVSTATUS(b->server));
|
||||
break;
|
||||
|
||||
case LEAST_BEHIND_MASTER:
|
||||
MXS_INFO("replication lag : %d in \t[%s]:%d %s",
|
||||
b->server->rlag, b->server->name,
|
||||
b->server->port, STRSRVSTATUS(b->server));
|
||||
default:
|
||||
ss_dassert(!true);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
/********************************
|
||||
* This routine returns the root master server from MySQL replication tree
|
||||
* Get the root Master rule:
|
||||
*
|
||||
* find server with the lowest replication depth level
|
||||
* and the SERVER_MASTER bitval
|
||||
* Servers are checked even if they are in 'maintenance'
|
||||
*
|
||||
* @param servers The list of servers
|
||||
* @param router_nservers The number of servers
|
||||
* @return The Master found
|
||||
*
|
||||
*/
|
||||
static SERVER_REF *get_root_master(ROUTER_CLIENT_SES* rses)
|
||||
{
|
||||
SERVER_REF *master_host = NULL;
|
||||
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SERVER_REF* b = (*it)->backend();
|
||||
|
||||
if (SERVER_IS_MASTER(b->server))
|
||||
{
|
||||
if (master_host == NULL ||
|
||||
(b->server->depth < master_host->server->depth))
|
||||
{
|
||||
master_host = b;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return master_host;
|
||||
}
|
||||
|
@ -26,21 +26,22 @@
|
||||
* Functions for session command handling
|
||||
*/
|
||||
|
||||
void process_sescmd_response(ROUTER_CLIENT_SES* rses, SRWBackend& bref, GWBUF** ppPacket, bool* reconnect)
|
||||
void process_sescmd_response(ROUTER_CLIENT_SES* rses, SRWBackend& backend,
|
||||
GWBUF** ppPacket, bool* pReconnect)
|
||||
{
|
||||
if (bref->session_command_count())
|
||||
if (backend->session_command_count())
|
||||
{
|
||||
/** We are executing a session command */
|
||||
if (GWBUF_IS_TYPE_SESCMD_RESPONSE((*ppPacket)))
|
||||
{
|
||||
uint8_t cmd;
|
||||
gwbuf_copy_data(*ppPacket, MYSQL_HEADER_LEN, 1, &cmd);
|
||||
uint64_t id = bref->complete_session_command();
|
||||
uint64_t id = backend->complete_session_command();
|
||||
|
||||
if (rses->recv_sescmd < rses->sent_sescmd &&
|
||||
id == rses->recv_sescmd + 1 &&
|
||||
(!rses->current_master || // Session doesn't have a master
|
||||
rses->current_master == bref)) // This is the master's response
|
||||
rses->current_master == backend)) // This is the master's response
|
||||
{
|
||||
/** First reply to this session command, route it to the client */
|
||||
++rses->recv_sescmd;
|
||||
@ -60,9 +61,9 @@ void process_sescmd_response(ROUTER_CLIENT_SES* rses, SRWBackend& bref, GWBUF**
|
||||
{
|
||||
MXS_ERROR("Slave server '%s': response differs from master's response. "
|
||||
"Closing connection due to inconsistent session state.",
|
||||
bref->server()->unique_name);
|
||||
bref->close(mxs::Backend::CLOSE_FATAL);
|
||||
*reconnect = true;
|
||||
backend->server()->unique_name);
|
||||
backend->close(mxs::Backend::CLOSE_FATAL);
|
||||
*pReconnect = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user