Refactor routing code to use the Backend class
The Backend class is now used to handle the interaction with the backend servers in the code that decides where each query is routed.
This commit is contained in:
@ -31,7 +31,7 @@
|
||||
|
||||
extern int (*criteria_cmpfun[LAST_CRITERIA])(const void *, const void *);
|
||||
|
||||
static backend_ref_t *get_root_master_bref(ROUTER_CLIENT_SES *rses);
|
||||
static SRWBackend get_root_master_bref(ROUTER_CLIENT_SES *rses);
|
||||
|
||||
/**
|
||||
* Find out which of the two backend servers has smaller value for select
|
||||
@ -45,9 +45,9 @@ static backend_ref_t *get_root_master_bref(ROUTER_CLIENT_SES *rses);
|
||||
* value in selection criteria. If either reference pointer is NULL then the
|
||||
* other reference pointer value is returned.
|
||||
*/
|
||||
static backend_ref_t *check_candidate_bref(backend_ref_t *cand,
|
||||
backend_ref_t *new_bref,
|
||||
select_criteria_t sc)
|
||||
static SRWBackend& check_candidate_bref(SRWBackend& cand,
|
||||
SRWBackend& new_bref,
|
||||
select_criteria_t sc)
|
||||
{
|
||||
int (*p)(const void *, const void *);
|
||||
/** get compare function */
|
||||
@ -57,7 +57,7 @@ static backend_ref_t *check_candidate_bref(backend_ref_t *cand,
|
||||
{
|
||||
return cand;
|
||||
}
|
||||
else if (cand == NULL || (p((void *)cand, (void *)new_bref) > 0))
|
||||
else if (cand == NULL || (p((void *)&cand, (void *)&new_bref) > 0))
|
||||
{
|
||||
return new_bref;
|
||||
}
|
||||
@ -68,31 +68,32 @@ static backend_ref_t *check_candidate_bref(backend_ref_t *cand,
|
||||
}
|
||||
|
||||
void handle_connection_keepalive(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
DCB *target_dcb)
|
||||
SRWBackend& target)
|
||||
{
|
||||
ss_dassert(target_dcb);
|
||||
ss_dassert(target);
|
||||
ss_debug(int nserv = 0);
|
||||
/** Each heartbeat is 1/10th of a second */
|
||||
int keepalive = inst->rwsplit_config.connection_keepalive * 10;
|
||||
|
||||
for (int i = 0; i < rses->rses_nbackends; i++)
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
/** Each heartbeat is 1/10th of a second */
|
||||
int keepalive = inst->rwsplit_config.connection_keepalive * 10;
|
||||
backend_ref_t *bref = &rses->rses_backend_ref[i];
|
||||
SRWBackend bref = *it;
|
||||
|
||||
if (bref->bref_dcb != target_dcb && BREF_IS_IN_USE(bref) &&
|
||||
!BREF_IS_WAITING_RESULT(bref))
|
||||
if (bref->in_use() && bref != target && !bref->is_waiting_result())
|
||||
{
|
||||
ss_debug(nserv++);
|
||||
int diff = hkheartbeat - bref->bref_dcb->last_read;
|
||||
int diff = hkheartbeat - bref->dcb()->last_read;
|
||||
|
||||
if (diff > keepalive)
|
||||
{
|
||||
MXS_INFO("Pinging %s, idle for %d seconds",
|
||||
bref->bref_dcb->server->unique_name, diff / 10);
|
||||
modutil_ignorable_ping(bref->bref_dcb);
|
||||
bref->server()->unique_name, diff / 10);
|
||||
modutil_ignorable_ping(bref->dcb());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ss_dassert(nserv < rses->rses_nbackends);
|
||||
}
|
||||
|
||||
@ -111,7 +112,6 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
{
|
||||
uint32_t qtype = QUERY_TYPE_UNKNOWN;
|
||||
uint8_t packet_type;
|
||||
DCB *target_dcb = NULL;
|
||||
route_target_t route_target;
|
||||
bool succp = false;
|
||||
bool non_empty_packet;
|
||||
@ -158,13 +158,14 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
rses->rses_load_data_sent + gwbuf_length(querybuf));
|
||||
}
|
||||
|
||||
SRWBackend target;
|
||||
|
||||
if (TARGET_IS_ALL(route_target))
|
||||
{
|
||||
succp = handle_target_is_all(route_target, inst, rses, querybuf, packet_type, qtype);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Now we have a lock on the router session */
|
||||
bool store_stmt = false;
|
||||
/**
|
||||
* There is a hint which either names the target backend or
|
||||
@ -174,37 +175,36 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
if (TARGET_IS_NAMED_SERVER(route_target) ||
|
||||
TARGET_IS_RLAG_MAX(route_target))
|
||||
{
|
||||
succp = handle_hinted_target(rses, querybuf, route_target, &target_dcb);
|
||||
succp = handle_hinted_target(rses, querybuf, route_target, target);
|
||||
}
|
||||
else if (TARGET_IS_SLAVE(route_target))
|
||||
{
|
||||
succp = handle_slave_is_target(inst, rses, &target_dcb);
|
||||
succp = handle_slave_is_target(inst, rses, target);
|
||||
store_stmt = rses->rses_config.retry_failed_reads;
|
||||
}
|
||||
else if (TARGET_IS_MASTER(route_target))
|
||||
{
|
||||
succp = handle_master_is_target(inst, rses, &target_dcb);
|
||||
succp = handle_master_is_target(inst, rses, target);
|
||||
|
||||
if (!rses->rses_config.strict_multi_stmt &&
|
||||
rses->forced_node == rses->rses_master_ref)
|
||||
rses->target_node == rses->current_master)
|
||||
{
|
||||
/** Reset the forced node as we're in relaxed multi-statement mode */
|
||||
rses->forced_node = NULL;
|
||||
rses->target_node.reset();
|
||||
}
|
||||
}
|
||||
|
||||
if (target_dcb && succp) /*< Have DCB of the target backend */
|
||||
if (target && succp) /*< Have DCB of the target backend */
|
||||
{
|
||||
ss_dassert(!store_stmt || TARGET_IS_SLAVE(route_target));
|
||||
handle_got_target(inst, rses, querybuf, target_dcb, store_stmt);
|
||||
handle_got_target(inst, rses, querybuf, target, store_stmt);
|
||||
}
|
||||
}
|
||||
|
||||
if (succp && inst->rwsplit_config.connection_keepalive &&
|
||||
(TARGET_IS_SLAVE(route_target) || TARGET_IS_MASTER(route_target)))
|
||||
{
|
||||
handle_connection_keepalive(inst, rses, target_dcb);
|
||||
handle_connection_keepalive(inst, rses, target);
|
||||
}
|
||||
|
||||
return succp;
|
||||
@ -439,174 +439,76 @@ return_succp:
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Function to hash keys in read-write split router
|
||||
* Provide the router with a reference to a suitable backend
|
||||
*
|
||||
* Used to store information about temporary tables.
|
||||
* @param rses Pointer to router client session
|
||||
* @param btype Backend type
|
||||
* @param name Name of the backend which is primarily searched. May be NULL.
|
||||
* @param max_rlag Maximum replication lag
|
||||
* @param target The target backend
|
||||
*
|
||||
* @param key key to be hashed, actually a character string
|
||||
* @result the hash value integer
|
||||
* @return True if a backend was found
|
||||
*/
|
||||
int rwsplit_hashkeyfun(const void *key)
|
||||
bool get_target_backend(ROUTER_CLIENT_SES *rses, backend_type_t btype,
|
||||
char *name, int max_rlag, SRWBackend& target)
|
||||
{
|
||||
if (key == NULL)
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
/** Check whether using rses->target_node as target SLAVE */
|
||||
if (rses->target_node && session_trx_is_read_only(rses->client_dcb->session))
|
||||
{
|
||||
return 0;
|
||||
MXS_DEBUG("In READ ONLY transaction, using server '%s'",
|
||||
rses->target_node->server()->unique_name);
|
||||
target = rses->target_node;
|
||||
return true;
|
||||
}
|
||||
|
||||
unsigned int hash = 0, c = 0;
|
||||
const char *ptr = (const char *)key;
|
||||
|
||||
while ((c = *ptr++))
|
||||
{
|
||||
hash = c + (hash << 6) + (hash << 16) - hash;
|
||||
}
|
||||
return hash;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Function to compare hash keys in read-write split router
|
||||
*
|
||||
* Used to manage information about temporary tables.
|
||||
*
|
||||
* @param key first key to be compared, actually a character string
|
||||
* @param v2 second key to be compared, actually a character string
|
||||
* @result 1 if keys are equal, 0 otherwise
|
||||
*/
|
||||
int rwsplit_hashcmpfun(const void *v1, const void *v2)
|
||||
{
|
||||
const char *i1 = (const char *)v1;
|
||||
const char *i2 = (const char *)v2;
|
||||
|
||||
return strcmp(i1, i2);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Function to duplicate a hash value in read-write split router
|
||||
*
|
||||
* Used to manage information about temporary tables.
|
||||
*
|
||||
* @param fval value to be duplicated, actually a character string
|
||||
* @result the duplicated value, actually a character string
|
||||
*/
|
||||
void *rwsplit_hstrdup(const void *fval)
|
||||
{
|
||||
char *str = (char *)fval;
|
||||
return MXS_STRDUP(str);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Function to free hash values in read-write split router
|
||||
*
|
||||
* Used to manage information about temporary tables.
|
||||
*
|
||||
* @param key value to be freed
|
||||
*/
|
||||
void rwsplit_hfree(void *fval)
|
||||
{
|
||||
MXS_FREE(fval);
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide the router with a pointer to a suitable backend dcb.
|
||||
*
|
||||
* Detect failures in server statuses and reselect backends if necessary.
|
||||
* If name is specified, server name becomes primary selection criteria.
|
||||
* Similarly, if max replication lag is specified, skip backends which lag too
|
||||
* much.
|
||||
*
|
||||
* @param p_dcb Address of the pointer to the resulting DCB
|
||||
* @param rses Pointer to router client session
|
||||
* @param btype Backend type
|
||||
* @param name Name of the backend which is primarily searched. May be NULL.
|
||||
*
|
||||
* @return True if proper DCB was found, false otherwise.
|
||||
*/
|
||||
bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
|
||||
char *name, int max_rlag)
|
||||
{
|
||||
backend_ref_t *backend_ref;
|
||||
backend_ref_t *master_bref;
|
||||
int i;
|
||||
bool succp = false;
|
||||
|
||||
CHK_CLIENT_RSES(rses);
|
||||
ss_dassert(p_dcb != NULL && *(p_dcb) == NULL);
|
||||
|
||||
if (p_dcb == NULL)
|
||||
{
|
||||
goto return_succp;
|
||||
}
|
||||
backend_ref = rses->rses_backend_ref;
|
||||
|
||||
/** Check whether using rses->forced_node as target SLAVE */
|
||||
if (rses->forced_node &&
|
||||
session_trx_is_read_only(rses->client_dcb->session))
|
||||
{
|
||||
*p_dcb = rses->forced_node->bref_dcb;
|
||||
succp = true;
|
||||
|
||||
MXS_DEBUG("force_node found in READ ONLY transaction: use slave %s",
|
||||
(*p_dcb)->server->unique_name);
|
||||
|
||||
goto return_succp;
|
||||
}
|
||||
|
||||
/** get root master from available servers */
|
||||
master_bref = get_root_master_bref(rses);
|
||||
SRWBackend master_bref = get_root_master_bref(rses);
|
||||
|
||||
if (name != NULL) /*< Choose backend by name from a hint */
|
||||
if (name) /*< Choose backend by name from a hint */
|
||||
{
|
||||
ss_dassert(btype != BE_MASTER); /*< Master dominates and no name should be passed with it */
|
||||
|
||||
for (i = 0; i < rses->rses_nbackends; i++)
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SERVER_REF *b = backend_ref[i].ref;
|
||||
SERVER server;
|
||||
server.status = b->server->status;
|
||||
/**
|
||||
* To become chosen:
|
||||
* backend must be in use, name must match,
|
||||
* backend's role must be either slave, relay
|
||||
* server, or master.
|
||||
*/
|
||||
if (BREF_IS_IN_USE((&backend_ref[i])) &&
|
||||
SERVER_REF_IS_ACTIVE(b) &&
|
||||
(strncasecmp(name, b->server->unique_name, PATH_MAX) == 0) &&
|
||||
(SERVER_IS_SLAVE(&server) || SERVER_IS_RELAY_SERVER(&server) ||
|
||||
SERVER_IS_MASTER(&server)))
|
||||
SRWBackend& bref = *it;
|
||||
|
||||
/** The server must be a valid slave, relay server, or master */
|
||||
|
||||
if (bref->in_use() && bref->is_active() &&
|
||||
(strcasecmp(name, bref->server()->unique_name) == 0) &&
|
||||
(SERVER_IS_SLAVE(bref->server()) ||
|
||||
SERVER_IS_RELAY_SERVER(bref->server()) ||
|
||||
SERVER_IS_MASTER(bref->server())))
|
||||
{
|
||||
*p_dcb = backend_ref[i].bref_dcb;
|
||||
succp = true;
|
||||
ss_dassert(backend_ref[i].bref_dcb->state != DCB_STATE_ZOMBIE);
|
||||
break;
|
||||
target = bref;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if (succp)
|
||||
{
|
||||
goto return_succp;
|
||||
}
|
||||
else
|
||||
{
|
||||
btype = BE_SLAVE;
|
||||
}
|
||||
|
||||
/** No server found, use a normal slave for it */
|
||||
btype = BE_SLAVE;
|
||||
}
|
||||
|
||||
if (btype == BE_SLAVE)
|
||||
{
|
||||
backend_ref_t *candidate_bref = NULL;
|
||||
SRWBackend candidate_bref;
|
||||
|
||||
for (i = 0; i < rses->rses_nbackends; i++)
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
SERVER_REF *b = backend_ref[i].ref;
|
||||
SERVER server;
|
||||
SERVER candidate;
|
||||
server.status = b->server->status;
|
||||
SRWBackend& bref = *it;
|
||||
|
||||
/**
|
||||
* Unused backend or backend which is not master nor
|
||||
* slave can't be used
|
||||
*/
|
||||
if (!BREF_IS_IN_USE(&backend_ref[i]) || !SERVER_REF_IS_ACTIVE(b) ||
|
||||
(!SERVER_IS_MASTER(&server) && !SERVER_IS_SLAVE(&server)))
|
||||
if (!bref->in_use() || !bref->is_active() ||
|
||||
(!SERVER_IS_MASTER(bref->server()) && !SERVER_IS_SLAVE(bref->server())))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
@ -614,17 +516,16 @@ bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
|
||||
* If there are no candidates yet accept both master or
|
||||
* slave.
|
||||
*/
|
||||
else if (candidate_bref == NULL)
|
||||
else if (!candidate_bref)
|
||||
{
|
||||
/**
|
||||
* Ensure that master has not changed dunring
|
||||
* Ensure that master has not changed during
|
||||
* session and abort if it has.
|
||||
*/
|
||||
if (SERVER_IS_MASTER(&server) && &backend_ref[i] == master_bref)
|
||||
if (SERVER_IS_MASTER(bref->server()) && bref == rses->current_master)
|
||||
{
|
||||
/** found master */
|
||||
candidate_bref = &backend_ref[i];
|
||||
candidate.status = candidate_bref->ref->server->status;
|
||||
candidate_bref = bref;
|
||||
succp = true;
|
||||
}
|
||||
/**
|
||||
@ -633,12 +534,11 @@ bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
|
||||
* maximum allowed replication lag.
|
||||
*/
|
||||
else if (max_rlag == MAX_RLAG_UNDEFINED ||
|
||||
(b->server->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
b->server->rlag <= max_rlag))
|
||||
(bref->server()->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
bref->server()->rlag <= max_rlag))
|
||||
{
|
||||
/** found slave */
|
||||
candidate_bref = &backend_ref[i];
|
||||
candidate.status = candidate_bref->ref->server->status;
|
||||
candidate_bref = bref;
|
||||
succp = true;
|
||||
}
|
||||
}
|
||||
@ -646,15 +546,15 @@ bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
|
||||
* If candidate is master, any slave which doesn't break
|
||||
* replication lag limits replaces it.
|
||||
*/
|
||||
else if (SERVER_IS_MASTER(&candidate) && SERVER_IS_SLAVE(&server) &&
|
||||
else if (SERVER_IS_MASTER(candidate_bref->server()) &&
|
||||
SERVER_IS_SLAVE(bref->server()) &&
|
||||
(max_rlag == MAX_RLAG_UNDEFINED ||
|
||||
(b->server->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
b->server->rlag <= max_rlag)) &&
|
||||
(bref->server()->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
bref->server()->rlag <= max_rlag)) &&
|
||||
!rses->rses_config.master_accept_reads)
|
||||
{
|
||||
/** found slave */
|
||||
candidate_bref = &backend_ref[i];
|
||||
candidate.status = candidate_bref->ref->server->status;
|
||||
candidate_bref = bref;
|
||||
succp = true;
|
||||
}
|
||||
/**
|
||||
@ -662,76 +562,73 @@ bool rwsplit_get_dcb(DCB **p_dcb, ROUTER_CLIENT_SES *rses, backend_type_t btype,
|
||||
* backend and update assign it to new candidate if
|
||||
* necessary.
|
||||
*/
|
||||
else if (SERVER_IS_SLAVE(&server) ||
|
||||
(rses->rses_config.master_accept_reads && SERVER_IS_MASTER(&server)))
|
||||
else if (SERVER_IS_SLAVE(bref->server()) ||
|
||||
(rses->rses_config.master_accept_reads &&
|
||||
SERVER_IS_MASTER(bref->server())))
|
||||
{
|
||||
if (max_rlag == MAX_RLAG_UNDEFINED ||
|
||||
(b->server->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
b->server->rlag <= max_rlag))
|
||||
(bref->server()->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
bref->server()->rlag <= max_rlag))
|
||||
{
|
||||
candidate_bref = check_candidate_bref(candidate_bref, &backend_ref[i],
|
||||
candidate_bref = check_candidate_bref(candidate_bref, bref,
|
||||
rses->rses_config.slave_selection_criteria);
|
||||
candidate.status = candidate_bref->ref->server->status;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_INFO("Server [%s]:%d is too much behind the master, %d s. and can't be chosen.",
|
||||
b->server->name, b->server->port, b->server->rlag);
|
||||
MXS_INFO("Server [%s]:%d is too much behind the master "
|
||||
"(%d seconds) and can't be chosen",
|
||||
bref->server()->name, bref->server()->port,
|
||||
bref->server()->rlag);
|
||||
}
|
||||
}
|
||||
} /*< for */
|
||||
|
||||
/** Assign selected DCB's pointer value */
|
||||
if (candidate_bref != NULL)
|
||||
if (candidate_bref)
|
||||
{
|
||||
*p_dcb = candidate_bref->bref_dcb;
|
||||
target = candidate_bref;
|
||||
}
|
||||
|
||||
goto return_succp;
|
||||
} /*< if (btype == BE_SLAVE) */
|
||||
}
|
||||
/**
|
||||
* If target was originally master only then the execution jumps
|
||||
* directly here.
|
||||
*/
|
||||
if (btype == BE_MASTER)
|
||||
else if (btype == BE_MASTER)
|
||||
{
|
||||
if (master_bref && SERVER_REF_IS_ACTIVE(master_bref->ref))
|
||||
if (master_bref && master_bref->is_active())
|
||||
{
|
||||
/** It is possible for the server status to change at any point in time
|
||||
* so copying it locally will make possible error messages
|
||||
* easier to understand */
|
||||
SERVER server;
|
||||
server.status = master_bref->ref->server->status;
|
||||
server.status = master_bref->server()->status;
|
||||
|
||||
if (BREF_IS_IN_USE(master_bref))
|
||||
if (master_bref->in_use())
|
||||
{
|
||||
if (SERVER_IS_MASTER(&server))
|
||||
{
|
||||
*p_dcb = master_bref->bref_dcb;
|
||||
target = master_bref;
|
||||
succp = true;
|
||||
/** if bref is in use DCB should not be closed */
|
||||
ss_dassert(master_bref->bref_dcb->state != DCB_STATE_ZOMBIE);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Server '%s' should be master but "
|
||||
"is %s instead and can't be chosen as the master.",
|
||||
master_bref->ref->server->unique_name,
|
||||
MXS_ERROR("Server '%s' should be master but is %s instead "
|
||||
"and can't be chosen as the master.",
|
||||
master_bref->server()->unique_name,
|
||||
STRSRVSTATUS(&server));
|
||||
succp = false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Server '%s' is not in use and can't be "
|
||||
"chosen as the master.",
|
||||
master_bref->ref->server->unique_name);
|
||||
MXS_ERROR("Server '%s' is not in use and can't be chosen as the master.",
|
||||
master_bref->server()->unique_name);
|
||||
succp = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return_succp:
|
||||
return succp;
|
||||
}
|
||||
|
||||
@ -754,7 +651,7 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
|
||||
mxs_target_t use_sql_variables_in = rses->rses_config.use_sql_variables_in;
|
||||
int target = TARGET_UNDEFINED;
|
||||
|
||||
if (rses->forced_node && rses->forced_node == rses->rses_master_ref)
|
||||
if (rses->target_node && rses->target_node == rses->current_master)
|
||||
{
|
||||
target = TARGET_MASTER;
|
||||
}
|
||||
@ -953,12 +850,11 @@ handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
|
||||
* effective since we don't have a node to force queries to. In this
|
||||
* situation, assigning QUERY_TYPE_WRITE for the query will trigger
|
||||
* the error processing. */
|
||||
if ((rses->forced_node == NULL || rses->forced_node != rses->rses_master_ref) &&
|
||||
if ((rses->target_node == NULL || rses->target_node != rses->current_master) &&
|
||||
check_for_multi_stmt(querybuf, rses->client_dcb->protocol, packet_type))
|
||||
{
|
||||
if (rses->rses_master_ref)
|
||||
if (rses->current_master)
|
||||
{
|
||||
rses->forced_node = rses->rses_master_ref;
|
||||
rses->target_node = rses->current_master;
|
||||
MXS_INFO("Multi-statement query, routing all future queries to master.");
|
||||
}
|
||||
@ -1041,15 +937,13 @@ handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
|
||||
* @return bool - true if succeeded, false otherwise
|
||||
*/
|
||||
bool handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
|
||||
route_target_t route_target, DCB **target_dcb)
|
||||
route_target_t route_target, SRWBackend& target)
|
||||
{
|
||||
HINT *hint;
|
||||
char *named_server = NULL;
|
||||
backend_type_t btype; /*< target backend type */
|
||||
int rlag_max = MAX_RLAG_UNDEFINED;
|
||||
bool succp;
|
||||
|
||||
hint = querybuf->hint;
|
||||
HINT* hint = querybuf->hint;
|
||||
|
||||
while (hint != NULL)
|
||||
{
|
||||
@ -1084,13 +978,13 @@ bool handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
|
||||
}
|
||||
|
||||
/** target may be master or slave */
|
||||
btype = route_target & TARGET_SLAVE ? BE_SLAVE : BE_MASTER;
|
||||
backend_type_t btype = route_target & TARGET_SLAVE ? BE_SLAVE : BE_MASTER;
|
||||
|
||||
/**
|
||||
* Search backend server by name or replication lag.
|
||||
* If it fails, then try to find valid slave or master.
|
||||
*/
|
||||
succp = rwsplit_get_dcb(target_dcb, rses, btype, named_server, rlag_max);
|
||||
succp = get_target_backend(rses, btype, named_server, rlag_max, target);
|
||||
|
||||
if (!succp)
|
||||
{
|
||||
@ -1122,14 +1016,14 @@ bool handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
|
||||
* @return bool - true if succeeded, false otherwise
|
||||
*/
|
||||
bool handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
DCB **target_dcb)
|
||||
SRWBackend& target)
|
||||
{
|
||||
int rlag_max = rses_get_max_replication_lag(rses);
|
||||
|
||||
/**
|
||||
* Search suitable backend server, get DCB in target_dcb
|
||||
*/
|
||||
if (rwsplit_get_dcb(target_dcb, rses, BE_SLAVE, NULL, rlag_max))
|
||||
if (get_target_backend(rses, BE_SLAVE, NULL, rlag_max, target))
|
||||
{
|
||||
atomic_add_uint64(&inst->stats.n_slave, 1);
|
||||
return true;
|
||||
@ -1147,7 +1041,7 @@ bool handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
* @param rses Router session
|
||||
*/
|
||||
static void log_master_routing_failure(ROUTER_CLIENT_SES *rses, bool found,
|
||||
DCB *master_dcb, DCB *curr_master_dcb)
|
||||
SRWBackend& old_master, SRWBackend& curr_master)
|
||||
{
|
||||
char errmsg[MAX_SERVER_ADDRESS_LEN * 2 + 100]; // Extra space for error message
|
||||
|
||||
@ -1155,28 +1049,28 @@ static void log_master_routing_failure(ROUTER_CLIENT_SES *rses, bool found,
|
||||
{
|
||||
sprintf(errmsg, "Could not find a valid master connection");
|
||||
}
|
||||
else if (master_dcb && curr_master_dcb)
|
||||
else if (old_master && curr_master)
|
||||
{
|
||||
/** We found a master but it's not the same connection */
|
||||
ss_dassert(master_dcb != curr_master_dcb);
|
||||
if (master_dcb->server != curr_master_dcb->server)
|
||||
ss_dassert(old_master != curr_master);
|
||||
if (old_master->server() != curr_master->server())
|
||||
{
|
||||
sprintf(errmsg, "Master server changed from '%s' to '%s'",
|
||||
master_dcb->server->unique_name,
|
||||
curr_master_dcb->server->unique_name);
|
||||
old_master->server()->unique_name,
|
||||
curr_master->server()->unique_name);
|
||||
}
|
||||
else
|
||||
{
|
||||
ss_dassert(false); // Currently we don't reconnect to the master
|
||||
sprintf(errmsg, "Connection to master '%s' was recreated",
|
||||
curr_master_dcb->server->unique_name);
|
||||
curr_master->server()->unique_name);
|
||||
}
|
||||
}
|
||||
else if (master_dcb)
|
||||
else if (old_master)
|
||||
{
|
||||
/** We have an original master connection but we couldn't find it */
|
||||
sprintf(errmsg, "The connection to master server '%s' is not available",
|
||||
master_dcb->server->unique_name);
|
||||
old_master->server()->unique_name);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1211,44 +1105,31 @@ static void log_master_routing_failure(ROUTER_CLIENT_SES *rses, bool found,
|
||||
* @return bool - true if succeeded, false otherwise
|
||||
*/
|
||||
bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
DCB **target_dcb)
|
||||
SRWBackend& target)
|
||||
{
|
||||
DCB *master_dcb = rses->rses_master_ref ? rses->rses_master_ref->bref_dcb : NULL;
|
||||
DCB *curr_master_dcb = NULL;
|
||||
bool succp = rwsplit_get_dcb(&curr_master_dcb, rses, BE_MASTER, NULL, MAX_RLAG_UNDEFINED);
|
||||
bool succp = get_target_backend(rses, BE_MASTER, NULL, MAX_RLAG_UNDEFINED, target);
|
||||
|
||||
if (succp && master_dcb == curr_master_dcb)
|
||||
if (succp && target == rses->current_master)
|
||||
{
|
||||
atomic_add_uint64(&inst->stats.n_master, 1);
|
||||
*target_dcb = master_dcb;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (succp && master_dcb == curr_master_dcb)
|
||||
/** The original master is not available, we can't route the write */
|
||||
if (rses->rses_config.master_failure_mode == RW_ERROR_ON_WRITE)
|
||||
{
|
||||
atomic_add_uint64(&inst->stats.n_master, 1);
|
||||
*target_dcb = master_dcb;
|
||||
succp = send_readonly_error(rses->client_dcb);
|
||||
|
||||
if (rses->current_master && rses->current_master->in_use())
|
||||
{
|
||||
rses->current_master->close();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/** The original master is not available, we can't route the write */
|
||||
if (rses->rses_config.master_failure_mode == RW_ERROR_ON_WRITE)
|
||||
{
|
||||
succp = send_readonly_error(rses->client_dcb);
|
||||
|
||||
if (rses->rses_master_ref && BREF_IS_IN_USE(rses->rses_master_ref))
|
||||
{
|
||||
close_failed_bref(rses->rses_master_ref, true);
|
||||
RW_CHK_DCB(rses->rses_master_ref, rses->rses_master_ref->bref_dcb);
|
||||
dcb_close(rses->rses_master_ref->bref_dcb);
|
||||
RW_CLOSE_BREF(rses->rses_master_ref);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
log_master_routing_failure(rses, succp, master_dcb, curr_master_dcb);
|
||||
succp = false;
|
||||
}
|
||||
log_master_routing_failure(rses, succp, rses->current_master, target);
|
||||
succp = false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1276,34 +1157,26 @@ static inline bool query_creates_reply(mysql_server_cmd_t cmd)
|
||||
*/
|
||||
bool
|
||||
handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
GWBUF *querybuf, DCB *target_dcb, bool store)
|
||||
GWBUF *querybuf, SRWBackend& target, bool store)
|
||||
{
|
||||
backend_ref_t *bref;
|
||||
|
||||
bref = get_bref_from_dcb(rses, target_dcb);
|
||||
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
|
||||
|
||||
/**
|
||||
* If the transaction is READ ONLY set forced_node to bref
|
||||
* That SLAVE backend will be used until COMMIT is seen
|
||||
*/
|
||||
if (!rses->forced_node &&
|
||||
if (!rses->target_node &&
|
||||
session_trx_is_read_only(rses->client_dcb->session))
|
||||
{
|
||||
rses->forced_node = bref;
|
||||
rses->target_node = get_backend_from_bref(rses, bref);
|
||||
MXS_DEBUG("Setting forced_node SLAVE to %s within an opened READ ONLY transaction\n",
|
||||
target_dcb->server->unique_name);
|
||||
rses->target_node = target;
|
||||
MXS_DEBUG("Setting forced_node SLAVE to %s within an opened READ ONLY transaction",
|
||||
target->server()->unique_name);
|
||||
}
|
||||
|
||||
ss_dassert(target_dcb != NULL);
|
||||
|
||||
MXS_INFO("Route query to %s \t[%s]:%d <",
|
||||
(SERVER_IS_MASTER(bref->ref->server) ? "master" : "slave"),
|
||||
bref->ref->server->name, bref->ref->server->port);
|
||||
(SERVER_IS_MASTER(target->server()) ? "master" : "slave"),
|
||||
target->server()->name, target->server()->port);
|
||||
|
||||
/** The session command cursor must not be active */
|
||||
ss_dassert(!sescmd_cursor_is_active(&bref->bref_sescmd_cur));
|
||||
ss_dassert(target->session_command_count() == 0);
|
||||
|
||||
/** We only want the complete response to the preparation */
|
||||
if (MYSQL_GET_COMMAND(GWBUF_DATA(querybuf)) == MYSQL_COM_STMT_PREPARE)
|
||||
@ -1311,28 +1184,31 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
gwbuf_set_type(querybuf, GWBUF_TYPE_COLLECT_RESULT);
|
||||
}
|
||||
|
||||
if (target_dcb->func.write(target_dcb, gwbuf_clone(querybuf)) == 1)
|
||||
mxs::Backend::response_type response = mxs::Backend::NO_RESPONSE;
|
||||
mysql_server_cmd_t cmd = mxs_mysql_current_command(rses->client_dcb->session);
|
||||
|
||||
if (rses->load_data_state != LOAD_DATA_ACTIVE &&
|
||||
query_creates_reply(cmd))
|
||||
{
|
||||
if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target_dcb->server))
|
||||
response = mxs::Backend::EXPECT_RESPONSE;
|
||||
}
|
||||
|
||||
if (target->write(gwbuf_clone(querybuf), response))
|
||||
{
|
||||
if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server()))
|
||||
{
|
||||
MXS_ERROR("Failed to store current statement, it won't be retried if it fails.");
|
||||
}
|
||||
|
||||
atomic_add_uint64(&inst->stats.n_queries, 1);
|
||||
|
||||
mysql_server_cmd_t cmd = mxs_mysql_current_command(rses->client_dcb->session);
|
||||
|
||||
if (rses->load_data_state != LOAD_DATA_ACTIVE && query_creates_reply(cmd))
|
||||
if (response == mxs::Backend::EXPECT_RESPONSE)
|
||||
{
|
||||
/** The server will reply to this command */
|
||||
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
|
||||
ss_dassert(target->get_reply_state() == REPLY_STATE_DONE);
|
||||
|
||||
bref = get_bref_from_dcb(rses, target_dcb);
|
||||
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
||||
bref_set_state(bref, BREF_WAITING_RESULT);
|
||||
|
||||
LOG_RS(bref, REPLY_STATE_START);
|
||||
bref->reply_state = REPLY_STATE_START;
|
||||
LOG_RS(target, REPLY_STATE_START);
|
||||
target->set_reply_state(REPLY_STATE_START);
|
||||
rses->expected_responses++;
|
||||
|
||||
if (rses->load_data_state == LOAD_DATA_START)
|
||||
@ -1353,12 +1229,11 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
/**
|
||||
* If a READ ONLY transaction is ending set forced_node to NULL
|
||||
*/
|
||||
if (rses->forced_node &&
|
||||
if (rses->target_node &&
|
||||
session_trx_is_read_only(rses->client_dcb->session) &&
|
||||
session_trx_is_ending(rses->client_dcb->session))
|
||||
{
|
||||
MXS_DEBUG("An opened READ ONLY transaction ends: forced_node is set to NULL");
|
||||
rses->forced_node = NULL;
|
||||
rses->target_node.reset();
|
||||
}
|
||||
return true;
|
||||
@ -1431,41 +1306,40 @@ int rses_property_add(ROUTER_CLIENT_SES *rses, rses_property_t *prop)
|
||||
* @return pointer to backend reference of the root master or NULL
|
||||
*
|
||||
*/
|
||||
static backend_ref_t *get_root_master_bref(ROUTER_CLIENT_SES *rses)
|
||||
static SRWBackend get_root_master_bref(ROUTER_CLIENT_SES *rses)
|
||||
{
|
||||
backend_ref_t *bref;
|
||||
backend_ref_t *candidate_bref = NULL;
|
||||
SRWBackend candidate;
|
||||
SERVER master = {};
|
||||
|
||||
for (int i = 0; i < rses->rses_nbackends; i++)
|
||||
for (SRWBackendList::iterator it = rses->backends.begin();
|
||||
it != rses->backends.end(); it++)
|
||||
{
|
||||
bref = &rses->rses_backend_ref[i];
|
||||
if (bref && BREF_IS_IN_USE(bref))
|
||||
SRWBackend& bref = *it;
|
||||
if (bref->in_use())
|
||||
{
|
||||
ss_dassert(!BREF_IS_CLOSED(bref) && !BREF_HAS_FAILED(bref));
|
||||
if (bref == rses->rses_master_ref)
|
||||
if (bref == rses->current_master)
|
||||
{
|
||||
/** Store master state for better error reporting */
|
||||
master.status = bref->ref->server->status;
|
||||
master.status = bref->server()->status;
|
||||
}
|
||||
|
||||
if (SERVER_IS_MASTER(bref->ref->server))
|
||||
if (SERVER_IS_MASTER(bref->server()))
|
||||
{
|
||||
if (candidate_bref == NULL ||
|
||||
(bref->ref->server->depth < candidate_bref->ref->server->depth))
|
||||
if (!candidate ||
|
||||
(bref->server()->depth < candidate->server()->depth))
|
||||
{
|
||||
candidate_bref = bref;
|
||||
candidate = bref;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (candidate_bref == NULL && rses->rses_config.master_failure_mode == RW_FAIL_INSTANTLY &&
|
||||
rses->rses_master_ref && BREF_IS_IN_USE(rses->rses_master_ref))
|
||||
if (!candidate && rses->rses_config.master_failure_mode == RW_FAIL_INSTANTLY &&
|
||||
rses->current_master && rses->current_master->in_use())
|
||||
{
|
||||
MXS_ERROR("Could not find master among the backend servers. "
|
||||
"Previous master's state : %s", STRSRVSTATUS(&master));
|
||||
}
|
||||
|
||||
return candidate_bref;
|
||||
return candidate;
|
||||
}
|
||||
|
@ -80,7 +80,7 @@ static bool valid_for_slave(const SERVER *server, const SERVER *master_host)
|
||||
* @return The best slave backend reference or NULL if no candidates could be found
|
||||
*/
|
||||
SRWBackend* get_slave_candidate(ROUTER_CLIENT_SES* rses, const SERVER *master,
|
||||
int (*cmpfun)(const void *, const void *))
|
||||
int (*cmpfun)(const void *, const void *))
|
||||
{
|
||||
SRWBackend candidate;
|
||||
|
||||
@ -129,7 +129,6 @@ SRWBackend* get_slave_candidate(ROUTER_CLIENT_SES* rses, const SERVER *master,
|
||||
*/
|
||||
bool select_connect_backend_servers(int router_nservers,
|
||||
int max_nslaves,
|
||||
int max_slave_rlag,
|
||||
select_criteria_t select_criteria,
|
||||
MXS_SESSION *session,
|
||||
ROUTER_INSTANCE *router,
|
||||
@ -382,32 +381,32 @@ static void log_server_connections(select_criteria_t select_criteria,
|
||||
|
||||
switch (select_criteria)
|
||||
{
|
||||
case LEAST_GLOBAL_CONNECTIONS:
|
||||
MXS_INFO("MaxScale connections : %d in \t[%s]:%d %s",
|
||||
b->server->stats.n_current, b->server->name,
|
||||
b->server->port, STRSRVSTATUS(b->server));
|
||||
break;
|
||||
case LEAST_GLOBAL_CONNECTIONS:
|
||||
MXS_INFO("MaxScale connections : %d in \t[%s]:%d %s",
|
||||
b->server->stats.n_current, b->server->name,
|
||||
b->server->port, STRSRVSTATUS(b->server));
|
||||
break;
|
||||
|
||||
case LEAST_ROUTER_CONNECTIONS:
|
||||
MXS_INFO("RWSplit connections : %d in \t[%s]:%d %s",
|
||||
b->connections, b->server->name,
|
||||
b->server->port, STRSRVSTATUS(b->server));
|
||||
break;
|
||||
case LEAST_ROUTER_CONNECTIONS:
|
||||
MXS_INFO("RWSplit connections : %d in \t[%s]:%d %s",
|
||||
b->connections, b->server->name,
|
||||
b->server->port, STRSRVSTATUS(b->server));
|
||||
break;
|
||||
|
||||
case LEAST_CURRENT_OPERATIONS:
|
||||
MXS_INFO("current operations : %d in \t[%s]:%d %s",
|
||||
b->server->stats.n_current_ops,
|
||||
b->server->name, b->server->port,
|
||||
STRSRVSTATUS(b->server));
|
||||
break;
|
||||
case LEAST_CURRENT_OPERATIONS:
|
||||
MXS_INFO("current operations : %d in \t[%s]:%d %s",
|
||||
b->server->stats.n_current_ops,
|
||||
b->server->name, b->server->port,
|
||||
STRSRVSTATUS(b->server));
|
||||
break;
|
||||
|
||||
case LEAST_BEHIND_MASTER:
|
||||
MXS_INFO("replication lag : %d in \t[%s]:%d %s",
|
||||
b->server->rlag, b->server->name,
|
||||
b->server->port, STRSRVSTATUS(b->server));
|
||||
default:
|
||||
ss_dassert(!true);
|
||||
break;
|
||||
case LEAST_BEHIND_MASTER:
|
||||
MXS_INFO("replication lag : %d in \t[%s]:%d %s",
|
||||
b->server->rlag, b->server->name,
|
||||
b->server->port, STRSRVSTATUS(b->server));
|
||||
default:
|
||||
ss_dassert(!true);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user