Merge branch 'develop' into dcb_code_revert

This commit is contained in:
Markus Makela
2015-08-20 12:06:14 +03:00
10 changed files with 100 additions and 37 deletions

View File

@ -20,20 +20,33 @@
#include <gwbitmask.h> #include <gwbitmask.h>
/** /**
* @file gwbitmask.c Implementation of bitmask opertions for the gateway * @file gwbitmask.c Implementation of bitmask operations for the gateway
* *
* We provide basic bitmask manipulation routines, the size of * We provide basic bitmask manipulation routines, the size of
* the bitmask will grow dynamically based on the highest bit * the bitmask will grow dynamically based on the highest bit
* number that is set or cleared within the bitmask. * number that is set or cleared within the bitmask.
* *
* Bitmsk growth happens in increments rather than via a single bit as * Bitmask growth happens in increments rather than via a single bit as
* a time. * a time.
*
* Please note limitations to these mechanisms:
*
* 1. The initial size and increment size MUST be exact multiples of 8
* 2. Only suitable for a compact set of bit numbers i.e. the numbering
* needs to start near to 0 and grow without sizeable gaps
* 3. It is assumed that a bit number bigger than the current size can
* be accommodated by adding a single extra block of bits
* 4. During copy, if memory cannot be allocated, a zero length bitmap is
* created as the destination. This will test true for all bits clear, which
* may be a serious error. However, the memory requirement is very small and
* is only likely to fail in circumstances where a lot else is going wrong.
* *
* @verbatim * @verbatim
* Revision History * Revision History
* *
* Date Who Description * Date Who Description
* 28/06/13 Mark Riddoch Initial implementation * 28/06/13 Mark Riddoch Initial implementation
* 20/08/15 Martin Brampton Added caveats about limitations (above)
* *
* @endverbatim * @endverbatim
*/ */
@ -42,7 +55,7 @@
* Initialise a bitmask * Initialise a bitmask
* *
* @param bitmask Pointer the bitmask * @param bitmask Pointer the bitmask
* @return The value of *variable before the add occured * @return The value of *variable before the add occurred
*/ */
void void
bitmask_init(GWBITMASK *bitmask) bitmask_init(GWBITMASK *bitmask)
@ -77,7 +90,9 @@ bitmask_free(GWBITMASK *bitmask)
/** /**
* Set the bit at the specified bit position in the bitmask. * Set the bit at the specified bit position in the bitmask.
* The bitmask will automatically be extended if the bit is * The bitmask will automatically be extended if the bit is
* beyond the current bitmask length * beyond the current bitmask length. Note that growth is only
* by a single increment - the bit numbers used need to be a
* fairly dense set.
* *
* @param bitmask Pointer the bitmask * @param bitmask Pointer the bitmask
* @param bit Bit to set * @param bit Bit to set
@ -106,7 +121,9 @@ unsigned char mask;
/** /**
* Clear the bit at the specified bit position in the bitmask. * Clear the bit at the specified bit position in the bitmask.
* The bitmask will automatically be extended if the bit is * The bitmask will automatically be extended if the bit is
* beyond the current bitmask length * beyond the current bitmask length. This could be optimised
* by always assuming that a bit beyond the current length is
* unset (i.e. 0) and not extending the actual bitmask.
* *
* @param bitmask Pointer the bitmask * @param bitmask Pointer the bitmask
* @param bit Bit to clear * @param bit Bit to clear
@ -134,7 +151,8 @@ unsigned char mask;
* Return a non-zero value if the bit at the specified bit * Return a non-zero value if the bit at the specified bit
* position in the bitmask is set. * position in the bitmask is set.
* The bitmask will automatically be extended if the bit is * The bitmask will automatically be extended if the bit is
* beyond the current bitmask length * beyond the current bitmask length. This could be optimised
* by assuming that a bit beyond the length is unset.
* *
* @param bitmask Pointer the bitmask * @param bitmask Pointer the bitmask
* @param bit Bit to clear * @param bit Bit to clear
@ -162,7 +180,9 @@ unsigned char mask;
/** /**
* Return a non-zero value of the bitmask has no bits set * Return a non-zero value of the bitmask has no bits set
* in it. * in it. This logic could be defeated if the bitmask is a
* copy and there was insufficient memory when the copy was
* made.
* *
* @param bitmask Pointer the bitmask * @param bitmask Pointer the bitmask
* @return Non-zero if the bitmask has no bits set * @return Non-zero if the bitmask has no bits set
@ -191,6 +211,10 @@ unsigned char *ptr, *eptr;
/** /**
* Copy the contents of one bitmap to another. * Copy the contents of one bitmap to another.
*
* On memory failure, a zero length bitmask is created in the destination,
* which could seriously undermine the logic. Given the small size of the
* bitmask, this is unlikely to happen.
* *
* @param dest Bitmap tp update * @param dest Bitmap tp update
* @param src Bitmap to copy * @param src Bitmap to copy

View File

@ -339,7 +339,7 @@ poll_add_dcb(DCB *dcb)
int int
poll_remove_dcb(DCB *dcb) poll_remove_dcb(DCB *dcb)
{ {
int rc = -1; int dcbfd, rc = -1;
struct epoll_event ev; struct epoll_event ev;
CHK_DCB(dcb); CHK_DCB(dcb);
@ -362,11 +362,14 @@ poll_remove_dcb(DCB *dcb)
dcb, dcb,
STRDCBSTATE(dcb->state)))); STRDCBSTATE(dcb->state))));
} }
/*< Set bit for each maxscale thread. This should be done before
* the state is changed, so as to protect the DCB from premature
* destruction. */
bitmask_copy(&dcb->memdata.bitmask, poll_bitmask());
/*< /*<
* Set state to NOPOLLING and remove dcb from poll set. * Set state to NOPOLLING and remove dcb from poll set.
*/ */
dcb->state = DCB_STATE_NOPOLLING; dcb->state = DCB_STATE_NOPOLLING;
spinlock_release(&dcb->dcb_initlock);
/** /**
* Only positive fds can be removed from epoll set. * Only positive fds can be removed from epoll set.
@ -375,8 +378,9 @@ poll_remove_dcb(DCB *dcb)
* only action for them is already done - the change of state to * only action for them is already done - the change of state to
* DCB_STATE_NOPOLLING. * DCB_STATE_NOPOLLING.
*/ */
dcbfd = dcb->fd;
spinlock_release(&dcb->dcb_initlock); spinlock_release(&dcb->dcb_initlock);
if (dcb->fd > 0) if (dcbfd > 0)
{ {
rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev); rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev);
/** /**
@ -386,8 +390,6 @@ poll_remove_dcb(DCB *dcb)
*/ */
if (rc) rc = poll_resolve_error(dcb, errno, false); if (rc) rc = poll_resolve_error(dcb, errno, false);
if (rc) raise(SIGABRT); if (rc) raise(SIGABRT);
/*< Set bit for each maxscale thread */
bitmask_copy(&dcb->memdata.bitmask, poll_bitmask());
} }
return rc; return rc;
} }

View File

@ -20,22 +20,23 @@
#include <spinlock.h> #include <spinlock.h>
/** /**
* @file gwbitmask.h An implementation of an arbitarly long bitmask * @file gwbitmask.h An implementation of an arbitrarily long bitmask
* *
* @verbatim * @verbatim
* Revision History * Revision History
* *
* Date Who Description * Date Who Description
* 28/06/13 Mark Riddoch Initial implementation * 28/06/13 Mark Riddoch Initial implementation
* *
* @endverbatim * @endverbatim
*/ */
/* Both these numbers MUST be exact multiples of 8 */
#define BIT_LENGTH_INITIAL 32 /**< Initial number of bits in the bitmask */ #define BIT_LENGTH_INITIAL 32 /**< Initial number of bits in the bitmask */
#define BIT_LENGTH_INC 32 /**< Number of bits to add on each increment */ #define BIT_LENGTH_INC 32 /**< Number of bits to add on each increment */
/** /**
* The bitmask structure used to store an arbitary large bitmask * The bitmask structure used to store an arbitrary large bitmask
*/ */
typedef struct { typedef struct {
SPINLOCK lock; /**< Lock to protect the bitmask */ SPINLOCK lock; /**< Lock to protect the bitmask */

View File

@ -207,6 +207,7 @@ typedef struct {
int n_bursts; /*< Number of bursts sent */ int n_bursts; /*< Number of bursts sent */
int n_requests; /*< Number of requests received */ int n_requests; /*< Number of requests received */
int n_flows; /*< Number of flow control restarts */ int n_flows; /*< Number of flow control restarts */
int n_queries; /*< Number of SQL queries */
int n_upd; int n_upd;
int n_cb; int n_cb;
int n_cbna; int n_cbna;

View File

@ -541,7 +541,7 @@ static int gw_read_backend_event(DCB *dcb) {
if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) != if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) !=
MYSQL_COM_UNDEFINED) MYSQL_COM_UNDEFINED)
{ {
read_buffer = process_response_data(dcb, read_buffer, nbytes_read); read_buffer = process_response_data(dcb, read_buffer, gwbuf_length(read_buffer));
/** /**
* Received incomplete response to session command. * Received incomplete response to session command.
* Store it to readqueue and return. * Store it to readqueue and return.
@ -1517,7 +1517,9 @@ static GWBUF* process_response_data (
ssize_t nbytes_left = 0; /*< nbytes to be read for the packet */ ssize_t nbytes_left = 0; /*< nbytes to be read for the packet */
MySQLProtocol* p; MySQLProtocol* p;
GWBUF* outbuf = NULL; GWBUF* outbuf = NULL;
int initial_packets = npackets_left;
ssize_t initial_bytes = nbytes_left;
/** Get command which was stored in gw_MySQLWrite_backend */ /** Get command which was stored in gw_MySQLWrite_backend */
p = DCB_PROTOCOL(dcb, MySQLProtocol); p = DCB_PROTOCOL(dcb, MySQLProtocol);
if (!DCB_IS_CLONE(dcb)) CHK_PROTOCOL(p); if (!DCB_IS_CLONE(dcb)) CHK_PROTOCOL(p);
@ -1560,11 +1562,13 @@ static GWBUF* process_response_data (
* enough data to read the packet length. * enough data to read the packet length.
*/ */
init_response_status(readbuf, srvcmd, &npackets_left, &nbytes_left); init_response_status(readbuf, srvcmd, &npackets_left, &nbytes_left);
initial_packets = npackets_left;
initial_bytes = nbytes_left;
} }
} }
/** Only session commands with responses should be processed */ /** Only session commands with responses should be processed */
ss_dassert(npackets_left > 0); ss_dassert(npackets_left > 0);
/** Read incomplete packet. */ /** Read incomplete packet. */
if (nbytes_left > nbytes_to_process) if (nbytes_left > nbytes_to_process)
{ {
@ -1639,10 +1643,15 @@ static GWBUF* process_response_data (
wait for more data from the backend server.*/ wait for more data from the backend server.*/
if(readbuf == NULL || GWBUF_LENGTH(readbuf) < 3) if(readbuf == NULL || GWBUF_LENGTH(readbuf) < 3)
{ {
skygw_log_write(LD," %lu [%s] Read %s packet with %d bytes. Waiting for %d packets.", skygw_log_write(LD," %lu [%s] Read %d packets. Waiting for %d more packets for a total of %d packets.",
pthread_self(),__FUNCTION__,readbuf?"partial":"empty", pthread_self(),__FUNCTION__,initial_packets - npackets_left,
readbuf?GWBUF_LENGTH(readbuf):0,npackets_left); npackets_left,initial_packets);
break;
/** Store the already read data into the readqueue of the DCB
* and restore the response status to the initial number of packets */
dcb->dcb_readqueue = gwbuf_append(outbuf,dcb->dcb_readqueue);
protocol_set_response_status(p, initial_packets, initial_bytes);
return NULL;
} }
data = GWBUF_DATA(readbuf); data = GWBUF_DATA(readbuf);

View File

@ -640,10 +640,12 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
LOGIF(LM, (skygw_log_write_flush( LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE, LOGFILE_MESSAGE,
"%s: Slave %s, server id %d, disconnected after %ld seconds. " "%s: Slave %s, server id %d, disconnected after %ld seconds. "
"%d events sent, %lu bytes.", "%d SQL commands, %d events sent (%lu bytes).",
router->service->name, slave->dcb->remote, router->service->name, slave->dcb->remote,
slave->serverid, slave->serverid,
time(0) - slave->connect_time, slave->stats.n_events, time(0) - slave->connect_time,
slave->stats.n_queries,
slave->stats.n_events,
slave->stats.n_bytes))); slave->stats.n_bytes)));
/* /*

View File

@ -122,6 +122,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
switch (MYSQL_COMMAND(queue)) switch (MYSQL_COMMAND(queue))
{ {
case COM_QUERY: case COM_QUERY:
slave->stats.n_queries++;
return blr_slave_query(router, slave, queue); return blr_slave_query(router, slave, queue);
break; break;
case COM_REGISTER_SLAVE: case COM_REGISTER_SLAVE:

View File

@ -1581,10 +1581,17 @@ void check_drop_tmp_table(
DCB* master_dcb = NULL; DCB* master_dcb = NULL;
rses_property_t* rses_prop_tmp; rses_property_t* rses_prop_tmp;
if(router_cli_ses == NULL || querybuf == NULL)
{
skygw_log_write(LE,"[%s] Error: NULL parameters passed: %p %p",
__FUNCTION__,router_cli_ses,querybuf);
return;
}
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES]; rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb; master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
if(master_dcb == NULL) if(master_dcb == NULL || master_dcb->session == NULL)
{ {
skygw_log_write(LE,"[%s] Error: Master server DBC is NULL. " skygw_log_write(LE,"[%s] Error: Master server DBC is NULL. "
"This means that the connection to the master server is already " "This means that the connection to the master server is already "
@ -1659,10 +1666,17 @@ static skygw_query_type_t is_read_tmp_table(
skygw_query_type_t qtype = type; skygw_query_type_t qtype = type;
rses_property_t* rses_prop_tmp; rses_property_t* rses_prop_tmp;
if(router_cli_ses == NULL || querybuf == NULL)
{
skygw_log_write(LE,"[%s] Error: NULL parameters passed: %p %p",
__FUNCTION__,router_cli_ses,querybuf);
return type;
}
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES]; rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb; master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
if(master_dcb == NULL) if(master_dcb == NULL || master_dcb->session == NULL)
{ {
skygw_log_write(LE,"[%s] Error: Master server DBC is NULL. " skygw_log_write(LE,"[%s] Error: Master server DBC is NULL. "
"This means that the connection to the master server is already " "This means that the connection to the master server is already "
@ -1740,22 +1754,26 @@ static void check_create_tmp_table(
GWBUF* querybuf, GWBUF* querybuf,
skygw_query_type_t type) skygw_query_type_t type)
{ {
int klen = 0; int klen = 0;
char *hkey,*dbname; char *hkey,*dbname;
MYSQL_session* data; MYSQL_session* data;
DCB* master_dcb = NULL;
rses_property_t* rses_prop_tmp;
HASHTABLE* h;
DCB* master_dcb = NULL; if(router_cli_ses == NULL || querybuf == NULL)
rses_property_t* rses_prop_tmp; {
HASHTABLE* h; skygw_log_write(LE,"[%s] Error: NULL parameters passed: %p %p",
__FUNCTION__,router_cli_ses,querybuf);
return;
}
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES]; rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb; master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
if(master_dcb == NULL) if(master_dcb == NULL || master_dcb->session == NULL)
{ {
skygw_log_write(LE,"[%s] Error: Master server DBC is NULL. " skygw_log_write(LE,"[%s] Error: Master server DCB is NULL. "
"This means that the connection to the master server is already " "This means that the connection to the master server is already "
"closed while a query is still being routed.",__FUNCTION__); "closed while a query is still being routed.",__FUNCTION__);
return; return;

View File

@ -4518,7 +4518,12 @@ int process_show_shards(ROUTER_CLIENT_SES* rses)
sl.iter = iter; sl.iter = iter;
sl.rses = rses; sl.rses = rses;
sl.rset = resultset_create(shard_list_cb,&sl); if((sl.rset = resultset_create(shard_list_cb,&sl)) == NULL)
{
skygw_log_write(LE,"[%s] Error: Failed to create resultset.",__FUNCTION__);
return -1;
}
resultset_add_column(sl.rset,"Database",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR); resultset_add_column(sl.rset,"Database",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR);
resultset_add_column(sl.rset,"Server",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR); resultset_add_column(sl.rset,"Server",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR);
resultset_stream_mysql(sl.rset,rses->rses_client_dcb); resultset_stream_mysql(sl.rset,rses->rses_client_dcb);

View File

@ -33,7 +33,7 @@ static void *newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *session); static void closeSession(ROUTER *instance, void *session);
static void freeSession(ROUTER *instance, void *session); static void freeSession(ROUTER *instance, void *session);
static int routeQuery(ROUTER *instance, void *session, GWBUF *queue); static int routeQuery(ROUTER *instance, void *session, GWBUF *queue);
static void clientReply(ROUTER *instance, void *session, GWBUF *queue); static void clientReply(ROUTER *instance, void *session, GWBUF *queue,DCB*);
static void diagnostic(ROUTER *instance, DCB *dcb); static void diagnostic(ROUTER *instance, DCB *dcb);
static uint8_t getCapabilities (ROUTER* inst, void* router_session); static uint8_t getCapabilities (ROUTER* inst, void* router_session);
static void handleError( static void handleError(
@ -151,7 +151,7 @@ routeQuery(ROUTER *instance, void *session, GWBUF *queue)
return 0; return 0;
} }
void clientReply(ROUTER* instance, void* session, GWBUF* queue) void clientReply(ROUTER* instance, void* session, GWBUF* queue, DCB* dcb)
{ {
} }