Merge branch 'release-1.0beta-refresh' into cmake_build
This commit is contained in:
commit
26e2695491
@ -403,18 +403,20 @@ DCB_CALLBACK *cb;
|
||||
* operation of clearing this bit means that no bits are set in
|
||||
* the memdata.bitmask then the DCB is no longer able to be
|
||||
* referenced and it can be finally removed.
|
||||
* Thread won't clear its bit from bitmask of the DCB it is still using.
|
||||
*
|
||||
* @param threadid The thread ID of the caller
|
||||
* @param dcb_in_use The DCB the thread currently uses, NULL or valid DCB.
|
||||
*/
|
||||
DCB *
|
||||
dcb_process_zombies(int threadid)
|
||||
dcb_process_zombies(int threadid, DCB *dcb_in_use)
|
||||
{
|
||||
DCB *ptr, *lptr;
|
||||
DCB* dcb_list = NULL;
|
||||
DCB* dcb = NULL;
|
||||
bool succp = false;
|
||||
|
||||
/*<
|
||||
/**
|
||||
* Perform a dirty read to see if there is anything in the queue.
|
||||
* This avoids threads hitting the queue spinlock when the queue
|
||||
* is empty. This will really help when the only entry is being
|
||||
@ -428,11 +430,19 @@ bool succp = false;
|
||||
ptr = zombies;
|
||||
lptr = NULL;
|
||||
while (ptr)
|
||||
{
|
||||
bitmask_clear(&ptr->memdata.bitmask, threadid);
|
||||
{
|
||||
CHK_DCB(ptr);
|
||||
/** Don't clear the bit from DCB the user currently uses */
|
||||
if (dcb_in_use == NULL || ptr != dcb_in_use)
|
||||
{
|
||||
bitmask_clear(&ptr->memdata.bitmask, threadid);
|
||||
}
|
||||
if (ptr == dcb_in_use)
|
||||
ss_dassert(!bitmask_isallclear(&ptr->memdata.bitmask));
|
||||
|
||||
if (bitmask_isallclear(&ptr->memdata.bitmask))
|
||||
{
|
||||
/*<
|
||||
/**
|
||||
* Remove the DCB from the zombie queue
|
||||
* and call the final free routine for the
|
||||
* DCB
|
||||
@ -480,13 +490,14 @@ bool succp = false;
|
||||
spinlock_release(&zombiespin);
|
||||
|
||||
dcb = dcb_list;
|
||||
/*< Close, and set DISCONNECTED victims */
|
||||
/** Close, and set DISCONNECTED victims */
|
||||
while (dcb != NULL) {
|
||||
DCB* dcb_next = NULL;
|
||||
int rc = 0;
|
||||
/*<
|
||||
* Close file descriptor and move to clean-up phase.
|
||||
*/
|
||||
ss_dassert(dcb_in_use != dcb);
|
||||
rc = close(dcb->fd);
|
||||
|
||||
if (rc < 0) {
|
||||
@ -1928,7 +1939,7 @@ int rval = 0;
|
||||
* and instead implements a queuing mechanism in which nested events are
|
||||
* queued on the DCB such that when the thread processing the first event
|
||||
* returns it will read the queued event and process it. This allows the
|
||||
* thread that woudl otherwise have to wait to process the nested event
|
||||
* thread that would otherwise have to wait to process the nested event
|
||||
* to return immediately and and process other events.
|
||||
*
|
||||
* @param dcb The DCB that has data available
|
||||
@ -1945,7 +1956,7 @@ dcb_pollin(DCB *dcb, int thread_id)
|
||||
if (dcb->readcheck)
|
||||
{
|
||||
dcb->stats.n_readrechecks++;
|
||||
dcb_process_zombies(thread_id);
|
||||
dcb_process_zombies(thread_id, dcb);
|
||||
}
|
||||
dcb->readcheck = 0;
|
||||
spinlock_release(&dcb->pollinlock);
|
||||
@ -1988,7 +1999,7 @@ dcb_pollout(DCB *dcb, int thread_id)
|
||||
do {
|
||||
if (dcb->writecheck)
|
||||
{
|
||||
dcb_process_zombies(thread_id);
|
||||
dcb_process_zombies(thread_id, dcb);
|
||||
dcb->stats.n_writerechecks++;
|
||||
}
|
||||
dcb->writecheck = 0;
|
||||
|
@ -633,12 +633,12 @@ DCB *zombies = NULL;
|
||||
} /*< for */
|
||||
no_op = FALSE;
|
||||
}
|
||||
process_zombies:
|
||||
process_zombies:
|
||||
if (thread_data)
|
||||
{
|
||||
thread_data[thread_id].state = THREAD_ZPROCESSING;
|
||||
}
|
||||
zombies = dcb_process_zombies(thread_id);
|
||||
zombies = dcb_process_zombies(thread_id, NULL);
|
||||
|
||||
if (zombies == NULL) {
|
||||
process_zombies_only = false;
|
||||
|
@ -289,7 +289,7 @@ DCB *dcb_clone(DCB *);
|
||||
int dcb_read(DCB *, GWBUF **);
|
||||
int dcb_drain_writeq(DCB *);
|
||||
void dcb_close(DCB *);
|
||||
DCB *dcb_process_zombies(int); /* Process Zombies */
|
||||
DCB *dcb_process_zombies(int, DCB*); /* Process Zombies except the one behind the pointer */
|
||||
void printAllDCBs(); /* Debug to print all DCB in the system */
|
||||
void printDCB(DCB *); /* Debug print routine */
|
||||
void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */
|
||||
|
@ -23,8 +23,15 @@
|
||||
|
||||
// network buffer is 32K
|
||||
#define MAX_BUFFER_SIZE 32768
|
||||
// socket send buffer for backend
|
||||
#define GW_BACKEND_SO_SNDBUF 1024
|
||||
|
||||
/**
|
||||
* Configuration for send and receive socket buffer sizes for
|
||||
* backend and cleint connections.
|
||||
*/
|
||||
#define GW_BACKEND_SO_SNDBUF 32768
|
||||
#define GW_BACKEND_SO_RCVBUF 32768
|
||||
#define GW_CLIENT_SO_SNDBUF 32768
|
||||
#define GW_CLIENT_SO_RCVBUF 32768
|
||||
|
||||
#define GW_NOINTR_CALL(A) do { errno = 0; A; } while (errno == EINTR)
|
||||
#define GW_MYSQL_LOOP_TIMEOUT 300000000
|
||||
|
@ -82,10 +82,6 @@
|
||||
#endif
|
||||
|
||||
#define GW_NOINTR_CALL(A) do { errno = 0; A; } while (errno == EINTR)
|
||||
// network buffer is 32K
|
||||
#define MAX_BUFFER_SIZE 32768
|
||||
// socket send buffer for backend
|
||||
#define GW_BACKEND_SO_SNDBUF 1024
|
||||
#define SMALL_CHUNK 1024
|
||||
#define MAX_CHUNK SMALL_CHUNK * 8 * 4
|
||||
#define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10)
|
||||
|
@ -1180,7 +1180,10 @@ int gw_MySQLAccept(DCB *listener)
|
||||
conn_open[c_sock] = true;
|
||||
#endif
|
||||
/* set nonblocking */
|
||||
sendbuf = GW_BACKEND_SO_SNDBUF;
|
||||
setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen);
|
||||
sendbuf = GW_BACKEND_SO_RCVBUF;
|
||||
setsockopt(c_sock, SOL_SOCKET, SO_RCVBUF, &sendbuf, optlen);
|
||||
setnonblocking(c_sock);
|
||||
|
||||
client_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
|
||||
|
@ -34,6 +34,7 @@
|
||||
*
|
||||
*/
|
||||
|
||||
#include <gw.h>
|
||||
#include "mysql_client_server_protocol.h"
|
||||
#include <skygw_types.h>
|
||||
#include <skygw_utils.h>
|
||||
@ -741,6 +742,7 @@ int gw_do_connect_to_backend(
|
||||
struct sockaddr_in serv_addr;
|
||||
int rv;
|
||||
int so = 0;
|
||||
int bufsize;
|
||||
|
||||
memset(&serv_addr, 0, sizeof serv_addr);
|
||||
serv_addr.sin_family = AF_INET;
|
||||
@ -764,6 +766,10 @@ int gw_do_connect_to_backend(
|
||||
/* prepare for connect */
|
||||
setipaddress(&serv_addr.sin_addr, host);
|
||||
serv_addr.sin_port = htons(port);
|
||||
bufsize = GW_CLIENT_SO_SNDBUF;
|
||||
setsockopt(so, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
|
||||
bufsize = GW_CLIENT_SO_RCVBUF;
|
||||
setsockopt(so, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
|
||||
/* set socket to as non-blocking here */
|
||||
setnonblocking(so);
|
||||
rv = connect(so, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
|
||||
|
@ -1674,7 +1674,6 @@ static int routeQuery(
|
||||
{
|
||||
rses_is_closed = true;
|
||||
}
|
||||
|
||||
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
||||
|
||||
packet = GWBUF_DATA(querybuf);
|
||||
@ -1702,7 +1701,6 @@ static int routeQuery(
|
||||
(rses_is_closed ? "Router was closed" :
|
||||
"Router has no backend servers where to "
|
||||
"route to"))));
|
||||
free(querybuf);
|
||||
}
|
||||
goto retblock;
|
||||
}
|
||||
@ -2195,7 +2193,17 @@ static void clientReply (
|
||||
goto lock_failed;
|
||||
}
|
||||
bref = get_bref_from_dcb(router_cli_ses, backend_dcb);
|
||||
|
||||
|
||||
#if !defined(FOR_BUG548_FIX_ONLY)
|
||||
/** This makes the issue becoming visible in poll.c */
|
||||
if (bref == NULL)
|
||||
{
|
||||
/** Unlock router session */
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
goto lock_failed;
|
||||
}
|
||||
#endif
|
||||
|
||||
CHK_BACKEND_REF(bref);
|
||||
scur = &bref->bref_sescmd_cur;
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user