query_classifier.cc

resolve_query_type, added GSYSVAR_FUNC type for functions that read system variables and can be executed in Maxscale instead of backend server.

dcb.c
	dcb_read, if read returns value <= 0 and if error is EAGAIN/EWOULDBLOCK so there was nothing to read in sthe socket, that is not an error because some other thread may have read the data that was expected to be available.

mysql_backend.c
	gw_read_backend_event, used dcb->authlock to ensure that dcb's protocol state is read and modified serially. This removes issues with false authentication failures which may happen when two threads modify and read the state without any control.

mysql_client.c 
	removed dead code.

readconnroute.c, readwritesplit.c
	removed invalid assert which assumed that spinlock can not have larger value than one when itis locked.
This commit is contained in:
vraatikka
2013-10-31 22:24:51 +02:00
parent e803acb036
commit 3769a02957
6 changed files with 174 additions and 139 deletions

View File

@ -450,9 +450,9 @@ static skygw_query_type_t resolve_query_type(
item->name, item->name,
STRITEMTYPE(itype)); STRITEMTYPE(itype));
if (item->type() == Item::SUBSELECT_ITEM) { if (itype == Item::SUBSELECT_ITEM) {
continue; continue;
} else if (item->type() == Item::FUNC_ITEM) { } else if (itype == Item::FUNC_ITEM) {
skygw_query_type_t skygw_query_type_t
func_qtype = QUERY_TYPE_UNKNOWN; func_qtype = QUERY_TYPE_UNKNOWN;
/** /**
@ -526,6 +526,7 @@ static skygw_query_type_t resolve_query_type(
pthread_self()); pthread_self());
break; break;
case Item_func::NOW_FUNC: case Item_func::NOW_FUNC:
case Item_func::GSYSVAR_FUNC:
func_qtype = QUERY_TYPE_LOCAL_READ; func_qtype = QUERY_TYPE_LOCAL_READ;
skygw_log_write( skygw_log_write(
LOGFILE_DEBUG, LOGFILE_DEBUG,

View File

@ -608,6 +608,15 @@ int eno = 0;
eno, eno,
strerror(eno)); strerror(eno));
} }
else
{
/**
* If read would block it means that other thread
* has probably read the data.
*/
n = 0;
}
gwbuf_free(buffer); gwbuf_free(buffer);
goto return_n; goto return_n;
} }

View File

@ -145,13 +145,13 @@ static int gw_read_backend_event(DCB *dcb) {
CHK_DCB(dcb); CHK_DCB(dcb);
CHK_SESSION(dcb->session); CHK_SESSION(dcb->session);
backend_protocol = (MySQLProtocol *) dcb->protocol;
CHK_PROTOCOL(backend_protocol);
/** return only with complete session */ /** return only with complete session */
current_session = gw_get_shared_session_auth_info(dcb); current_session = gw_get_shared_session_auth_info(dcb);
ss_dassert(current_session != NULL); ss_dassert(current_session != NULL);
backend_protocol = (MySQLProtocol *) dcb->protocol;
CHK_PROTOCOL(backend_protocol);
skygw_log_write( skygw_log_write(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [gw_read_backend_event] Read dcb %p fd %d protocol " "%lu [gw_read_backend_event] Read dcb %p fd %d protocol "
@ -162,12 +162,25 @@ static int gw_read_backend_event(DCB *dcb) {
backend_protocol->state, backend_protocol->state,
STRPROTOCOLSTATE(backend_protocol->state)); STRPROTOCOLSTATE(backend_protocol->state));
/* backend is connected: /* backend is connected:
* *
* 1. read server handshake * 1. read server handshake
* 2. if (success) write auth request * 2. if (success) write auth request
* 3. and return * 3. and return
*/ */
/**
* If starting to auhenticate with backend server, lock dcb
* to prevent overlapping processing of auth messages.
*/
if (backend_protocol->state == MYSQL_CONNECTED) {
spinlock_acquire(&dcb->authlock);
backend_protocol = (MySQLProtocol *) dcb->protocol;
CHK_PROTOCOL(backend_protocol);
if (backend_protocol->state == MYSQL_CONNECTED) { if (backend_protocol->state == MYSQL_CONNECTED) {
if (gw_read_backend_handshake(backend_protocol) != 0) { if (gw_read_backend_handshake(backend_protocol) != 0) {
@ -186,12 +199,24 @@ static int gw_read_backend_event(DCB *dcb) {
} }
} }
} }
spinlock_release(&dcb->authlock);
}
/* /*
* Now: * Now:
* -- check the authentication reply from backend * -- check the authentication reply from backend
* OR * OR
* -- handle a previous handshake error * -- handle a previous handshake error
*/ */
if (backend_protocol->state == MYSQL_AUTH_RECV ||
backend_protocol->state == MYSQL_AUTH_FAILED)
{
spinlock_acquire(&dcb->authlock);
backend_protocol = (MySQLProtocol *) dcb->protocol;
CHK_PROTOCOL(backend_protocol);
if (backend_protocol->state == MYSQL_AUTH_RECV || if (backend_protocol->state == MYSQL_AUTH_RECV ||
backend_protocol->state == MYSQL_AUTH_FAILED) backend_protocol->state == MYSQL_AUTH_FAILED)
{ {
@ -210,7 +235,8 @@ static int gw_read_backend_event(DCB *dcb) {
/** /**
* Read backed auth reply * Read backed auth reply
*/ */
receive_rc = gw_receive_backend_auth(backend_protocol); receive_rc =
gw_receive_backend_auth(backend_protocol);
switch (receive_rc) { switch (receive_rc) {
case -1: case -1:
@ -218,17 +244,19 @@ static int gw_read_backend_event(DCB *dcb) {
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : backend server didn't accept " "Error : backend server didn't "
"authentication for user %s.", "accept authentication for user "
"%s.",
current_session->user); current_session->user);
break; break;
case 1: case 1:
backend_protocol->state = MYSQL_IDLE; backend_protocol->state = MYSQL_IDLE;
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [gw_read_backend_event] " "%lu [gw_read_backend_event] "
"gw_receive_backend_auth succeed. dcb " "gw_receive_backend_auth succeed. "
"%p fd %d, user %s.", "dcb %p fd %d, user %s.",
pthread_self(), pthread_self(),
dcb, dcb,
dcb->fd, dcb->fd,
@ -247,15 +275,16 @@ static int gw_read_backend_event(DCB *dcb) {
dcb->fd, dcb->fd,
current_session->user); current_session->user);
rc = 0; rc = 0;
goto return_rc; goto return_with_lock;
break; break;
} /* switch */ } /* switch */
} }
if (backend_protocol->state == MYSQL_AUTH_FAILED) { if (backend_protocol->state == MYSQL_AUTH_FAILED) {
/** vraa : errorHandle */ /**
/* check the delayq before the reply */ * vraa : errorHandle
spinlock_acquire(&dcb->authlock); * check the delayq before the reply
*/
if (dcb->delayq != NULL) { if (dcb->delayq != NULL) {
/* send an error to the client */ /* send an error to the client */
mysql_send_custom_error( mysql_send_custom_error(
@ -264,12 +293,10 @@ static int gw_read_backend_event(DCB *dcb) {
0, 0,
"Connection to backend lost."); "Connection to backend lost.");
// consume all the delay queue // consume all the delay queue
dcb->delayq = gwbuf_consume(dcb->delayq, dcb->delayq = gwbuf_consume(
gwbuf_length( dcb->delayq,
dcb->delayq)); gwbuf_length(dcb->delayq));
} }
spinlock_release(&dcb->authlock);
rsession = session->router_session; rsession = session->router_session;
ss_dassert(rsession != NULL); ss_dassert(rsession != NULL);
/** /**
@ -277,6 +304,7 @@ static int gw_read_backend_event(DCB *dcb) {
* rsession should never be NULL here. * rsession should never be NULL here.
**/ **/
ss_dassert(rsession != NULL); ss_dassert(rsession != NULL);
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [gw_read_backend_event] " "%lu [gw_read_backend_event] "
@ -286,8 +314,10 @@ static int gw_read_backend_event(DCB *dcb) {
/* close router_session */ /* close router_session */
router->closeSession(router_instance, rsession); router->closeSession(router_instance, rsession);
rc = 1; rc = 1;
goto return_rc; goto return_with_lock;
} else { }
else
{
ss_dassert(backend_protocol->state == MYSQL_IDLE); ss_dassert(backend_protocol->state == MYSQL_IDLE);
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_DEBUG, LOGFILE_DEBUG,
@ -298,18 +328,18 @@ static int gw_read_backend_event(DCB *dcb) {
dcb->fd, dcb->fd,
current_session->user); current_session->user);
spinlock_acquire(&dcb->authlock);
/* check the delay queue and flush the data */ /* check the delay queue and flush the data */
if(dcb->delayq) { if (dcb->delayq)
{
backend_write_delayqueue(dcb); backend_write_delayqueue(dcb);
spinlock_release(&dcb->authlock);
rc = 1; rc = 1;
goto return_rc; goto return_with_lock;
} }
}
} /* MYSQL_AUTH_RECV || MYSQL_AUTH_FAILED */
spinlock_release(&dcb->authlock); spinlock_release(&dcb->authlock);
rc = 0;
goto return_rc;
} /* MYSQL_AUTH_FAILED */
} /* MYSQL_AUTH_RECV || MYSQL_AUTH_FAILED */ } /* MYSQL_AUTH_RECV || MYSQL_AUTH_FAILED */
/* reading MySQL command output from backend and writing to the client */ /* reading MySQL command output from backend and writing to the client */
@ -377,6 +407,10 @@ static int gw_read_backend_event(DCB *dcb) {
return_rc: return_rc:
return rc; return rc;
return_with_lock:
spinlock_release(&dcb->authlock);
goto return_rc;
} }
/* /*
@ -473,8 +507,8 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
/** Free buffer memory */ /** Free buffer memory */
gwbuf_consume(queue, GWBUF_LENGTH(queue)); gwbuf_consume(queue, GWBUF_LENGTH(queue));
skygw_log_write_flush( skygw_log_write(
LOGFILE_ERROR, LOGFILE_TRACE,
"%lu [gw_MySQLWrite_backend] Write to backend failed. " "%lu [gw_MySQLWrite_backend] Write to backend failed. "
"Backend dcb %p fd %d is %s.", "Backend dcb %p fd %d is %s.",
pthread_self(), pthread_self(),
@ -569,7 +603,7 @@ static int gw_error_backend_event(DCB *dcb) {
*/ */
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [gw_read_backend_event] " "%lu [gw_error_backend_event] "
"Call closeSession for backend " "Call closeSession for backend "
"session.", "session.",
pthread_self()); pthread_self());

View File

@ -983,13 +983,6 @@ int gw_MySQLAccept(DCB *listener)
c_sock); c_sock);
conn_open[c_sock] = true; conn_open[c_sock] = true;
#endif #endif
/*
fprintf(stderr,
"Processing %i connection fd %i for listener %i\n",
listener->stats.n_accepts,
c_sock,
listener->fd);
*/
/* set nonblocking */ /* set nonblocking */
setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen); setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen);
setnonblocking(c_sock); setnonblocking(c_sock);

View File

@ -732,6 +732,5 @@ static void rses_exit_router_action(
ROUTER_CLIENT_SES* rses) ROUTER_CLIENT_SES* rses)
{ {
CHK_CLIENT_RSES(rses); CHK_CLIENT_RSES(rses);
ss_dassert(((SPINLOCK)rses->rses_lock).lock == 1);
spinlock_release(&rses->rses_lock); spinlock_release(&rses->rses_lock);
} }

View File

@ -707,7 +707,6 @@ static void rses_exit_router_action(
ROUTER_CLIENT_SES* rses) ROUTER_CLIENT_SES* rses)
{ {
CHK_CLIENT_RSES(rses); CHK_CLIENT_RSES(rses);
ss_dassert(((SPINLOCK)rses->rses_lock).lock == 1);
spinlock_release(&rses->rses_lock); spinlock_release(&rses->rses_lock);
} }