Added mechanism for choosing slave for a query based on the current load in all connected slaves. Counting operations is not correctly done here. Reading values and choosing accordingly is done.

Fixed several things in session command reply processing.
This commit is contained in:
VilhoRaatikka
2014-06-29 22:21:30 +03:00
parent 20637ee224
commit fcf67716fd
10 changed files with 593 additions and 297 deletions

View File

@ -115,12 +115,18 @@ int bref_cmp_behind_master(
const void* bref1,
const void* bref2);
int bref_cmp_current_load(
const void* bref1,
const void* bref2);
int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)=
{
NULL,
bref_cmp_global_conn,
bref_cmp_router_conn,
bref_cmp_behind_master
bref_cmp_behind_master,
bref_cmp_current_load
};
static bool select_connect_backend_servers(
@ -460,7 +466,6 @@ static void* newSession(
int max_nslaves; /*< max # of slaves used in this session */
int i;
const int min_nservers = 1; /*< hard-coded for now */
static uint64_t router_client_ses_seq; /*< ID for client session */
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
@ -1105,11 +1110,11 @@ static int routeQuery(
{
atomic_add(&inst->stats.n_slave, 1);
/**
* This backend_ref waits resultset, flag it.
* Add one waiter to backend reference
*/
bref_set_state(get_bref_from_dcb(router_cli_ses,
slave_dcb),
BREF_WAITING_RESULT);
BREF_WAITING_RESULT);
}
else
{
@ -1154,15 +1159,14 @@ static int routeQuery(
{
succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER);
}
if (succp)
{
if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1)
{
atomic_add(&inst->stats.n_master, 1);
/**
* This backend_ref waits reply to write stmt,
* flag it.
* Add one waiter to backend reference.
*/
bref_set_state(get_bref_from_dcb(router_cli_ses, master_dcb),
BREF_WAITING_RESULT);
@ -1368,8 +1372,7 @@ static void clientReply (
scur = &bref->bref_sescmd_cur;
/**
* Active cursor means that reply is from session command
* execution. Majority of the time there are no session commands
* being executed.
* execution.
*/
if (sescmd_cursor_is_active(scur))
{
@ -1380,8 +1383,7 @@ static void clientReply (
(uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf));
size_t len = MYSQL_GET_PACKET_LEN(buf);
char* cmdstr = (char *)malloc(len+1);
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
snprintf(cmdstr, len+1, "%s", &buf[5]);
LOGIF(LE, (skygw_log_write_flush(
@ -1393,13 +1395,13 @@ static void clientReply (
free(cmdstr);
/** Inform the client */
handle_error_reply_client(ses,writebuf);
handle_error_reply_client(ses, writebuf);
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
goto lock_failed;
}
else
else if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf))
{
/**
* Discard all those responses that have already been sent to
@ -1407,8 +1409,12 @@ static void clientReply (
* needs to be sent to client or NULL.
*/
writebuf = sescmd_cursor_process_replies(client_dcb,
writebuf,
bref);
writebuf,
bref);
}
else
{
ss_dassert(false);
}
}
@ -1416,7 +1422,9 @@ static void clientReply (
{
/** Write reply to client DCB */
SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
#if 0
bref_clear_state(bref, BREF_WAITING_RESULT);
#endif
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
@ -1427,7 +1435,7 @@ static void clientReply (
/** Log to debug that router was closed */
goto lock_failed;
}
/** There is one pending session command to be xexecuted. */
/** There is one pending session command to be executed. */
if (sescmd_cursor_is_active(scur))
{
bool succp;
@ -1479,21 +1487,78 @@ int bref_cmp_behind_master(
return 1;
}
int bref_cmp_current_load(
const void* bref1,
const void* bref2)
{
SERVER* s1 = ((backend_ref_t *)bref1)->bref_backend->backend_server;
SERVER* s2 = ((backend_ref_t *)bref2)->bref_backend->backend_server;
return ((s1->stats.n_current_ops < s2->stats.n_current_ops) ? -1 :
((s1->stats.n_current_ops > s2->stats.n_current_ops) ? 1 : 0));
}
static void bref_clear_state(
backend_ref_t* bref,
bref_state_t state)
{
bref->bref_state &= ~state;
if (state != BREF_WAITING_RESULT)
{
bref->bref_state &= ~state;
}
else
{
int prev1;
int prev2;
prev1 = atomic_add(&bref->bref_num_result_wait, -1);
ss_dassert(prev1 >= 0);
prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, -1);
ss_dassert(prev2 >= 0);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Current waiters %d and ops %d after in %s:%d",
prev1-1,
prev2-1,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
}
}
static void bref_set_state(
backend_ref_t* bref,
bref_state_t state)
{
bref->bref_state |= state;
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Set state %d for %s:%d fd %d",
if (state != BREF_WAITING_RESULT)
{
bref->bref_state |= state;
}
else
{
int prev1;
int prev2;
prev1 = atomic_add(&bref->bref_num_result_wait, 1);
ss_dassert(prev1 >= 0);
prev2 = atomic_add(
&bref->bref_backend->backend_server->stats.n_current_ops, 1);
ss_dassert(prev2 >= 0);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Current waiters %d and ops %d before in %s:%d",
prev1,
prev2,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port)));
}
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [bref_set_state] Set state %d for %s:%d fd %d",
pthread_self(),
bref->bref_state,
bref->bref_backend->backend_server->name,
bref->bref_backend->backend_server->port,
@ -1662,6 +1727,13 @@ static bool select_connect_backend_servers(
b->backend_conn_count)));
break;
case LEAST_CURRENT_OPERATIONS:
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"%s %d:%d",
b->backend_server->name,
b->backend_server->port,
b->backend_server->stats.n_current_ops)));
break;
default:
break;
}
@ -1681,13 +1753,11 @@ static bool select_connect_backend_servers(
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Examine server "
"%s:%d %s with %d connections. "
"router->bitvalue is %d",
"%s:%d %s with %d active operations.",
b->backend_server->name,
b->backend_server->port,
STRSRVSTATUS(b->backend_server),
b->backend_conn_count,
router->bitmask)));
b->backend_server->stats.n_current_ops)));
if (SERVER_IS_RUNNING(b->backend_server) &&
((b->backend_server->status & router->bitmask) ==
@ -2195,35 +2265,27 @@ static GWBUF* sescmd_cursor_process_replies(
/** Faster backend has already responded to client : discard */
if (scmd->my_sescmd_is_replied)
{
bool last_packet = false;
CHK_GWBUF(replybuf);
packet = (uint8_t *)GWBUF_DATA(replybuf);
/**
* If it is response to MYSQL_COM_STMT_PREPARE, then buffer
* only includes the response.
*/
if (scmd->my_sescmd_prop->rses_prop_data.sescmd.my_sescmd_packet_type ==
MYSQL_COM_STMT_PREPARE)
while (!last_packet)
{
while (replybuf != NULL)
{
#if defined(SS_DEBUG)
int buflen;
buflen = GWBUF_LENGTH(replybuf);
replybuf = gwbuf_consume(replybuf, buflen);
#else
replybuf = gwbuf_consume(
replybuf,
GWBUF_LENGTH(replybuf));
int buflen;
buflen = GWBUF_LENGTH(replybuf);
ss_debug(GWBUF_IS_TYPE_SESCMD_RESPONSE(replybuf) ==
(scmd->my_sescmd_packet_type == MYSQL_COM_STMT_PREPARE));
last_packet = GWBUF_IS_TYPE_RESPONSE_END(replybuf);
/** discard packet */
replybuf = gwbuf_consume(replybuf, buflen);
}
#if 0
bref_clear_state(bref, BREF_WAITING_RESULT);
#endif
}
}
/** Only consume the leading packet */
else
{
packetlen = packet[0]+packet[1]*256+packet[2]*256*256;
replybuf = gwbuf_consume(replybuf, packetlen+headerlen);
}
}
/** Response is in the buffer and it will be sent to client. */
else if (replybuf != NULL)
@ -2408,7 +2470,7 @@ static bool execute_sescmd_in_backend(
goto return_succp;
}
if (!sescmd_cursor_is_active(scur))
{
/** Cursor is left active when function returns. */
@ -2425,9 +2487,11 @@ static bool execute_sescmd_in_backend(
uint8_t* ptr = GWBUF_DATA(tmpbuf);
unsigned char cmd = MYSQL_GET_COMMAND(ptr);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Just before write, fd %d : cmd %s.",
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [execute_sescmd_in_backend] Just before write, fd "
"%d : cmd %s.",
pthread_self(),
dcb->fd,
STRPACKETTYPE(cmd))));
}
@ -2444,6 +2508,11 @@ static bool execute_sescmd_in_backend(
case MYSQL_COM_QUERY:
case MYSQL_COM_INIT_DB:
default:
/**
* Mark session command buffer, it triggers writing
* MySQL command to protocol
*/
gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD);
rc = dcb->func.write(
dcb,
sescmd_cursor_clone_querybuf(scur));
@ -2723,14 +2792,13 @@ static bool route_session_write(
rses_property_done(prop);
succp = false;
goto return_succp;
}
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
}
/**
* Additional reference is created to querybuf to
* prevent it from being released before properties
* are cleaned up as a part of router sessionclean-up.
*/
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
/** Add sescmd property to router client session */
@ -2745,12 +2813,11 @@ static bool route_session_write(
scur = backend_ref_get_sescmd_cursor(&backend_ref[i]);
/**
* This backend_ref waits reply, flag it.
* Add one waiter to backend reference.
*/
bref_set_state(get_bref_from_dcb(router_cli_ses,
backend_ref[i].bref_dcb),
BREF_WAITING_RESULT);
BREF_WAITING_RESULT);
/**
* Start execution if cursor is not already executing.
* Otherwise, cursor will execute pending commands
@ -2820,6 +2887,7 @@ static void rwsplit_process_options(
c == LEAST_GLOBAL_CONNECTIONS ||
c == LEAST_ROUTER_CONNECTIONS ||
c == LEAST_BEHIND_MASTER ||
c == LEAST_CURRENT_OPERATIONS ||
c == UNDEFINED_CRITERIA);
if (c == UNDEFINED_CRITERIA)
@ -2829,7 +2897,7 @@ static void rwsplit_process_options(
"slave selection criteria \"%s\". "
"Allowed values are \"LEAST_GLOBAL_CONNECTIONS\", "
"LEAST_ROUTER_CONNECTIONS, "
"and \"LEAST_ROUTER_CONNECTIONS\".",
"and \"LEAST_CURRENT_OPERATIONS\".",
STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria))));
}
else
@ -2957,7 +3025,6 @@ static bool handle_error_new_connection(
/** failed DCB has already been replaced */
if (bref == NULL)
{
rses_end_locked_router_action(rses);
succp = true;
goto return_succp;
}
@ -2968,7 +3035,6 @@ static bool handle_error_new_connection(
*/
if (backend_dcb->state != DCB_STATE_POLLING)
{
rses_end_locked_router_action(rses);
succp = true;
goto return_succp;
}
@ -2980,7 +3046,9 @@ static bool handle_error_new_connection(
DCB* client_dcb;
client_dcb = ses->client;
client_dcb->func.write(client_dcb, errmsg);
bref_clear_state(bref, BREF_WAITING_RESULT);
#if 0
bref_clear_state(bref, BREF_WAITING_RESULT);
#endif
}
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);