Added status bit BREF_QUERY_ACTIVE to indicate if a query, other than session command, sent to backend for execution. The flag is cleared when the first packet belonging to the response arrives.
The flag is part of the active operation counting, which is utilized in load balancing. The active operation count per backend is used by default as criteria when router chooses to which backend a query should be routed.
This commit is contained in:
parent
3e3c1af211
commit
6d672cb967
@ -52,13 +52,15 @@ typedef enum prep_stmt_state {
|
||||
|
||||
typedef enum bref_state {
|
||||
BREF_IN_USE = 0x01,
|
||||
BREF_WAITING_RESULT = 0x02, /*< for anything that responds */
|
||||
BREF_CLOSED = 0x04
|
||||
BREF_WAITING_RESULT = 0x02, /*< for session commands only */
|
||||
BREF_QUERY_ACTIVE = 0x04, /*< for other queries */
|
||||
BREF_CLOSED = 0x08
|
||||
} bref_state_t;
|
||||
|
||||
#define BREF_IS_NOT_USED(s) (s->bref_state & ~BREF_IN_USE)
|
||||
#define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE)
|
||||
#define BREF_IS_WAITING_RESULT(s) (s->bref_num_result_wait > 0)
|
||||
#define BREF_IS_QUERY_ACTIVE(s) (s->bref_state & BREF_QUERY_ACTIVE)
|
||||
#define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED)
|
||||
|
||||
typedef enum backend_type_t {
|
||||
|
@ -216,10 +216,7 @@ static mysql_sescmd_t* sescmd_cursor_get_command(
|
||||
static bool sescmd_cursor_next(
|
||||
sescmd_cursor_t* scur);
|
||||
|
||||
static GWBUF* sescmd_cursor_process_replies(
|
||||
DCB* client_dcb,
|
||||
GWBUF* replybuf,
|
||||
backend_ref_t* bref);
|
||||
static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, backend_ref_t* bref);
|
||||
|
||||
static void tracelog_routed_query(
|
||||
ROUTER_CLIENT_SES* rses,
|
||||
@ -1108,13 +1105,15 @@ static int routeQuery(
|
||||
{
|
||||
if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1)
|
||||
{
|
||||
backend_ref_t* bref;
|
||||
|
||||
atomic_add(&inst->stats.n_slave, 1);
|
||||
/**
|
||||
* Add one waiter to backend reference
|
||||
* Add one query response waiter to backend reference
|
||||
*/
|
||||
bref_set_state(get_bref_from_dcb(router_cli_ses,
|
||||
slave_dcb),
|
||||
BREF_WAITING_RESULT);
|
||||
bref = get_bref_from_dcb(router_cli_ses, slave_dcb);
|
||||
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
||||
bref_set_state(bref, BREF_WAITING_RESULT);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1164,12 +1163,16 @@ static int routeQuery(
|
||||
{
|
||||
if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1)
|
||||
{
|
||||
backend_ref_t* bref;
|
||||
|
||||
atomic_add(&inst->stats.n_master, 1);
|
||||
|
||||
/**
|
||||
* Add one waiter to backend reference.
|
||||
* Add one write response waiter to backend reference
|
||||
*/
|
||||
bref_set_state(get_bref_from_dcb(router_cli_ses, master_dcb),
|
||||
BREF_WAITING_RESULT);
|
||||
bref = get_bref_from_dcb(router_cli_ses, master_dcb);
|
||||
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
||||
bref_set_state(bref, BREF_WAITING_RESULT);
|
||||
}
|
||||
}
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
@ -1408,24 +1411,38 @@ static void clientReply (
|
||||
* the client. Return with buffer including response that
|
||||
* needs to be sent to client or NULL.
|
||||
*/
|
||||
writebuf = sescmd_cursor_process_replies(client_dcb,
|
||||
writebuf,
|
||||
bref);
|
||||
writebuf = sescmd_cursor_process_replies(writebuf, bref);
|
||||
}
|
||||
else
|
||||
{
|
||||
ss_dassert(false);
|
||||
}
|
||||
/**
|
||||
* If response will be sent to client, decrease waiter count.
|
||||
* This applies to session commands only. Counter decrement
|
||||
* for other type of queries is done outside this block.
|
||||
*/
|
||||
if (writebuf != NULL && client_dcb != NULL)
|
||||
{
|
||||
/** Set response status as replied */
|
||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear BREF_QUERY_ACTIVE flag and decrease waiter counter.
|
||||
* This applies for queries other than session commands.
|
||||
*/
|
||||
else if (BREF_IS_QUERY_ACTIVE(bref))
|
||||
{
|
||||
bref_clear_state(bref, BREF_QUERY_ACTIVE);
|
||||
/** Set response status as replied */
|
||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||
}
|
||||
|
||||
if (writebuf != NULL && client_dcb != NULL)
|
||||
{
|
||||
/** Write reply to client DCB */
|
||||
SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
|
||||
#if 1
|
||||
/** Set response status as replied */
|
||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||
#endif
|
||||
}
|
||||
/** Unlock router session */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
@ -1449,6 +1466,8 @@ static void clientReply (
|
||||
bref->bref_backend->backend_server->port)));
|
||||
|
||||
succp = execute_sescmd_in_backend(bref);
|
||||
|
||||
ss_dassert(succp);
|
||||
}
|
||||
/** Unlock router session */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
@ -1515,12 +1534,12 @@ static void bref_clear_state(
|
||||
|
||||
/** Decrease waiter count */
|
||||
prev1 = atomic_add(&bref->bref_num_result_wait, -1);
|
||||
ss_dassert(prev1 >= 0);
|
||||
ss_dassert(prev1 > 0);
|
||||
|
||||
/** Decrease global operation count */
|
||||
prev2 = atomic_add(
|
||||
&bref->bref_backend->backend_server->stats.n_current_ops, -1);
|
||||
ss_dassert(prev2 >= 0);
|
||||
ss_dassert(prev2 > 0);
|
||||
|
||||
LOGIF(LT, (skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
@ -2246,13 +2265,9 @@ static void mysql_sescmd_done(
|
||||
* 9. s+q+
|
||||
*/
|
||||
static GWBUF* sescmd_cursor_process_replies(
|
||||
DCB* client_dcb,
|
||||
GWBUF* replybuf,
|
||||
backend_ref_t* bref)
|
||||
{
|
||||
const size_t headerlen = 4; /*< mysql packet header */
|
||||
size_t packetlen;
|
||||
uint8_t* packet;
|
||||
mysql_sescmd_t* scmd;
|
||||
sescmd_cursor_t* scur;
|
||||
|
||||
@ -2260,7 +2275,6 @@ static GWBUF* sescmd_cursor_process_replies(
|
||||
ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock)));
|
||||
scmd = sescmd_cursor_get_command(scur);
|
||||
|
||||
CHK_DCB(client_dcb);
|
||||
CHK_GWBUF(replybuf);
|
||||
|
||||
/**
|
||||
@ -2275,25 +2289,18 @@ static GWBUF* sescmd_cursor_process_replies(
|
||||
bool last_packet = false;
|
||||
|
||||
CHK_GWBUF(replybuf);
|
||||
packet = (uint8_t *)GWBUF_DATA(replybuf);
|
||||
|
||||
while (!last_packet)
|
||||
{
|
||||
int buflen;
|
||||
|
||||
buflen = GWBUF_LENGTH(replybuf);
|
||||
|
||||
ss_debug(GWBUF_IS_TYPE_SESCMD_RESPONSE(replybuf) ==
|
||||
(scmd->my_sescmd_packet_type == MYSQL_COM_STMT_PREPARE));
|
||||
|
||||
last_packet = GWBUF_IS_TYPE_RESPONSE_END(replybuf);
|
||||
/** discard packet */
|
||||
replybuf = gwbuf_consume(replybuf, buflen);
|
||||
}
|
||||
#if 1
|
||||
/** Set response status received */
|
||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||
#endif
|
||||
}
|
||||
/** Response is in the buffer and it will be sent to client. */
|
||||
else if (replybuf != NULL)
|
||||
@ -2941,7 +2948,6 @@ static void handleError (
|
||||
error_action_t action,
|
||||
bool* succp)
|
||||
{
|
||||
DCB* client_dcb;
|
||||
SESSION* session;
|
||||
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
|
||||
|
Loading…
x
Reference in New Issue
Block a user