Merge from release-1.0beta to Z2

This commit is contained in:
VilhoRaatikka
2014-08-15 18:00:39 +03:00
parent cefadaeb75
commit 677a44f497
11 changed files with 277 additions and 103 deletions

View File

@ -761,12 +761,13 @@ return_rc:
*/
static int gw_error_backend_event(DCB *dcb)
{
SESSION* session;
void* rsession;
ROUTER_OBJECT* router;
ROUTER* router_instance;
GWBUF* errbuf;
bool succp;
SESSION* session;
void* rsession;
ROUTER_OBJECT* router;
ROUTER* router_instance;
GWBUF* errbuf;
bool succp;
session_state_t ses_state;
CHK_DCB(dcb);
session = dcb->session;
@ -775,11 +776,6 @@ static int gw_error_backend_event(DCB *dcb)
router = session->service->router;
router_instance = session->service->router_instance;
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend error event handling.")));
#endif
/**
* Avoid running redundant error handling procedure.
* dcb_close is already called for the DCB. Thus, either connection is
@ -795,6 +791,34 @@ static int gw_error_backend_event(DCB *dcb)
0,
"Lost connection to backend server.");
spinlock_acquire(&session->ses_lock);
ses_state = session->state;
spinlock_release(&session->ses_lock);
/**
* Session might be initialized when DCB already is in the poll set.
* Thus hangup can occur in the middle of session initialization.
* Only complete and successfully initialized sessions allow for
* calling error handler.
*/
while (ses_state == SESSION_STATE_READY)
{
spinlock_acquire(&session->ses_lock);
ses_state = session->state;
spinlock_release(&session->ses_lock);
}
if (ses_state != SESSION_STATE_ROUTER_READY)
{
gwbuf_free(errbuf);
goto retblock;
}
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend error event handling.")));
#endif
router->handleError(router_instance,
rsession,
errbuf,
@ -810,6 +834,7 @@ static int gw_error_backend_event(DCB *dcb)
}
dcb_close(dcb);
retblock:
return 1;
}
@ -923,12 +948,13 @@ return_fd:
static int
gw_backend_hangup(DCB *dcb)
{
SESSION* session;
void* rsession;
ROUTER_OBJECT* router;
ROUTER* router_instance;
bool succp;
GWBUF* errbuf;
SESSION* session;
void* rsession;
ROUTER_OBJECT* router;
ROUTER* router_instance;
bool succp;
GWBUF* errbuf;
session_state_t ses_state;
CHK_DCB(dcb);
session = dcb->session;
@ -936,20 +962,41 @@ gw_backend_hangup(DCB *dcb)
rsession = session->router_session;
router = session->service->router;
router_instance = session->service->router_instance;
router_instance = session->service->router_instance;
errbuf = mysql_create_custom_error(
1,
0,
"Lost connection to backend server.");
spinlock_acquire(&session->ses_lock);
ses_state = session->state;
spinlock_release(&session->ses_lock);
/**
* Session might be initialized when DCB already is in the poll set.
* Thus hangup can occur in the middle of session initialization.
* Only complete and successfully initialized sessions allow for
* calling error handler.
*/
while (ses_state == SESSION_STATE_READY)
{
spinlock_acquire(&session->ses_lock);
ses_state = session->state;
spinlock_release(&session->ses_lock);
}
if (ses_state != SESSION_STATE_ROUTER_READY)
{
gwbuf_free(errbuf);
goto retblock;
}
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend hangup error handling.")));
#endif
errbuf = mysql_create_custom_error(
1,
0,
"Lost connection to backend server.");
router->handleError(router_instance,
rsession,
errbuf,
@ -958,7 +1005,8 @@ gw_backend_hangup(DCB *dcb)
&succp);
/** There are not required backends available, close session. */
if (!succp) {
if (!succp)
{
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
@ -971,7 +1019,8 @@ gw_backend_hangup(DCB *dcb)
}
dcb_close(dcb);
return 1;
retblock:
return 1;
}
/**

View File

@ -65,7 +65,7 @@ static int gw_client_hangup_event(DCB *dcb);
int mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message);
int MySQLSendHandshake(DCB* dcb);
static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue);
static int route_by_statement(SESSION *, GWBUF *);
static int route_by_statement(SESSION *, GWBUF **);
/*
* The "module object" for the mysqld client protocol module.
@ -534,7 +534,7 @@ int gw_read_client_event(
if (nbytes_read == 0)
{
goto return_rc;
}
}
/**
* if read queue existed appent read to it.
* if length of read buffer is less than 3 or less than mysql packet
@ -783,7 +783,7 @@ int gw_read_client_event(
* Feed each statement completely and separately
* to router.
*/
rc = route_by_statement(session, read_buffer);
rc = route_by_statement(session, &read_buffer);
if (read_buffer != NULL)
{
@ -1300,12 +1300,19 @@ static int gw_error_client_event(
STRDCBSTATE(dcb->state),
(session != NULL ? session : NULL))));
if (session != NULL && session->state == SESSION_STATE_STOPPING)
{
goto retblock;
}
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Client error event handling.")));
#endif
dcb_close(dcb);
retblock:
return 1;
}
@ -1380,12 +1387,19 @@ gw_client_hangup_event(DCB *dcb)
{
CHK_SESSION(session);
}
if (session != NULL && session->state == SESSION_STATE_STOPPING)
{
goto retblock;
}
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Client hangup error handling.")));
"Client hangup error handling.")));
#endif
dcb_close(dcb);
retblock:
return 1;
}
@ -1399,14 +1413,16 @@ gw_client_hangup_event(DCB *dcb)
* Return 1 in success. If the last packet is incomplete return success but
* leave incomplete packet to readbuf.
*/
static int route_by_statement(SESSION *session, GWBUF *readbuf)
static int route_by_statement(
SESSION* session,
GWBUF** p_readbuf)
{
int rc = -1;
GWBUF* packetbuf;
#if defined(SS_DEBUG)
GWBUF* tmpbuf;
tmpbuf = readbuf;
tmpbuf = *p_readbuf;
while (tmpbuf != NULL)
{
ss_dassert(GWBUF_IS_TYPE_MYSQL(tmpbuf));
@ -1415,15 +1431,14 @@ static int route_by_statement(SESSION *session, GWBUF *readbuf)
#endif
do
{
ss_dassert(GWBUF_IS_TYPE_MYSQL(readbuf));
ss_dassert(GWBUF_IS_TYPE_MYSQL((*p_readbuf)));
packetbuf = gw_MySQL_get_next_packet(&readbuf);
ss_dassert(GWBUF_IS_TYPE_MYSQL(packetbuf));
packetbuf = gw_MySQL_get_next_packet(p_readbuf);
if (packetbuf != NULL)
{
CHK_GWBUF(packetbuf);
ss_dassert(GWBUF_IS_TYPE_MYSQL(packetbuf));
/**
* This means that buffer includes exactly one MySQL
* statement.
@ -1446,7 +1461,7 @@ static int route_by_statement(SESSION *session, GWBUF *readbuf)
goto return_rc;
}
}
while (readbuf != NULL);
while (*p_readbuf != NULL);
return_rc:
return rc;

View File

@ -79,6 +79,7 @@ MySQLProtocol* mysql_protocol_init(
strerror(eno))));
goto return_p;
}
p->protocol_state = MYSQL_PROTOCOL_ALLOC;
p->protocol_auth_state = MYSQL_ALLOC;
p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED;
p->protocol_command.scom_nresponse_packets = 0;
@ -90,6 +91,7 @@ MySQLProtocol* mysql_protocol_init(
/*< Assign fd with protocol */
p->fd = fd;
p->owner_dcb = dcb;
p->protocol_state = MYSQL_PROTOCOL_ACTIVE;
CHK_PROTOCOL(p);
return_p:
return p;
@ -107,15 +109,30 @@ return_p:
void mysql_protocol_done (
DCB* dcb)
{
server_command_t* scmd = ((MySQLProtocol *)dcb->protocol)->protocol_cmd_history;
MySQLProtocol* p;
server_command_t* scmd;
server_command_t* scmd2;
p = (MySQLProtocol *)dcb->protocol;
spinlock_acquire(&p->protocol_lock);
if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE)
{
goto retblock;
}
scmd = p->protocol_cmd_history;
while (scmd != NULL)
{
scmd2 = scmd->scom_next;
free(scmd);
scmd = scmd2;
}
p->protocol_state = MYSQL_PROTOCOL_DONE;
retblock:
spinlock_release(&p->protocol_lock);
}
@ -886,7 +903,7 @@ int mysql_send_com_quit(
if (buf == NULL)
{
return 0;
}
}
nbytes = dcb->func.write(dcb, buf);
return nbytes;
@ -1497,6 +1514,9 @@ GWBUF* gw_MySQL_get_next_packet(
size_t packetlen;
size_t totalbuflen;
uint8_t* data;
size_t nbytes_copied = 0;
uint8_t* target;
readbuf = *p_readbuf;
if (readbuf == NULL)
@ -1523,42 +1543,27 @@ GWBUF* gw_MySQL_get_next_packet(
packetbuf = NULL;
goto return_packetbuf;
}
/** there is one complete packet in the buffer */
if (packetlen == buflen)
{
packetbuf = gwbuf_clone_portion(readbuf, 0, packetlen);
*p_readbuf = gwbuf_consume(readbuf, packetlen);
goto return_packetbuf;
}
packetbuf = gwbuf_alloc(packetlen);
target = GWBUF_DATA(packetbuf);
packetbuf->gwbuf_type = readbuf->gwbuf_type; /*< Copy the type too */
/**
* Packet spans multiple buffers.
* Allocate buffer for complete packet
* copy packet parts into it and consume copied bytes
*/
else if (packetlen > buflen)
* Copy first MySQL packet to packetbuf and leave posible other
* packets to read buffer.
*/
while (nbytes_copied < packetlen && totalbuflen > 0)
{
size_t nbytes_copied = 0;
uint8_t* target;
uint8_t* src = GWBUF_DATA((*p_readbuf));
size_t bytestocopy;
packetbuf = gwbuf_alloc(packetlen);
target = GWBUF_DATA(packetbuf);
while (nbytes_copied < packetlen)
{
uint8_t* src = GWBUF_DATA(readbuf);
size_t buflen = GWBUF_LENGTH(readbuf);
memcpy(target+nbytes_copied, src, buflen);
*p_readbuf = gwbuf_consume(readbuf, buflen);
nbytes_copied += buflen;
}
ss_dassert(nbytes_copied == packetlen);
}
else
{
packetbuf = gwbuf_clone_portion(readbuf, 0, packetlen);
*p_readbuf = gwbuf_consume(readbuf, packetlen);
bytestocopy = MIN(buflen,packetlen-nbytes_copied);
memcpy(target+nbytes_copied, src, bytestocopy);
*p_readbuf = gwbuf_consume((*p_readbuf), bytestocopy);
totalbuflen = gwbuf_length((*p_readbuf));
nbytes_copied += bytestocopy;
}
ss_dassert(buflen == 0 || nbytes_copied == packetlen);
return_packetbuf:
return packetbuf;
@ -1625,11 +1630,16 @@ void protocol_archive_srv_command(
MySQLProtocol* p)
{
server_command_t* s1;
server_command_t** s2;
server_command_t* h1;
int len = 0;
spinlock_acquire(&p->protocol_lock);
if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE)
{
goto retblock;
}
s1 = &p->protocol_command;
LOGIF(LT, (skygw_log_write(
@ -1639,18 +1649,19 @@ void protocol_archive_srv_command(
p->owner_dcb->fd)));
/** Copy to history list */
s2 = &p->protocol_cmd_history;
if (*s2 != NULL)
{
while ((*s2)->scom_next != NULL)
{
*s2 = (*s2)->scom_next;
len += 1;
}
if ((h1 = p->protocol_cmd_history) == NULL)
{
p->protocol_cmd_history = server_command_copy(s1);
}
*s2 = server_command_copy(s1);
else
{
while (h1->scom_next != NULL)
{
h1 = h1->scom_next;
}
h1->scom_next = server_command_copy(s1);
}
/** Keep history limits, remove oldest */
if (len > MAX_CMD_HISTORY)
{
@ -1669,6 +1680,8 @@ void protocol_archive_srv_command(
p->protocol_command = *(s1->scom_next);
free(s1->scom_next);
}
retblock:
spinlock_release(&p->protocol_lock);
}
@ -1685,6 +1698,10 @@ void protocol_add_srv_command(
spinlock_acquire(&p->protocol_lock);
if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE)
{
goto retblock;
}
/** this is the only server command in protocol */
if (p->protocol_command.scom_cmd == MYSQL_COM_UNDEFINED)
{
@ -1717,6 +1734,7 @@ void protocol_add_srv_command(
c = c->scom_next;
}
#endif
retblock:
spinlock_release(&p->protocol_lock);
}