Fixed many error handling issues regading to timing and multiple threads.

Added flags to those backend references which have sent something to backend which causes the backend to send results or reply back. Didn't add removal of the flag since there's currently no way to tell whether response from backend contains anything else than session command reply - which aren't counted when BREF_WAITING_RESULT is set and cleared.
This commit is contained in:
VilhoRaatikka 2014-06-12 23:22:51 +03:00
parent e95b6cc0d9
commit 15ff1fd26a
8 changed files with 439 additions and 149 deletions

View File

@ -308,7 +308,7 @@ bool gwbuf_set_type(
case GWBUF_TYPE_MYSQL:
case GWBUF_TYPE_PLAINSQL:
case GWBUF_TYPE_UNDEFINED:
buf->gwbuf_type = type;
buf->gwbuf_type |= type;
succp = true;
break;
default:

View File

@ -642,3 +642,30 @@ int i;
return 1;
}
bool session_route_query (
SESSION* ses,
GWBUF* buf)
{
bool succp;
if (ses->head.routeQuery == NULL ||
ses->head.instance == NULL ||
ses->head.session == NULL)
{
succp = false;
goto return_succp;
}
if (ses->head.routeQuery(ses->head.instance, ses->head.session, buf) == 1)
{
succp = true;
}
else
{
succp = false;
}
return_succp:
return succp;
}

View File

@ -46,11 +46,14 @@
typedef enum
{
GWBUF_TYPE_UNDEFINED = 0x0,
GWBUF_TYPE_PLAINSQL = 0x1,
GWBUF_TYPE_MYSQL = 0x2
GWBUF_TYPE_UNDEFINED = 0x00,
GWBUF_TYPE_PLAINSQL = 0x01,
GWBUF_TYPE_MYSQL = 0x02
} gwbuf_type_t;
#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL)
#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL)
/**
* A structure to encapsulate the data in a form that the data itself can be
* shared between multiple GWBUF's without the need to make multiple copies

View File

@ -32,11 +32,17 @@
#include <dcb.h>
typedef enum bref_state {
BREF_NOT_USED,
BREF_IN_USE,
BREF_CLOSED
BREF_NOT_USED = 0x00,
BREF_IN_USE = 0x01,
BREF_WAITING_RESULT = 0x02, /*< for anything that responds */
BREF_CLOSED = 0x08
} bref_state_t;
#define BREF_IS_NOT_USED(s) (s->bref_state & BREF_NOT_USED)
#define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) (s->bref_state & BREF_WAITING_RESULT)
#define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED)
typedef enum backend_type_t {
BE_UNDEFINED=-1,
BE_MASTER,

View File

@ -339,7 +339,7 @@ static int conn_err_count;
return;
/** Store prevous status */
database->mon_prev_status = database->server->status;
database->mon_prev_status = database->server->status;
if (database->con == NULL || mysql_ping(database->con) != 0)
{

View File

@ -197,6 +197,14 @@ static int gw_read_backend_event(DCB *dcb) {
if (gw_read_backend_handshake(backend_protocol) != 0) {
backend_protocol->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] after "
"gw_read_backend_handshake, fd %d, "
"state = MYSQL_AUTH_FAILED.",
pthread_self(),
backend_protocol->owner_dcb->fd)));
} else {
/* handshake decoded, send the auth credentials */
if (gw_send_authentication_to_backend(
@ -206,6 +214,13 @@ static int gw_read_backend_event(DCB *dcb) {
backend_protocol) != 0)
{
backend_protocol->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] after "
"gw_send_authentication_to_backend "
"fd %d, state = MYSQL_AUTH_FAILED.",
pthread_self(),
backend_protocol->owner_dcb->fd)));
} else {
backend_protocol->state = MYSQL_AUTH_RECV;
}
@ -253,13 +268,21 @@ static int gw_read_backend_event(DCB *dcb) {
switch (receive_rc) {
case -1:
backend_protocol->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] after "
"gw_receive_backend_authentication "
"fd %d, state = MYSQL_AUTH_FAILED.",
backend_protocol->owner_dcb->fd,
pthread_self())));
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Backend server didn't "
"accept authentication for user "
"%s.",
current_session->user)));
current_session->user)));
break;
case 1:
backend_protocol->state = MYSQL_IDLE;
@ -318,7 +341,11 @@ static int gw_read_backend_event(DCB *dcb) {
}
spinlock_release(&dcb->delayqlock);
/** Whole session is being closed so return. */
if (session->state == SESSION_STATE_STOPPING)
{
goto return_rc;
}
/* try reload users' table for next connection */
service_refresh_users(dcb->session->client->service);
@ -341,11 +368,6 @@ static int gw_read_backend_event(DCB *dcb) {
}
usleep(1);
}
if (session->state == SESSION_STATE_STOPPING)
{
goto return_rc;
}
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
@ -427,7 +449,7 @@ static int gw_read_backend_event(DCB *dcb) {
*/
router->handleError(router_instance,
rsession,
"Read from backend failed.",
"Read from backend failed",
dcb,
ERRACT_NEW_CONNECTION,
&succp);
@ -583,7 +605,8 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
*/
spinlock_acquire(&dcb->dcb_initlock);
if (dcb->state != DCB_STATE_POLLING) {
if (dcb->state != DCB_STATE_POLLING)
{
/*< vraa : errorHandle */
/*< Free buffer memory */
gwbuf_consume(queue, GWBUF_LENGTH(queue));
@ -629,11 +652,11 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
queue,
GWBUF_LENGTH(queue))) != NULL);
free(str);
}
rc = 0;
spinlock_release(&dcb->authlock);
goto return_rc;
break;
}
case MYSQL_IDLE:
LOGIF(LD, (skygw_log_write(
@ -677,7 +700,8 @@ return_rc:
* Backend Error Handling for EPOLLER
*
*/
static int gw_error_backend_event(DCB *dcb) {
static int gw_error_backend_event(DCB *dcb)
{
SESSION *session;
void *rsession;
ROUTER_OBJECT *router;
@ -692,7 +716,17 @@ static int gw_error_backend_event(DCB *dcb) {
router_instance = session->service->router_instance;
#if defined(ERRHANDLE2)
router->handleError();
router->handleError(router_instance,
rsession,
"Connection to backend server failed",
dcb,
ERRACT_NEW_CONNECTION,
&succp);
if (!succp)
{
dcb_close(dcb);
}
#else
if (dcb->state != DCB_STATE_POLLING) {
/*< vraa : errorHandle */
@ -866,7 +900,33 @@ static int
gw_backend_close(DCB *dcb)
{
#if defined(ERRHANDLE)
mysql_send_com_quit(dcb, 1);
DCB* client_dcb;
SESSION* session;
GWBUF* quitbuf;
bool succp;
CHK_DCB(dcb);
session = dcb->session;
CHK_SESSION(session);
quitbuf = mysql_create_com_quit(NULL, 0);
/** Send COM_QUIT to the backend being closed */
mysql_send_com_quit(dcb, 0, quitbuf);
if (session != NULL &&
(session->state == SESSION_STATE_ROUTER_READY ||
session->state == SESSION_STATE_READY))
{
client_dcb = session->client;
if (client_dcb != NULL &&
client_dcb->state == DCB_STATE_POLLING)
{
/** Close client DCB */
dcb_close(client_dcb);
}
}
#else
dcb_close(dcb);
#endif

View File

@ -126,7 +126,9 @@ void gw_mysql_close(MySQLProtocol **ptr) {
* @param conn MySQL protocol structure
* @return 0 on success, 1 on failure
*/
int gw_read_backend_handshake(MySQLProtocol *conn) {
int gw_read_backend_handshake(
MySQLProtocol *conn)
{
GWBUF *head = NULL;
DCB *dcb = conn->owner_dcb;
int n = -1;
@ -135,12 +137,14 @@ int gw_read_backend_handshake(MySQLProtocol *conn) {
int success = 0;
int packet_len = 0;
if ((n = dcb_read(dcb, &head)) != -1) {
if (head) {
if ((n = dcb_read(dcb, &head)) != -1)
{
if (head)
{
payload = GWBUF_DATA(head);
h_len = gwbuf_length(head);
/*
/**
* The mysql packets content starts at byte fifth
* just return with less bytes
*/
@ -148,10 +152,45 @@ int gw_read_backend_handshake(MySQLProtocol *conn) {
if (h_len <= 4) {
/* log error this exit point */
conn->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_handshake] after "
"dcb_read, fd %d, "
"state = MYSQL_AUTH_FAILED.",
dcb->fd,
pthread_self())));
return 1;
}
//get mysql packet size, 3 bytes
if (payload[4] == 0xff)
{
size_t len = MYSQL_GET_PACKET_LEN(payload);
uint16_t errcode = MYSQL_GET_ERRCODE(payload);
char* bufstr = strndup(&((char *)payload)[7], len-3);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_receive_backend_auth] Invalid "
"authentication message from backend dcb %p "
"fd %d, ptr[4] = %p, error code %d, msg %s.",
pthread_self(),
dcb,
dcb->fd,
payload[4],
errcode,
bufstr)));
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Invalid authentication message "
"from backend. Error code: %d, Msg : %s",
errcode,
bufstr)));
free(bufstr);
}
//get mysql packet size, 3 bytes
packet_len = gw_mysql_get_byte3(payload);
if (h_len < (packet_len + 4)) {
@ -160,6 +199,15 @@ int gw_read_backend_handshake(MySQLProtocol *conn) {
* packet. Log error this exit point
*/
conn->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_handshake] after "
"gw_mysql_get_byte3, fd %d, "
"state = MYSQL_AUTH_FAILED.",
pthread_self(),
dcb->fd,
pthread_self())));
return 1;
}
@ -176,6 +224,15 @@ int gw_read_backend_handshake(MySQLProtocol *conn) {
* log error this exit point
*/
conn->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_handshake] after "
"gw_decode_mysql_server_handshake, fd %d, "
"state = MYSQL_AUTH_FAILED.",
pthread_self(),
conn->owner_dcb->fd,
pthread_self())));
return 1;
}
@ -202,7 +259,10 @@ int gw_read_backend_handshake(MySQLProtocol *conn) {
* @return 0 on success, < 0 on failure
*
*/
int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) {
int gw_decode_mysql_server_handshake(
MySQLProtocol *conn,
uint8_t *payload)
{
uint8_t *server_version_end = NULL;
uint16_t mysql_server_capabilities_one = 0;
uint16_t mysql_server_capabilities_two = 0;
@ -216,8 +276,8 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) {
protocol_version = payload[0];
if (protocol_version != GW_MYSQL_PROTOCOL_VERSION) {
/* log error for this */
if (protocol_version != GW_MYSQL_PROTOCOL_VERSION)
{
return -1;
}
@ -257,19 +317,23 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) {
payload+=2;
// get scramble len
if (payload[0] > 0) {
if (payload[0] > 0)
{
scramble_len = payload[0] -1;
ss_dassert(scramble_len > GW_SCRAMBLE_LENGTH_323);
ss_dassert(scramble_len <= GW_MYSQL_SCRAMBLE_SIZE);
if ( (scramble_len < GW_SCRAMBLE_LENGTH_323) || scramble_len > GW_MYSQL_SCRAMBLE_SIZE) {
if ((scramble_len < GW_SCRAMBLE_LENGTH_323) ||
scramble_len > GW_MYSQL_SCRAMBLE_SIZE)
{
/* log this */
return -2;
return -2;
}
} else {
}
else
{
scramble_len = GW_MYSQL_SCRAMBLE_SIZE;
}
// skip 10 zero bytes
payload += 11;
@ -322,8 +386,8 @@ int gw_receive_backend_auth(
else if (ptr[4] == 0xff)
{
size_t len = MYSQL_GET_PACKET_LEN(ptr);
char* err = strndup(&ptr[8], 5);
char* bufstr = strndup(&ptr[13], len-4-5);
char* err = strndup(&((char *)ptr)[8], 5);
char* bufstr = strndup(&((char *)ptr)[13], len-4-5);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -737,31 +801,28 @@ gw_mysql_protocol_state2string (int state) {
}
}
int mysql_send_com_quit(
DCB* dcb,
int packet_number)
GWBUF* mysql_create_com_quit(
GWBUF* bufparam,
int packet_number)
{
uint8_t *data;
GWBUF *buf;
int nbytes = 0;
CHK_DCB(dcb);
ss_dassert(packet_number <= 255);
uint8_t* data;
GWBUF* buf;
if (dcb == NULL ||
(dcb->state != DCB_STATE_NOPOLLING &&
dcb->state != DCB_STATE_ZOMBIE))
if (bufparam == NULL)
{
return 0;
}
buf = gwbuf_alloc(COM_QUIT_PACKET_SIZE);
ss_dassert(buf != NULL);
buf = gwbuf_alloc(COM_QUIT_PACKET_SIZE);
}
else
{
buf = bufparam;
}
if (buf == NULL)
{
return 0;
}
ss_dassert(GWBUF_LENGTH(buf) == COM_QUIT_PACKET_SIZE);
data = GWBUF_DATA(buf);
*data++ = 0x1;
@ -770,6 +831,37 @@ int mysql_send_com_quit(
*data++ = packet_number;
*data = 0x1;
return buf;
}
int mysql_send_com_quit(
DCB* dcb,
int packet_number,
GWBUF* bufparam)
{
GWBUF *buf;
int nbytes = 0;
CHK_DCB(dcb);
ss_dassert(packet_number <= 255);
if (dcb == NULL || dcb->state == DCB_STATE_ZOMBIE)
{
return 0;
}
if (bufparam == NULL)
{
buf = mysql_create_com_quit(NULL, packet_number);
}
else
{
buf = bufparam;
}
if (buf == NULL)
{
return 0;
}
nbytes = dcb->func.write(dcb, buf);
return nbytes;

View File

@ -93,6 +93,7 @@ static void handleError(
static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb);
static int router_get_servercount(ROUTER_INSTANCE* router);
static int rses_get_max_slavecount(ROUTER_CLIENT_SES* rses, int router_nservers);
static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb);
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
@ -200,7 +201,8 @@ static bool sescmd_cursor_next(
static GWBUF* sescmd_cursor_process_replies(
DCB* client_dcb,
GWBUF* replybuf,
sescmd_cursor_t* scur);
sescmd_cursor_t* scur,
bool* has_query);
static void tracelog_routed_query(
ROUTER_CLIENT_SES* rses,
@ -219,6 +221,10 @@ static void refreshInstance(
ROUTER_INSTANCE* router,
CONFIG_PARAMETER* param);
static void bref_clear_state(backend_ref_t* bref, bref_state_t state);
static void bref_set_state(backend_ref_t* bref, bref_state_t state);
static SPINLOCK instlock;
static ROUTER_INSTANCE* instances;
@ -453,6 +459,7 @@ static void* newSession(
* router instance first.
*/
spinlock_acquire(&router->lock);
if (router->service->svc_config_version > router->rwsplit_version)
{
CONFIG_PARAMETER* param = router->service->svc_config_param;
@ -556,7 +563,8 @@ static void* newSession(
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR;
backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR;
#endif
backend_ref[i].bref_state = BREF_NOT_USED;
backend_ref[i].bref_state = 0;
bref_set_state(&backend_ref[i], BREF_NOT_USED);
backend_ref[i].bref_backend = router->servers[i];
/** store pointers to sescmd list to both cursors */
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses;
@ -633,7 +641,15 @@ static void closeSession(
{
ROUTER_CLIENT_SES* router_cli_ses;
backend_ref_t* backend_ref;
/**
* router session can be NULL if newSession failed and it is discarding
* its connections and DCB's.
*/
if (router_session == NULL)
{
return;
}
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(router_cli_ses);
@ -667,15 +683,15 @@ static void closeSession(
DCB* dcb = backend_ref[i].bref_dcb;
/** Close those which had been connected */
if (backend_ref[i].bref_state == BREF_IN_USE)
if (BREF_IS_IN_USE((&backend_ref[i])))
{
CHK_DCB(dcb);
backend_ref[i].bref_state = BREF_NOT_USED;
bref_clear_state(&backend_ref[i], BREF_IN_USE);
bref_set_state(&backend_ref[i], BREF_CLOSED);
#if defined(ERRHANDLE)
/**
* closes protocol and dcb
*/
* closes protocol and dcb
*/
dcb_close(dcb);
#else
dcb->func.close(dcb);
@ -705,12 +721,10 @@ static void freeSession(
for (i=0; i<router_cli_ses->rses_nbackends; i++)
{
if (backend_ref[i].bref_state != BREF_IN_USE)
if (!BREF_IS_IN_USE((&backend_ref[i])))
{
continue;
}
ss_dassert(backend_ref[i].bref_backend->backend_conn_count > 0);
atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1);
}
spinlock_acquire(&router->lock);
@ -783,8 +797,7 @@ static bool get_dcb(
for (i=0; i<rses->rses_nbackends; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
if (backend_ref[i].bref_state == BREF_IN_USE &&
if (BREF_IS_IN_USE((&backend_ref[i])) &&
SERVER_IS_SLAVE(b->backend_server) &&
(smallest_nconn == -1 ||
b->backend_conn_count < smallest_nconn))
@ -799,7 +812,7 @@ static bool get_dcb(
{
backend_ref = rses->rses_master_ref;
if (backend_ref[i].bref_state == BREF_IN_USE)
if (BREF_IS_IN_USE((&backend_ref[i])))
{
*p_dcb = backend_ref->bref_dcb;
succp = true;
@ -825,7 +838,7 @@ static bool get_dcb(
{
BACKEND* b = backend_ref[i].bref_backend;
if (backend_ref[i].bref_state == BREF_IN_USE &&
if (BREF_IS_IN_USE((&backend_ref[i])) &&
(SERVER_IS_MASTER(b->backend_server)))
{
*p_dcb = backend_ref[i].bref_dcb;
@ -853,7 +866,7 @@ return_succp:
* @param session The session associated with the client
* @param queue Gateway buffer queue with the packets received
*
* @return The number of queries forwarded
* @return if succeed 1, otherwise 0
*/
static int routeQuery(
ROUTER* instance,
@ -886,17 +899,24 @@ static int routeQuery(
if (rses_is_closed)
{
LOGIF(LE,
(skygw_log_write_flush(
LOGFILE_ERROR,
"Error: Failed to route %s:%s:\"%s\" to "
"backend server. %s.",
STRPACKETTYPE(packet_type),
STRQTYPE(qtype),
(querystr == NULL ? "(empty)" : querystr),
(rses_is_closed ? "Router was closed" :
"Router has no backend servers where to "
"route to"))));
/**
* COM_QUIT may have sent by client and as a part of backend
* closing procedure.
*/
if (packet_type != COM_QUIT)
{
LOGIF(LE,
(skygw_log_write_flush(
LOGFILE_ERROR,
"Error: Failed to route %s:%s:\"%s\" to "
"backend server. %s.",
STRPACKETTYPE(packet_type),
STRQTYPE(qtype),
(querystr == NULL ? "(empty)" : querystr),
(rses_is_closed ? "Router was closed" :
"Router has no backend servers where to "
"route to"))));
}
goto return_ret;
}
inst->stats.n_queries++;
@ -991,6 +1011,10 @@ static int routeQuery(
*/
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE))
{
/**
* It is not sure if the session command in question requires
* response. Statement must be examined in route_session_write.
*/
bool succp = route_session_write(
router_cli_ses,
querybuf,
@ -1031,6 +1055,12 @@ static int routeQuery(
if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1)
{
atomic_add(&inst->stats.n_slave, 1);
/**
* This backend_ref waits resultset, flag it.
*/
bref_set_state(get_bref_from_dcb(router_cli_ses,
slave_dcb),
BREF_WAITING_RESULT);
}
else
{
@ -1048,7 +1078,7 @@ static int routeQuery(
else
{
bool succp = true;
if (LOG_IS_ENABLED(LOGFILE_TRACE))
{
if (router_cli_ses->rses_transaction_active) /*< all to master */
@ -1081,6 +1111,12 @@ static int routeQuery(
if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1)
{
atomic_add(&inst->stats.n_master, 1);
/**
* This backend_ref waits reply to write stmt,
* flag it.
*/
bref_set_state(get_bref_from_dcb(router_cli_ses, master_dcb),
BREF_WAITING_RESULT);
}
}
rses_end_locked_router_action(router_cli_ses);
@ -1298,10 +1334,17 @@ static void clientReply(
*/
if (sescmd_cursor_is_active(scur))
{
bool has_query;
writebuf = sescmd_cursor_process_replies(client_dcb,
writebuf,
scur);
scur,
&has_query);
if (has_query)
{
bref_clear_state(backend_ref, BREF_WAITING_RESULT);
}
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
@ -1309,9 +1352,6 @@ static void clientReply(
{
/** Write reply to client DCB */
client_dcb->func.write(client_dcb, writebuf);
/**
* Log reply but use identifier for query
*/
}
lock_failed:
@ -1349,6 +1389,20 @@ int bref_cmp_behind_master(
return 1;
}
static void bref_clear_state(
backend_ref_t* bref,
bref_state_t state)
{
bref->bref_state &= ~state;
}
static void bref_set_state(
backend_ref_t* bref,
bref_state_t state)
{
bref->bref_state |= state;
}
/**
* @node Search suitable backend servers from those of router instance.
*
@ -1409,7 +1463,7 @@ static bool select_connect_backend_servers(
/** Master is already chosen and connected. This is slave failure case */
if (*p_master_ref != NULL &&
(*p_master_ref)->bref_state == BREF_IN_USE)
BREF_IS_IN_USE((*p_master_ref)))
{
master_found = true;
master_connected = true;
@ -1440,6 +1494,7 @@ static bool select_connect_backend_servers(
#if defined(EXTRA_DEBUGGING)
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:")));
for (i=0; i<router_nservers; i++)
{
BACKEND* b = backend_ref[i].bref_backend;
@ -1496,7 +1551,7 @@ static bool select_connect_backend_servers(
}
}
/**
* Choose at least 1+1 (master and slave) and at most 1+max_nslaves
* Choose at least 1+min_nslaves (master and slave) and at most 1+max_nslaves
* servers from the sorted list. First master found is selected.
*/
for (i=0;
@ -1527,7 +1582,7 @@ static bool select_connect_backend_servers(
slaves_found += 1;
/** Slave is already connected */
if (backend_ref[i].bref_state == BREF_IN_USE)
if (BREF_IS_IN_USE((&backend_ref[i])))
{
slaves_connected += 1;
}
@ -1542,7 +1597,10 @@ static bool select_connect_backend_servers(
if (backend_ref[i].bref_dcb != NULL)
{
slaves_connected += 1;
backend_ref[i].bref_state = BREF_IN_USE;
bref_clear_state(&backend_ref[i],
BREF_NOT_USED);
bref_set_state(&backend_ref[i],
BREF_IN_USE);
/**
* Increase backend connection counter.
* Server's stats are _increased_ in
@ -1577,7 +1635,10 @@ static bool select_connect_backend_servers(
if (backend_ref[i].bref_dcb != NULL)
{
master_connected = true;
backend_ref[i].bref_state = BREF_IN_USE;
bref_clear_state(&backend_ref[i],
BREF_NOT_USED);
bref_set_state(&backend_ref[i],
BREF_IN_USE);
*p_master_ref = &backend_ref[i];
/** Increase backend connection counter */
/** Increase backend connection counter */
@ -1598,18 +1659,6 @@ static bool select_connect_backend_servers(
}
}
}
else
{
succp = false;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to establish "
"connection with server %s:%d, %s",
b->backend_server->name,
b->backend_server->port,
STRSRVSTATUS(b->backend_server))));
/* handle connect error */
}
} /*< for */
/**
@ -1670,7 +1719,7 @@ static bool select_connect_backend_servers(
{
BACKEND* b = backend_ref[i].bref_backend;
if (backend_ref[i].bref_state == BREF_IN_USE)
if (BREF_IS_IN_USE((&backend_ref[i])))
{
backend_type_t btype = BACKEND_TYPE(b);
@ -1690,7 +1739,9 @@ static bool select_connect_backend_servers(
* Failure cases
*/
else
{
{
succp = false;
if (!master_found)
{
LOGIF(LE, (skygw_log_write(
@ -1702,7 +1753,7 @@ static bool select_connect_backend_servers(
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Error : Couldn't find suitable %s from %d "
"Error : Couldn't find suitable %s from %d "
"candidates.",
(is_synced_master ? "Galera node" : "Master"),
router_nservers)));
@ -1726,7 +1777,7 @@ static bool select_connect_backend_servers(
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Error : Couldn't connect to any %s although "
"Error : Couldn't connect to any %s although "
"there exists at least one %s node in the "
"cluster.",
(is_synced_master ? "Galera node" : "Master"),
@ -1750,18 +1801,22 @@ static bool select_connect_backend_servers(
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"*Error : Couldn't establish required amount of "
"Error : Couldn't establish required amount of "
"slave connections for router session.")));
}
/** Clean up connections */
for (i=0; i<router_nservers; i++)
{
if (backend_ref[i].bref_state == BREF_IN_USE)
if (BREF_IS_IN_USE((&backend_ref[i])))
{
ss_dassert(backend_ref[i].bref_backend->backend_conn_count > 0);
/** disconnect opened connections */
backend_ref[i].bref_dcb->func.close(backend_ref[i].bref_dcb);
dcb_close(backend_ref[i].bref_dcb);
bref_clear_state(&backend_ref[i], BREF_IN_USE);
bref_set_state(&backend_ref[i], BREF_NOT_USED);
atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1);
}
}
@ -1955,7 +2010,8 @@ static void mysql_sescmd_done(
static GWBUF* sescmd_cursor_process_replies(
DCB* client_dcb,
GWBUF* replybuf,
sescmd_cursor_t* scur)
sescmd_cursor_t* scur,
bool* has_query)
{
const size_t headerlen = 4; /*< mysql packet header */
uint8_t* packet;
@ -1971,7 +2027,7 @@ static GWBUF* sescmd_cursor_process_replies(
/**
* Walk through packets in the message and the list of session
*commands.
* commands.
*/
while (scmd != NULL && replybuf != NULL)
{
@ -1985,32 +2041,11 @@ static GWBUF* sescmd_cursor_process_replies(
packet = (uint8_t *)GWBUF_DATA(replybuf);
packetlen = packet[0]+packet[1]*256+packet[2]*256*256;
replybuf = gwbuf_consume(replybuf, packetlen+headerlen);
/*
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [sescmd_cursor_process_replies] cmd %p "
"is already replied. Discarded %d bytes from "
"the %s replybuffer.",
pthread_self(),
scmd,
packetlen+headerlen,
STRBETYPE(scur->scmd_cur_be_type))));
*/
}
else
{
/** Mark the rest session commands as replied */
scmd->my_sescmd_is_replied = true;
/*
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [sescmd_cursor_process_replies] Marked "
"cmd %p to as replied. Left message to %s's "
"buffer for reply.",
pthread_self(),
scmd,
STRBETYPE(scur->scmd_cur_be_type))));
*/
}
if (sescmd_cursor_next(scur))
@ -2023,7 +2058,9 @@ static GWBUF* sescmd_cursor_process_replies(
/** All session commands are replied */
scur->scmd_cur_active = false;
}
}
}
/** vraa:this is set but only because there's not yet way to find out */
*has_query = false;
ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL);
return replybuf;
@ -2106,7 +2143,7 @@ static bool execute_sescmd_in_backend(
int rc = 0;
sescmd_cursor_t* scur;
if (backend_ref->bref_state == BREF_CLOSED)
if (BREF_IS_CLOSED(backend_ref))
{
goto return_succp;
}
@ -2160,9 +2197,16 @@ static bool execute_sescmd_in_backend(
"%lu [execute_sescmd_in_backend] Routed %s cmd %p.",
pthread_self(),
STRPACKETTYPE(scur->scmd_cur_cmd->my_sescmd_packet_type),
scur->scmd_cur_cmd)));
if (rc != 1)
scur->scmd_cur_cmd)));
if (rc == 1)
{
/**
* All but COM_QUIT cause backend to send reply. flag backend_ref.
*/
bref_set_state(backend_ref, BREF_WAITING_RESULT);
}
else
{
succp = false;
}
@ -2381,7 +2425,7 @@ static bool route_session_write(
{
DCB* dcb = backend_ref[i].bref_dcb;
if (backend_ref[i].bref_state == BREF_IN_USE)
if (BREF_IS_IN_USE((&backend_ref[i])))
{
rc = dcb->func.write(dcb, gwbuf_clone(querybuf));
@ -2412,11 +2456,11 @@ static bool route_session_write(
}
/** Add sescmd property to router client session */
rses_property_add(router_cli_ses, prop);
for (i=0; i<router_cli_ses->rses_nbackends; i++)
{
if (backend_ref[i].bref_state == BREF_IN_USE)
{
if (BREF_IS_IN_USE((&backend_ref[i])))
{
succp = execute_sescmd_in_backend(&backend_ref[i]);
if (!succp)
@ -2499,7 +2543,10 @@ static void rwsplit_process_options(
* @param message The error message to reply
* @param backend_dcb The backend DCB
* @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION
*
* @param succp Result of action.
*
* Even if succp == true connecting to new slave may have failed. succp is to
* tell whether router has enough master/slave connections to continue work.
*/
static void handleError (
ROUTER *instance,
@ -2511,7 +2558,7 @@ static void handleError (
{
DCB* client_dcb = NULL;
SESSION* session = backend_dcb->session;
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)instance;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
client_dcb = session->client;
@ -2523,9 +2570,12 @@ static void handleError (
int router_nservers;
int max_nslaves;
router_nservers = router_get_servercount(router);
router_nservers = router_get_servercount(inst);
max_nslaves = rses_get_max_slavecount(rses, router_nservers);
/**
* Try to get replacement slave or at least the minimum
* number of slave connections for router session.
*/
*succp = select_connect_backend_servers(
&rses->rses_master_ref,
rses->rses_backend_ref,
@ -2533,9 +2583,39 @@ static void handleError (
max_nslaves,
rses->rses_config.rw_slave_select_criteria,
session,
router);
inst);
ss_dassert(*succp);
/** Too few or no slaves at all */
if (!succp)
{
if (session->state == SESSION_STATE_ROUTER_READY)
{
ROUTER* rsession;
ROUTER_OBJECT* router;
router = session->service->router;
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
rsession = session->router_session;
/*<
* rsession should never be NULL here.
*/
ss_dassert(rsession != NULL);
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [gw_error_backend_event] "
"Call closeSession for backend "
"session.",
pthread_self())));
router->closeSession(instance, rsession);
}
}
}
break;
@ -2598,10 +2678,10 @@ static void print_error_packet(
}
static int router_get_servercount(
ROUTER_INSTANCE* router)
ROUTER_INSTANCE* inst)
{
int router_nservers = 0;
BACKEND** b = router->servers;
BACKEND** b = inst->servers;
/** count servers */
while (*(b++) != NULL) router_nservers++;
@ -2634,4 +2714,26 @@ static int rses_get_max_slavecount(
max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves));
return max_nslaves;
}
static backend_ref_t* get_bref_from_dcb(
ROUTER_CLIENT_SES* rses,
DCB* dcb)
{
backend_ref_t* bref;
CHK_DCB(dcb);
CHK_CLIENT_RSES(rses);
bref = rses->rses_backend_ref;
while (bref != NULL)
{
if (bref->bref_dcb == dcb)
{
break;
}
bref++;
}
return bref;
}