Merge branch 'develop' into 1.2.1-binlog_router_trx

This commit is contained in:
MassimilianoPinto
2015-08-24 10:20:45 +02:00
4 changed files with 197 additions and 155 deletions

View File

@ -106,7 +106,7 @@ static inline bool dcb_write_parameter_check(DCB *dcb, GWBUF *queue);
static inline void dcb_write_fake_code(DCB *dcb);
#endif
static inline void dcb_write_when_already_queued(DCB *dcb, GWBUF *queue);
static int dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno);
static void dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno);
static inline void dcb_write_tidy_up(DCB *dcb, bool below_water);
static int dcb_write_SSL_error_report (DCB *dcb, int ret);
@ -1147,7 +1147,8 @@ int below_water;
if (written < 0)
{
int rv = dcb_log_write_failure(dcb, queue, errno);
int saved_errno = errno;
dcb_log_write_failure(dcb, queue, saved_errno);
/*<
* What wasn't successfully written is stored to write queue
@ -1161,7 +1162,9 @@ int below_water;
/** Return 1 if the write failure was due to EWOULDBLOCK or EAGAIN.
The rest of the buffer will be written once an EPOLL_OUT event
arrives.*/
return rv == 0 ? 1 : 0;
return saved_errno == 0 ||
saved_errno == EAGAIN ||
saved_errno == EWOULDBLOCK;
}
/*
* Pull the number of bytes we have written from
@ -1311,10 +1314,9 @@ dcb_write_when_already_queued(DCB *dcb, GWBUF *queue)
* @param queue Queue of buffers to write
* @param eno Error number for logging
*/
static int
static void
dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno)
{
int rval = 0;
if (LOG_IS_ENABLED(LOGFILE_DEBUG))
{
if (eno == EPIPE)
@ -1330,7 +1332,6 @@ dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno)
dcb->fd,
eno,
strerror(eno))));
rval = -1;
}
}
@ -1350,7 +1351,6 @@ dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno)
dcb->fd,
eno,
strerror(eno))));
rval = -1;
}
@ -1383,10 +1383,8 @@ dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno)
dcb_isclient(dcb) ? "client" : "backend server",
eno,
strerror(eno))));
rval = -1;
}
}
return rval;
}
/**

View File

@ -71,9 +71,11 @@ int max_poll_sleep;
* 24/09/14 Mark Riddoch Introduction of the event queue for processing the
* incoming events rather than processing them immediately
* in the loop after the epoll_wait. This allows for better
* thread utilisaiton and fairer scheduling of the event
* thread utilisation and fairer scheduling of the event
* processing.
* 07/07/15 Martin Brampton Simplified add and remove DCB, improve error handling.
* 23/08/15 Martin Brampton Provisionally added test so only DCB with a
* session link can be added to the poll list
*
* @endverbatim
*/
@ -96,7 +98,7 @@ static int process_pollq(int thread_id);
static void poll_add_event_to_dcb(DCB* dcb, GWBUF* buf, __uint32_t ev);
DCB *eventq = NULL;
DCB *eventq = NULL;
SPINLOCK pollqlock = SPINLOCK_INIT;
/**
@ -104,13 +106,13 @@ SPINLOCK pollqlock = SPINLOCK_INIT;
* poll completion, a value of 1 or less is the ideal.
*/
static double load_average = 0.0;
static int load_samples = 0;
static int load_nfds = 0;
static int load_samples = 0;
static int load_nfds = 0;
static double current_avg = 0.0;
static double *avg_samples = NULL;
static int *evqp_samples = NULL;
static int next_sample = 0;
static int n_avg_samples;
static int *evqp_samples = NULL;
static int next_sample = 0;
static int n_avg_samples;
/* Thread statistics data */
static int n_threads; /*< No. of threads */
@ -128,9 +130,9 @@ typedef enum { THREAD_STOPPED, THREAD_IDLE,
*/
typedef struct {
THREAD_STATE state; /*< Current thread state */
int n_fds; /*< No. of descriptors thread is processing */
DCB *cur_dcb; /*< Current DCB being processed */
uint32_t event; /*< Current event being processed */
int n_fds; /*< No. of descriptors thread is processing */
DCB *cur_dcb; /*< Current DCB being processed */
uint32_t event; /*< Current event being processed */
} THREAD_DATA;
static THREAD_DATA *thread_data = NULL; /*< Status of each thread */
@ -254,12 +256,12 @@ int i;
int
poll_add_dcb(DCB *dcb)
{
int rc = -1;
dcb_state_t old_state = dcb->state;
dcb_state_t new_state;
struct epoll_event ev;
int rc = -1;
dcb_state_t old_state = dcb->state;
dcb_state_t new_state;
struct epoll_event ev;
CHK_DCB(dcb);
CHK_DCB(dcb);
#ifdef EPOLLRDHUP
ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET;
@ -268,65 +270,86 @@ poll_add_dcb(DCB *dcb)
#endif
ev.data.ptr = dcb;
/*<
* Choose new state according to the role of dcb.
*/
spinlock_acquire(&dcb->dcb_initlock);
if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER) {
new_state = DCB_STATE_POLLING;
} else {
ss_dassert(dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
new_state = DCB_STATE_LISTENING;
}
/*
* Check DCB current state seems sensible
*/
if (DCB_STATE_DISCONNECTED == dcb->state
|| DCB_STATE_ZOMBIE == dcb->state
|| DCB_STATE_UNDEFINED == dcb->state)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"%lu [poll_add_dcb] Error : existing state of dcb %p "
"is %s, but this should be impossible, crashing.",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state))));
raise(SIGABRT);
}
if (DCB_STATE_POLLING == dcb->state
|| DCB_STATE_LISTENING == dcb->state)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"%lu [poll_add_dcb] Error : existing state of dcb %p "
"is %s, but this is probably an error, not crashing.",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state))));
}
dcb->state = new_state;
spinlock_release(&dcb->dcb_initlock);
/*
* The only possible failure that will not cause a crash is
* running out of system resources.
*/
rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dcb->fd, &ev);
if (rc)
{
rc = poll_resolve_error(dcb, errno, true);
}
if (0 == rc)
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_add_dcb] Added dcb %p in state %s to poll set.",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state))));
}
else dcb->state = old_state;
return rc;
/*<
* Choose new state according to the role of dcb.
*/
spinlock_acquire(&dcb->dcb_initlock);
if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER)
{
new_state = DCB_STATE_POLLING;
}
else
{
ss_dassert(dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER);
new_state = DCB_STATE_LISTENING;
}
/*
* Check DCB current state seems sensible
*/
if (DCB_STATE_DISCONNECTED == dcb->state
|| DCB_STATE_ZOMBIE == dcb->state
|| DCB_STATE_UNDEFINED == dcb->state)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"%lu [poll_add_dcb] Error : existing state of dcb %p "
"is %s, but this should be impossible, crashing.",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state))));
raise(SIGABRT);
}
/*
* This test could be wrong. On the face of it, we don't want to add a
* DCB to the poll list if it is not linked to a session because the code
* that handles events will expect to find a session. Test added by
* Martin as an experiment on 23 August 2015
*/
if (false && NULL == dcb->session)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"%lu [%s] Error : Attempt to add dcb %p "
"to poll list but it is not linked to a session, crashing.",
__func__,
pthread_self(),
dcb)));
raise(SIGABRT);
}
if (DCB_STATE_POLLING == dcb->state
|| DCB_STATE_LISTENING == dcb->state)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"%lu [poll_add_dcb] Error : existing state of dcb %p "
"is %s, but this is probably an error, not crashing.",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state))));
}
dcb->state = new_state;
spinlock_release(&dcb->dcb_initlock);
/*
* The only possible failure that will not cause a crash is
* running out of system resources.
*/
rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dcb->fd, &ev);
if (rc)
{
/* Some errors are actually considered acceptable */
rc = poll_resolve_error(dcb, errno, true);
}
if (0 == rc)
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_add_dcb] Added dcb %p in state %s to poll set.",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state))));
}
else dcb->state = old_state;
return rc;
}
/**
@ -381,8 +404,8 @@ poll_remove_dcb(DCB *dcb)
dcbfd = dcb->fd;
spinlock_release(&dcb->dcb_initlock);
if (dcbfd > 0)
{
rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &ev);
{
rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcbfd, &ev);
/**
* The poll_resolve_error function will always
* return 0 or crash. So if it returns non-zero result,

View File

@ -26,6 +26,7 @@
* 17/06/13 Mark Riddoch Initial implementation
* 02/09/13 Massimiliano Pinto Added session refcounter
* 29/05/14 Mark Riddoch Addition of filter mechanism
* 23/08/15 Martin Brampton Tidying; slight improvement in safety
*
* @endverbatim
*/
@ -72,65 +73,66 @@ static int session_setup_filters(SESSION *session);
SESSION *
session_alloc(SERVICE *service, DCB *client_dcb)
{
SESSION *session;
SESSION *session;
session = (SESSION *)calloc(1, sizeof(SESSION));
ss_info_dassert(session != NULL,
"Allocating memory for session failed.");
session = (SESSION *)calloc(1, sizeof(SESSION));
ss_info_dassert(session != NULL,
"Allocating memory for session failed.");
if (session == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to allocate memory for "
"session object due error %d, %s.",
errno,
strerror(errno))));
if (client_dcb->data && !DCB_IS_CLONE(client_dcb))
{
free(client_dcb->data);
client_dcb->data = NULL;
}
goto return_session;
if (session == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to allocate memory for "
"session object due error %d, %s.",
errno,
strerror(errno))));
if (client_dcb->data && !DCB_IS_CLONE(client_dcb))
{
void *clientdata = client_dcb->data;
client_dcb->data = NULL;
free(clientdata);
}
goto return_session;
}
#if defined(SS_DEBUG)
session->ses_chk_top = CHK_NUM_SESSION;
session->ses_chk_tail = CHK_NUM_SESSION;
session->ses_chk_top = CHK_NUM_SESSION;
session->ses_chk_tail = CHK_NUM_SESSION;
#endif
if (DCB_IS_CLONE(client_dcb))
{
session->ses_is_child = true;
}
spinlock_init(&session->ses_lock);
/*<
* Prevent backend threads from accessing before session is completely
* initialized.
*/
spinlock_acquire(&session->ses_lock);
session->service = service;
if (DCB_IS_CLONE(client_dcb))
{
session->ses_is_child = true;
}
spinlock_init(&session->ses_lock);
/*<
* Prevent backend threads from accessing before session is completely
* initialized.
*/
spinlock_acquire(&session->ses_lock);
session->service = service;
session->client = client_dcb;
session->n_filters = 0;
memset(&session->stats, 0, sizeof(SESSION_STATS));
session->stats.connect = time(0);
session->state = SESSION_STATE_ALLOC;
/*<
/*<
* Associate the session to the client DCB and set the reference count on
* the session to indicate that there is a single reference to the
* session. There is no need to protect this or use atomic add as the
* session has not been made available to the other threads at this
* point.
*/
session->data = client_dcb->data;
client_dcb->session = session;
session->refcount = 1;
/*<
* This indicates that session is ready to be shared with backend
* DCBs. Note that this doesn't mean that router is initialized yet!
*/
session->state = SESSION_STATE_READY;
* session. There is no need to protect this or use atomic add as the
* session has not been made available to the other threads at this
* point.
*/
session->data = client_dcb->data;
client_dcb->session = session;
session->refcount = 1;
/*<
* This indicates that session is ready to be shared with backend
* DCBs. Note that this doesn't mean that router is initialized yet!
*/
session->state = SESSION_STATE_READY;
/*< Release session lock */
spinlock_release(&session->ses_lock);
/*< Release session lock */
spinlock_release(&session->ses_lock);
/*
* Only create a router session if we are not the listening
@ -140,36 +142,34 @@ session_alloc(SERVICE *service, DCB *client_dcb)
*
* Router session creation may create other DCBs that link to the
* session, therefore it is important that the session lock is
* relinquished beforethe router call.
* relinquished before the router call.
*/
if (client_dcb->state != DCB_STATE_LISTENING &&
client_dcb->dcb_role != DCB_ROLE_INTERNAL)
client_dcb->dcb_role != DCB_ROLE_INTERNAL)
{
session->router_session =
service->router->newSession(service->router_instance,
session);
if (session->router_session == NULL)
service->router->newSession(service->router_instance, session);
if (session->router_session == NULL)
{
/**
* Inform other threads that session is closing.
*/
session->state = SESSION_STATE_STOPPING;
/*<
* Decrease refcount, set dcb's session pointer NULL
* and set session pointer to NULL.
*/
session->client = NULL;
session_free(session);
client_dcb->session = NULL;
session = NULL;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to create %s session.",
service->name)));
/**
* Inform other threads that session is closing.
*/
session->state = SESSION_STATE_STOPPING;
/*<
* Decrease refcount, set dcb's session pointer NULL
* and set session pointer to NULL.
*/
session->client = NULL;
session_free(session);
client_dcb->session = NULL;
session = NULL;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to create %s session.",
service->name)));
goto return_session;
}
goto return_session;
}
/*
* Pending filter chain being setup set the head of the chain to
* be the router. As filters are inserted the current head will

View File

@ -1588,6 +1588,13 @@ void check_drop_tmp_table(
return;
}
if(router_cli_ses->rses_master_ref == NULL)
{
skygw_log_write(LE,"[%s] Error: Master server reference is NULL.",
__FUNCTION__);
return;
}
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
@ -1673,6 +1680,13 @@ static skygw_query_type_t is_read_tmp_table(
return type;
}
if(router_cli_ses->rses_master_ref == NULL)
{
skygw_log_write(LE,"[%s] Error: Master server reference is NULL.",
__FUNCTION__);
return type;
}
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
@ -1768,6 +1782,13 @@ static void check_create_tmp_table(
return;
}
if(router_cli_ses->rses_master_ref == NULL)
{
skygw_log_write(LE,"[%s] Error: Master server reference is NULL.",
__FUNCTION__);
return;
}
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;