Merge branch 'develop' into 1.2.1-binlog_router_trx

This commit is contained in:
MassimilianoPinto
2015-08-20 10:09:32 +02:00
5 changed files with 65 additions and 24 deletions

View File

@ -20,20 +20,33 @@
#include <gwbitmask.h> #include <gwbitmask.h>
/** /**
* @file gwbitmask.c Implementation of bitmask opertions for the gateway * @file gwbitmask.c Implementation of bitmask operations for the gateway
* *
* We provide basic bitmask manipulation routines, the size of * We provide basic bitmask manipulation routines, the size of
* the bitmask will grow dynamically based on the highest bit * the bitmask will grow dynamically based on the highest bit
* number that is set or cleared within the bitmask. * number that is set or cleared within the bitmask.
* *
* Bitmsk growth happens in increments rather than via a single bit as * Bitmask growth happens in increments rather than via a single bit as
* a time. * a time.
*
* Please note limitations to these mechanisms:
*
* 1. The initial size and increment size MUST be exact multiples of 8
* 2. Only suitable for a compact set of bit numbers i.e. the numbering
* needs to start near to 0 and grow without sizeable gaps
* 3. It is assumed that a bit number bigger than the current size can
* be accommodated by adding a single extra block of bits
* 4. During copy, if memory cannot be allocated, a zero length bitmap is
* created as the destination. This will test true for all bits clear, which
* may be a serious error. However, the memory requirement is very small and
* is only likely to fail in circumstances where a lot else is going wrong.
* *
* @verbatim * @verbatim
* Revision History * Revision History
* *
* Date Who Description * Date Who Description
* 28/06/13 Mark Riddoch Initial implementation * 28/06/13 Mark Riddoch Initial implementation
* 20/08/15 Martin Brampton Added caveats about limitations (above)
* *
* @endverbatim * @endverbatim
*/ */
@ -42,7 +55,7 @@
* Initialise a bitmask * Initialise a bitmask
* *
* @param bitmask Pointer the bitmask * @param bitmask Pointer the bitmask
* @return The value of *variable before the add occured * @return The value of *variable before the add occurred
*/ */
void void
bitmask_init(GWBITMASK *bitmask) bitmask_init(GWBITMASK *bitmask)
@ -77,7 +90,9 @@ bitmask_free(GWBITMASK *bitmask)
/** /**
* Set the bit at the specified bit position in the bitmask. * Set the bit at the specified bit position in the bitmask.
* The bitmask will automatically be extended if the bit is * The bitmask will automatically be extended if the bit is
* beyond the current bitmask length * beyond the current bitmask length. Note that growth is only
* by a single increment - the bit numbers used need to be a
* fairly dense set.
* *
* @param bitmask Pointer the bitmask * @param bitmask Pointer the bitmask
* @param bit Bit to set * @param bit Bit to set
@ -106,7 +121,9 @@ unsigned char mask;
/** /**
* Clear the bit at the specified bit position in the bitmask. * Clear the bit at the specified bit position in the bitmask.
* The bitmask will automatically be extended if the bit is * The bitmask will automatically be extended if the bit is
* beyond the current bitmask length * beyond the current bitmask length. This could be optimised
* by always assuming that a bit beyond the current length is
* unset (i.e. 0) and not extending the actual bitmask.
* *
* @param bitmask Pointer the bitmask * @param bitmask Pointer the bitmask
* @param bit Bit to clear * @param bit Bit to clear
@ -134,7 +151,8 @@ unsigned char mask;
* Return a non-zero value if the bit at the specified bit * Return a non-zero value if the bit at the specified bit
* position in the bitmask is set. * position in the bitmask is set.
* The bitmask will automatically be extended if the bit is * The bitmask will automatically be extended if the bit is
* beyond the current bitmask length * beyond the current bitmask length. This could be optimised
* by assuming that a bit beyond the length is unset.
* *
* @param bitmask Pointer the bitmask * @param bitmask Pointer the bitmask
* @param bit Bit to clear * @param bit Bit to clear
@ -162,7 +180,9 @@ unsigned char mask;
/** /**
* Return a non-zero value of the bitmask has no bits set * Return a non-zero value of the bitmask has no bits set
* in it. * in it. This logic could be defeated if the bitmask is a
* copy and there was insufficient memory when the copy was
* made.
* *
* @param bitmask Pointer the bitmask * @param bitmask Pointer the bitmask
* @return Non-zero if the bitmask has no bits set * @return Non-zero if the bitmask has no bits set
@ -191,6 +211,10 @@ unsigned char *ptr, *eptr;
/** /**
* Copy the contents of one bitmap to another. * Copy the contents of one bitmap to another.
*
* On memory failure, a zero length bitmask is created in the destination,
* which could seriously undermine the logic. Given the small size of the
* bitmask, this is unlikely to happen.
* *
* @param dest Bitmap tp update * @param dest Bitmap tp update
* @param src Bitmap to copy * @param src Bitmap to copy

View File

@ -339,7 +339,7 @@ poll_add_dcb(DCB *dcb)
int int
poll_remove_dcb(DCB *dcb) poll_remove_dcb(DCB *dcb)
{ {
int rc = -1; int dcbfd, rc = -1;
struct epoll_event ev; struct epoll_event ev;
CHK_DCB(dcb); CHK_DCB(dcb);
@ -362,11 +362,14 @@ poll_remove_dcb(DCB *dcb)
dcb, dcb,
STRDCBSTATE(dcb->state)))); STRDCBSTATE(dcb->state))));
} }
/*< Set bit for each maxscale thread. This should be done before
* the state is changed, so as to protect the DCB from premature
* destruction. */
bitmask_copy(&dcb->memdata.bitmask, poll_bitmask());
/*< /*<
* Set state to NOPOLLING and remove dcb from poll set. * Set state to NOPOLLING and remove dcb from poll set.
*/ */
dcb->state = DCB_STATE_NOPOLLING; dcb->state = DCB_STATE_NOPOLLING;
spinlock_release(&dcb->dcb_initlock);
/** /**
* Only positive fds can be removed from epoll set. * Only positive fds can be removed from epoll set.
@ -375,8 +378,9 @@ poll_remove_dcb(DCB *dcb)
* only action for them is already done - the change of state to * only action for them is already done - the change of state to
* DCB_STATE_NOPOLLING. * DCB_STATE_NOPOLLING.
*/ */
dcbfd = dcb->fd;
spinlock_release(&dcb->dcb_initlock); spinlock_release(&dcb->dcb_initlock);
if (dcb->fd > 0) if (dcbfd > 0)
{ {
rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev); rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev);
/** /**
@ -386,8 +390,6 @@ poll_remove_dcb(DCB *dcb)
*/ */
if (rc) rc = poll_resolve_error(dcb, errno, false); if (rc) rc = poll_resolve_error(dcb, errno, false);
if (rc) raise(SIGABRT); if (rc) raise(SIGABRT);
/*< Set bit for each maxscale thread */
bitmask_copy(&dcb->memdata.bitmask, poll_bitmask());
} }
return rc; return rc;
} }

View File

@ -20,22 +20,23 @@
#include <spinlock.h> #include <spinlock.h>
/** /**
* @file gwbitmask.h An implementation of an arbitarly long bitmask * @file gwbitmask.h An implementation of an arbitrarily long bitmask
* *
* @verbatim * @verbatim
* Revision History * Revision History
* *
* Date Who Description * Date Who Description
* 28/06/13 Mark Riddoch Initial implementation * 28/06/13 Mark Riddoch Initial implementation
* *
* @endverbatim * @endverbatim
*/ */
/* Both these numbers MUST be exact multiples of 8 */
#define BIT_LENGTH_INITIAL 32 /**< Initial number of bits in the bitmask */ #define BIT_LENGTH_INITIAL 32 /**< Initial number of bits in the bitmask */
#define BIT_LENGTH_INC 32 /**< Number of bits to add on each increment */ #define BIT_LENGTH_INC 32 /**< Number of bits to add on each increment */
/** /**
* The bitmask structure used to store an arbitary large bitmask * The bitmask structure used to store an arbitrary large bitmask
*/ */
typedef struct { typedef struct {
SPINLOCK lock; /**< Lock to protect the bitmask */ SPINLOCK lock; /**< Lock to protect the bitmask */

View File

@ -541,7 +541,7 @@ static int gw_read_backend_event(DCB *dcb) {
if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) != if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) !=
MYSQL_COM_UNDEFINED) MYSQL_COM_UNDEFINED)
{ {
read_buffer = process_response_data(dcb, read_buffer, nbytes_read); read_buffer = process_response_data(dcb, read_buffer, gwbuf_length(read_buffer));
/** /**
* Received incomplete response to session command. * Received incomplete response to session command.
* Store it to readqueue and return. * Store it to readqueue and return.
@ -1517,7 +1517,9 @@ static GWBUF* process_response_data (
ssize_t nbytes_left = 0; /*< nbytes to be read for the packet */ ssize_t nbytes_left = 0; /*< nbytes to be read for the packet */
MySQLProtocol* p; MySQLProtocol* p;
GWBUF* outbuf = NULL; GWBUF* outbuf = NULL;
int initial_packets = npackets_left;
ssize_t initial_bytes = nbytes_left;
/** Get command which was stored in gw_MySQLWrite_backend */ /** Get command which was stored in gw_MySQLWrite_backend */
p = DCB_PROTOCOL(dcb, MySQLProtocol); p = DCB_PROTOCOL(dcb, MySQLProtocol);
if (!DCB_IS_CLONE(dcb)) CHK_PROTOCOL(p); if (!DCB_IS_CLONE(dcb)) CHK_PROTOCOL(p);
@ -1560,11 +1562,13 @@ static GWBUF* process_response_data (
* enough data to read the packet length. * enough data to read the packet length.
*/ */
init_response_status(readbuf, srvcmd, &npackets_left, &nbytes_left); init_response_status(readbuf, srvcmd, &npackets_left, &nbytes_left);
initial_packets = npackets_left;
initial_bytes = nbytes_left;
} }
} }
/** Only session commands with responses should be processed */ /** Only session commands with responses should be processed */
ss_dassert(npackets_left > 0); ss_dassert(npackets_left > 0);
/** Read incomplete packet. */ /** Read incomplete packet. */
if (nbytes_left > nbytes_to_process) if (nbytes_left > nbytes_to_process)
{ {
@ -1639,10 +1643,15 @@ static GWBUF* process_response_data (
wait for more data from the backend server.*/ wait for more data from the backend server.*/
if(readbuf == NULL || GWBUF_LENGTH(readbuf) < 3) if(readbuf == NULL || GWBUF_LENGTH(readbuf) < 3)
{ {
skygw_log_write(LD," %lu [%s] Read %s packet with %d bytes. Waiting for %d packets.", skygw_log_write(LD," %lu [%s] Read %d packets. Waiting for %d more packets for a total of %d packets.",
pthread_self(),__FUNCTION__,readbuf?"partial":"empty", pthread_self(),__FUNCTION__,initial_packets - npackets_left,
readbuf?GWBUF_LENGTH(readbuf):0,npackets_left); npackets_left,initial_packets);
break;
/** Store the already read data into the readqueue of the DCB
* and restore the response status to the initial number of packets */
dcb->dcb_readqueue = gwbuf_append(outbuf,dcb->dcb_readqueue);
protocol_set_response_status(p, initial_packets, initial_bytes);
return NULL;
} }
data = GWBUF_DATA(readbuf); data = GWBUF_DATA(readbuf);

View File

@ -4518,7 +4518,12 @@ int process_show_shards(ROUTER_CLIENT_SES* rses)
sl.iter = iter; sl.iter = iter;
sl.rses = rses; sl.rses = rses;
sl.rset = resultset_create(shard_list_cb,&sl); if((sl.rset = resultset_create(shard_list_cb,&sl)) == NULL)
{
skygw_log_write(LE,"[%s] Error: Failed to create resultset.",__FUNCTION__);
return -1;
}
resultset_add_column(sl.rset,"Database",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR); resultset_add_column(sl.rset,"Database",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR);
resultset_add_column(sl.rset,"Server",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR); resultset_add_column(sl.rset,"Server",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR);
resultset_stream_mysql(sl.rset,rses->rses_client_dcb); resultset_stream_mysql(sl.rset,rses->rses_client_dcb);