Reduce complexity.
This commit is contained in:
parent
8376bbf3e6
commit
015cb890d4
@ -98,6 +98,14 @@ static int dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf
|
||||
static int dcb_isvalid_nolock(DCB *dcb);
|
||||
static void dcb_close_finish(DCB *);
|
||||
static bool dcb_maybe_add_persistent(DCB *);
|
||||
static inline bool dcb_write_parameter_check(DCB *dcb, GWBUF *queue);
|
||||
#if defined(FAKE_CODE)
|
||||
static inline void dcb_write_fake_code(DCB *dcb);
|
||||
#endif
|
||||
static inline void dcb_write_when_already_queued(DCB *dcb, GWBUF *queue);
|
||||
static void dcb_log_write_failure(DCB *dcb, GWBUF *queue, int errno);
|
||||
static inline void dcb_write_tidy_up(DCB *dcb, bool below_water);
|
||||
static int dcb_write_SSL_error_report (DCB *dcb, int ret);
|
||||
|
||||
size_t dcb_get_session_id(
|
||||
DCB *dcb)
|
||||
@ -156,65 +164,65 @@ dcb_alloc(dcb_role_t role)
|
||||
{
|
||||
DCB *newdcb;
|
||||
|
||||
if ((newdcb = calloc(1, sizeof(DCB))) == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
if ((newdcb = calloc(1, sizeof(DCB))) == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
newdcb->dcb_chk_top = CHK_NUM_DCB;
|
||||
newdcb->dcb_chk_tail = CHK_NUM_DCB;
|
||||
|
||||
newdcb->dcb_errhandle_called = false;
|
||||
newdcb->dcb_errhandle_called = false;
|
||||
newdcb->dcb_role = role;
|
||||
spinlock_init(&newdcb->dcb_initlock);
|
||||
spinlock_init(&newdcb->writeqlock);
|
||||
spinlock_init(&newdcb->delayqlock);
|
||||
spinlock_init(&newdcb->authlock);
|
||||
spinlock_init(&newdcb->cb_lock);
|
||||
spinlock_init(&newdcb->pollinlock);
|
||||
spinlock_init(&newdcb->polloutlock);
|
||||
newdcb->pollinbusy = 0;
|
||||
newdcb->readcheck = 0;
|
||||
newdcb->polloutbusy = 0;
|
||||
newdcb->writecheck = 0;
|
||||
spinlock_init(&newdcb->writeqlock);
|
||||
spinlock_init(&newdcb->delayqlock);
|
||||
spinlock_init(&newdcb->authlock);
|
||||
spinlock_init(&newdcb->cb_lock);
|
||||
spinlock_init(&newdcb->pollinlock);
|
||||
spinlock_init(&newdcb->polloutlock);
|
||||
newdcb->pollinbusy = 0;
|
||||
newdcb->readcheck = 0;
|
||||
newdcb->polloutbusy = 0;
|
||||
newdcb->writecheck = 0;
|
||||
newdcb->fd = DCBFD_CLOSED;
|
||||
|
||||
newdcb->evq.next = NULL;
|
||||
newdcb->evq.prev = NULL;
|
||||
newdcb->evq.pending_events = 0;
|
||||
newdcb->evq.processing = 0;
|
||||
spinlock_init(&newdcb->evq.eventqlock);
|
||||
newdcb->evq.next = NULL;
|
||||
newdcb->evq.prev = NULL;
|
||||
newdcb->evq.pending_events = 0;
|
||||
newdcb->evq.processing = 0;
|
||||
spinlock_init(&newdcb->evq.eventqlock);
|
||||
|
||||
memset(&newdcb->stats, 0, sizeof(DCBSTATS)); // Zero the statistics
|
||||
newdcb->state = DCB_STATE_ALLOC;
|
||||
bitmask_init(&newdcb->memdata.bitmask);
|
||||
newdcb->writeqlen = 0;
|
||||
newdcb->high_water = 0;
|
||||
newdcb->low_water = 0;
|
||||
memset(&newdcb->stats, 0, sizeof(DCBSTATS)); // Zero the statistics
|
||||
newdcb->state = DCB_STATE_ALLOC;
|
||||
bitmask_init(&newdcb->memdata.bitmask);
|
||||
newdcb->writeqlen = 0;
|
||||
newdcb->high_water = 0;
|
||||
newdcb->low_water = 0;
|
||||
newdcb->session = NULL;
|
||||
newdcb->server = NULL;
|
||||
newdcb->service = NULL;
|
||||
newdcb->next = NULL;
|
||||
newdcb->next = NULL;
|
||||
newdcb->nextpersistent = NULL;
|
||||
newdcb->persistentstart = 0;
|
||||
newdcb->callbacks = NULL;
|
||||
newdcb->data = NULL;
|
||||
newdcb->callbacks = NULL;
|
||||
newdcb->data = NULL;
|
||||
|
||||
newdcb->remote = NULL;
|
||||
newdcb->user = NULL;
|
||||
newdcb->flags = 0;
|
||||
newdcb->remote = NULL;
|
||||
newdcb->user = NULL;
|
||||
newdcb->flags = 0;
|
||||
|
||||
spinlock_acquire(&dcbspin);
|
||||
if (allDCBs == NULL)
|
||||
allDCBs = newdcb;
|
||||
else
|
||||
{
|
||||
DCB *ptr = allDCBs;
|
||||
while (ptr->next)
|
||||
ptr = ptr->next;
|
||||
ptr->next = newdcb;
|
||||
}
|
||||
spinlock_release(&dcbspin);
|
||||
return newdcb;
|
||||
spinlock_acquire(&dcbspin);
|
||||
if (allDCBs == NULL)
|
||||
allDCBs = newdcb;
|
||||
else
|
||||
{
|
||||
DCB *ptr = allDCBs;
|
||||
while (ptr->next)
|
||||
ptr = ptr->next;
|
||||
ptr->next = newdcb;
|
||||
}
|
||||
spinlock_release(&dcbspin);
|
||||
return newdcb;
|
||||
}
|
||||
|
||||
|
||||
@ -233,7 +241,7 @@ dcb_free(DCB *dcb)
|
||||
"Error : Attempt to free a DCB via dcb_free "
|
||||
"that has been associated with a descriptor.")));
|
||||
}
|
||||
assert(dcb->fd == DCBFD_CLOSED);
|
||||
raise(SIGABRT);
|
||||
dcb_final_free(dcb);
|
||||
}
|
||||
|
||||
@ -1237,17 +1245,126 @@ int w;
|
||||
int saved_errno = 0;
|
||||
int below_water;
|
||||
|
||||
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0;
|
||||
ss_dassert(queue != NULL);
|
||||
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0;
|
||||
// The following guarantees that queue is not NULL
|
||||
if (!dcb_write_parameter_check(dcb, queue)) return 0;
|
||||
|
||||
spinlock_acquire(&dcb->writeqlock);
|
||||
|
||||
if (dcb->fd <= 0)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Write failed, dcb is %s.",
|
||||
dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not writable")));
|
||||
return 0;
|
||||
}
|
||||
if (dcb->writeq)
|
||||
{
|
||||
dcb_write_when_already_queued(dcb, queue);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Loop over the buffer chain that has been passed to us
|
||||
* from the reading side.
|
||||
* Send as much of the data in that chain as possible and
|
||||
* add any balance to the write queue.
|
||||
*/
|
||||
while (queue != NULL)
|
||||
{
|
||||
int qlen;
|
||||
#if defined(FAKE_CODE)
|
||||
dcb_write_fake_code(dcb);
|
||||
#endif /* FAKE_CODE */
|
||||
qlen = GWBUF_LENGTH(queue);
|
||||
GW_NOINTR_CALL(
|
||||
w = gw_write(dcb, GWBUF_DATA(queue), qlen);
|
||||
dcb->stats.n_writes++;
|
||||
);
|
||||
|
||||
if (w < 0)
|
||||
{
|
||||
dcb_log_write_failure(dcb, queue, errno);
|
||||
atomic_add(&dcb->writeqlen, gwbuf_length(queue));
|
||||
dcb->stats.n_buffered++;
|
||||
spinlock_release(&dcb->writeqlock);
|
||||
return 0;
|
||||
}
|
||||
/*
|
||||
* Pull the number of bytes we have written from
|
||||
* queue with have.
|
||||
*/
|
||||
queue = gwbuf_consume(queue, w);
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Wrote %d Bytes to dcb %p in "
|
||||
"state %s fd %d",
|
||||
pthread_self(),
|
||||
w,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
} /*< while (queue != NULL) */
|
||||
/*<
|
||||
* What wasn't successfully written is stored to write queue
|
||||
* for suspended write.
|
||||
*/
|
||||
dcb->writeq = queue;
|
||||
|
||||
} /* if (dcb->writeq) */
|
||||
|
||||
dcb_write_tidy_up(dcb, below_water);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
#if defined(FAKE_CODE)
|
||||
/**
|
||||
* Check the parameters for dcb_write
|
||||
*
|
||||
* @param dcb The DCB of the client
|
||||
* @param queue Queue of buffers to write
|
||||
* @return true if parameters acceptable, false otherwise
|
||||
*/
|
||||
static inline void
|
||||
dcb_write_fake_code(DCB *dcb)
|
||||
{
|
||||
if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER &&
|
||||
dcb->session != NULL)
|
||||
{
|
||||
if (dcb_isclient(dcb) && fail_next_client_fd)
|
||||
{
|
||||
dcb_fake_write_errno[dcb->fd] = 32;
|
||||
dcb_fake_write_ev[dcb->fd] = 29;
|
||||
fail_next_client_fd = false;
|
||||
}
|
||||
else if (!dcb_isclient(dcb) &&
|
||||
fail_next_backend_fd)
|
||||
{
|
||||
dcb_fake_write_errno[dcb->fd] = 32;
|
||||
dcb_fake_write_ev[dcb->fd] = 29;
|
||||
fail_next_backend_fd = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif /* FAKE_CODE */
|
||||
|
||||
/**
|
||||
* Check the parameters for dcb_write
|
||||
*
|
||||
* @param dcb The DCB of the client
|
||||
* @param queue Queue of buffers to write
|
||||
* @return true if parameters acceptable, false otherwise
|
||||
*/
|
||||
static inline bool
|
||||
dcb_write_parameter_check(DCB *dcb, GWBUF *queue)
|
||||
{
|
||||
if (dcb->fd <= 0)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Write failed, dcb is %s.",
|
||||
dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not writable")));
|
||||
return false;
|
||||
}
|
||||
|
||||
if (queue == NULL) return false;
|
||||
|
||||
if (dcb->session == NULL || dcb->session->state != SESSION_STATE_STOPPING)
|
||||
{
|
||||
/**
|
||||
* SESSION_STATE_STOPPING means that one of the backends is closing
|
||||
* the router session. Some backends may have not completed
|
||||
@ -1256,210 +1373,159 @@ int below_water;
|
||||
* before router's closeSession is called and that tells that DCB may
|
||||
* still be writable.
|
||||
*/
|
||||
if (queue == NULL ||
|
||||
(dcb->state != DCB_STATE_ALLOC &&
|
||||
dcb->state != DCB_STATE_POLLING &&
|
||||
dcb->state != DCB_STATE_LISTENING &&
|
||||
dcb->state != DCB_STATE_NOPOLLING &&
|
||||
(dcb->session == NULL ||
|
||||
dcb->session->state != SESSION_STATE_STOPPING)))
|
||||
if (dcb->state != DCB_STATE_ALLOC &&
|
||||
|
||||
dcb->state != DCB_STATE_POLLING &&
|
||||
dcb->state != DCB_STATE_LISTENING &&
|
||||
dcb->state != DCB_STATE_NOPOLLING)
|
||||
|
||||
|
||||
{
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write aborted to dcb %p because "
|
||||
"it is in state %s",
|
||||
pthread_self(),
|
||||
dcb->stats.n_buffered,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
//ss_dassert(false);
|
||||
return 0;
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write aborted to dcb %p because "
|
||||
"it is in state %s",
|
||||
pthread_self(),
|
||||
dcb->stats.n_buffered,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
//ss_dassert(false);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle writing when there is already queued data
|
||||
*
|
||||
* @param dcb The DCB of the client
|
||||
* @param queue Queue of buffers to write
|
||||
*/
|
||||
static inline void
|
||||
dcb_write_when_already_queued(DCB *dcb, GWBUF *queue)
|
||||
{
|
||||
/*
|
||||
* We have some queued data, so add our data to
|
||||
* the write queue and return.
|
||||
* The assumption is that there will be an EPOLLOUT
|
||||
* event to drain what is already queued. We are protected
|
||||
* by the spinlock, which will also be acquired by the
|
||||
* the routine that drains the queue data, so we should
|
||||
* not have a race condition on the event.
|
||||
*/
|
||||
atomic_add(&dcb->writeqlen, gwbuf_length(queue));
|
||||
dcb->writeq = gwbuf_append(dcb->writeq, queue);
|
||||
dcb->stats.n_buffered++;
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Append to writequeue. %d writes "
|
||||
"buffered for dcb %p in state %s fd %d",
|
||||
pthread_self(),
|
||||
dcb->stats.n_buffered,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Debug log write failure, except when it is COM_QUIT
|
||||
*
|
||||
* @param dcb The DCB of the client
|
||||
* @param queue Queue of buffers to write
|
||||
* @return 0 on failure, 1 on success
|
||||
*/
|
||||
static void
|
||||
dcb_log_write_failure(DCB *dcb, GWBUF *queue, int errno)
|
||||
{
|
||||
if (LOG_IS_ENABLED(LOGFILE_DEBUG))
|
||||
{
|
||||
if (errno == EPIPE)
|
||||
{
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write to dcb "
|
||||
"%p in state %s fd %d failed "
|
||||
"due errno %d, %s",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,
|
||||
errno,
|
||||
strerror(errno))));
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG_IS_ENABLED(LOGFILE_ERROR))
|
||||
{
|
||||
if (errno != EPIPE &&
|
||||
errno != EAGAIN &&
|
||||
errno != EWOULDBLOCK)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Write to dcb %p in "
|
||||
"state %s fd %d failed due "
|
||||
"errno %d, %s",
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,
|
||||
errno,
|
||||
strerror(errno))));
|
||||
|
||||
}
|
||||
|
||||
spinlock_acquire(&dcb->writeqlock);
|
||||
}
|
||||
|
||||
bool dolog = true;
|
||||
|
||||
if (dcb->writeq != NULL)
|
||||
{
|
||||
/*
|
||||
* We have some queued data, so add our data to
|
||||
* the write queue and return.
|
||||
* The assumption is that there will be an EPOLLOUT
|
||||
* event to drain what is already queued. We are protected
|
||||
* by the spinlock, which will also be acquired by the
|
||||
* the routine that drains the queue data, so we should
|
||||
* not have a race condition on the event.
|
||||
*/
|
||||
if (queue)
|
||||
{
|
||||
int qlen;
|
||||
if (errno != 0 &&
|
||||
errno != EAGAIN &&
|
||||
errno != EWOULDBLOCK)
|
||||
{
|
||||
/**
|
||||
* Do not log if writing COM_QUIT to backend failed.
|
||||
*/
|
||||
if (GWBUF_IS_TYPE_MYSQL(queue))
|
||||
{
|
||||
uint8_t* data = GWBUF_DATA(queue);
|
||||
|
||||
qlen = gwbuf_length(queue);
|
||||
atomic_add(&dcb->writeqlen, qlen);
|
||||
dcb->writeq = gwbuf_append(dcb->writeq, queue);
|
||||
dcb->stats.n_buffered++;
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Append to writequeue. %d writes "
|
||||
"buffered for dcb %p in state %s fd %d",
|
||||
pthread_self(),
|
||||
dcb->stats.n_buffered,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Loop over the buffer chain that has been passed to us
|
||||
* from the reading side.
|
||||
* Send as much of the data in that chain as possible and
|
||||
* add any balance to the write queue.
|
||||
*/
|
||||
while (queue != NULL)
|
||||
{
|
||||
int qlen;
|
||||
#if defined(FAKE_CODE)
|
||||
if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER &&
|
||||
dcb->session != NULL)
|
||||
{
|
||||
if (dcb_isclient(dcb) && fail_next_client_fd) {
|
||||
dcb_fake_write_errno[dcb->fd] = 32;
|
||||
dcb_fake_write_ev[dcb->fd] = 29;
|
||||
fail_next_client_fd = false;
|
||||
} else if (!dcb_isclient(dcb) &&
|
||||
fail_next_backend_fd)
|
||||
{
|
||||
dcb_fake_write_errno[dcb->fd] = 32;
|
||||
dcb_fake_write_ev[dcb->fd] = 29;
|
||||
fail_next_backend_fd = false;
|
||||
}
|
||||
}
|
||||
#endif /* FAKE_CODE */
|
||||
qlen = GWBUF_LENGTH(queue);
|
||||
GW_NOINTR_CALL(
|
||||
w = gw_write(dcb, GWBUF_DATA(queue), qlen);
|
||||
dcb->stats.n_writes++;
|
||||
);
|
||||
|
||||
if (w < 0)
|
||||
{
|
||||
saved_errno = errno;
|
||||
errno = 0;
|
||||
|
||||
if (LOG_IS_ENABLED(LOGFILE_DEBUG))
|
||||
{
|
||||
if (saved_errno == EPIPE)
|
||||
{
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write to dcb "
|
||||
"%p in state %s fd %d failed "
|
||||
"due errno %d, %s",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,
|
||||
saved_errno,
|
||||
strerror(saved_errno))));
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG_IS_ENABLED(LOGFILE_ERROR))
|
||||
{
|
||||
if (saved_errno != EPIPE &&
|
||||
saved_errno != EAGAIN &&
|
||||
saved_errno != EWOULDBLOCK)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Write to dcb %p in "
|
||||
"state %s fd %d failed due "
|
||||
"errno %d, %s",
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,
|
||||
saved_errno,
|
||||
strerror(saved_errno))));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
/*
|
||||
* Pull the number of bytes we have written from
|
||||
* queue with have.
|
||||
*/
|
||||
queue = gwbuf_consume(queue, w);
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Wrote %d Bytes to dcb %p in "
|
||||
"state %s fd %d",
|
||||
pthread_self(),
|
||||
w,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
} /*< while (queue != NULL) */
|
||||
/*<
|
||||
* What wasn't successfully written is stored to write queue
|
||||
* for suspended write.
|
||||
*/
|
||||
dcb->writeq = queue;
|
||||
|
||||
if (queue)
|
||||
{
|
||||
int qlen;
|
||||
|
||||
qlen = gwbuf_length(queue);
|
||||
atomic_add(&dcb->writeqlen, qlen);
|
||||
dcb->stats.n_buffered++;
|
||||
}
|
||||
} /* if (dcb->writeq) */
|
||||
|
||||
if (saved_errno != 0 &&
|
||||
queue != NULL &&
|
||||
saved_errno != EAGAIN &&
|
||||
saved_errno != EWOULDBLOCK)
|
||||
{
|
||||
bool dolog = true;
|
||||
|
||||
/**
|
||||
* Do not log if writing COM_QUIT to backend failed.
|
||||
*/
|
||||
if (GWBUF_IS_TYPE_MYSQL(queue))
|
||||
{
|
||||
uint8_t* data = GWBUF_DATA(queue);
|
||||
|
||||
if (data[4] == 0x01)
|
||||
{
|
||||
dolog = false;
|
||||
}
|
||||
}
|
||||
if (dolog)
|
||||
{
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Writing to %s socket failed due %d, %s.",
|
||||
pthread_self(),
|
||||
dcb_isclient(dcb) ? "client" : "backend server",
|
||||
saved_errno,
|
||||
strerror(saved_errno))));
|
||||
}
|
||||
spinlock_release(&dcb->writeqlock);
|
||||
return 0;
|
||||
}
|
||||
spinlock_release(&dcb->writeqlock);
|
||||
|
||||
if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water)
|
||||
{
|
||||
atomic_add(&dcb->stats.n_high_water, 1);
|
||||
dcb_call_callback(dcb, DCB_REASON_HIGH_WATER);
|
||||
}
|
||||
|
||||
return 1;
|
||||
if (data[4] == 0x01)
|
||||
{
|
||||
dolog = false;
|
||||
}
|
||||
}
|
||||
if (dolog)
|
||||
{
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Writing to %s socket failed due %d, %s.",
|
||||
pthread_self(),
|
||||
dcb_isclient(dcb) ? "client" : "backend server",
|
||||
errno,
|
||||
strerror(errno))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle writing when there is already queued data
|
||||
*
|
||||
* @param dcb The DCB of the client
|
||||
* @param below_water A boolean
|
||||
*/
|
||||
static inline void
|
||||
dcb_write_tidy_up (DCB *dcb, bool below_water)
|
||||
{
|
||||
spinlock_release(&dcb->writeqlock);
|
||||
|
||||
if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water)
|
||||
{
|
||||
atomic_add(&dcb->stats.n_high_water, 1);
|
||||
dcb_call_callback(dcb, DCB_REASON_HIGH_WATER);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* General purpose routine to write to an SSL enabled DCB
|
||||
*
|
||||
@ -1471,82 +1537,19 @@ int below_water;
|
||||
int
|
||||
dcb_write_SSL(DCB *dcb, GWBUF *queue)
|
||||
{
|
||||
int w;
|
||||
int saved_errno = 0;
|
||||
int below_water;
|
||||
int w;
|
||||
int saved_errno = 0;
|
||||
bool below_water;
|
||||
|
||||
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0;
|
||||
ss_dassert(queue != NULL);
|
||||
|
||||
if (dcb->fd <= 0)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Write failed, dcb is %s.",
|
||||
dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not writable")));
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* SESSION_STATE_STOPPING means that one of the backends is closing
|
||||
* the router session. Some backends may have not completed
|
||||
* authentication yet and thus they have no information about router
|
||||
* being closed. Session state is changed to SESSION_STATE_STOPPING
|
||||
* before router's closeSession is called and that tells that DCB may
|
||||
* still be writable.
|
||||
*/
|
||||
if (queue == NULL ||
|
||||
(dcb->state != DCB_STATE_ALLOC &&
|
||||
dcb->state != DCB_STATE_POLLING &&
|
||||
dcb->state != DCB_STATE_LISTENING &&
|
||||
dcb->state != DCB_STATE_NOPOLLING &&
|
||||
(dcb->session == NULL ||
|
||||
dcb->session->state != SESSION_STATE_STOPPING)))
|
||||
{
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write aborted to dcb %p because "
|
||||
"it is in state %s",
|
||||
pthread_self(),
|
||||
dcb->stats.n_buffered,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
//ss_dassert(false);
|
||||
return 0;
|
||||
}
|
||||
// The following guarantees that queue is not NULL
|
||||
if (!dcb_write_parameter_check(dcb, queue)) return 0;
|
||||
|
||||
spinlock_acquire(&dcb->writeqlock);
|
||||
|
||||
if (dcb->writeq != NULL)
|
||||
if (dcb->writeq)
|
||||
{
|
||||
/*
|
||||
* We have some queued data, so add our data to
|
||||
* the write queue and return.
|
||||
* The assumption is that there will be an EPOLLOUT
|
||||
* event to drain what is already queued. We are protected
|
||||
* by the spinlock, which will also be acquired by the
|
||||
* the routine that drains the queue data, so we should
|
||||
* not have a race condition on the event.
|
||||
*/
|
||||
if (queue)
|
||||
{
|
||||
int qlen;
|
||||
|
||||
qlen = gwbuf_length(queue);
|
||||
atomic_add(&dcb->writeqlen, qlen);
|
||||
dcb->writeq = gwbuf_append(dcb->writeq, queue);
|
||||
dcb->stats.n_buffered++;
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Append to writequeue. %d writes "
|
||||
"buffered for dcb %p in state %s fd %d",
|
||||
pthread_self(),
|
||||
dcb->stats.n_buffered,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
}
|
||||
dcb_write_when_already_queued(dcb, queue);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1558,163 +1561,145 @@ dcb_write_SSL(DCB *dcb, GWBUF *queue)
|
||||
*/
|
||||
while (queue != NULL)
|
||||
{
|
||||
int qlen;
|
||||
#if defined(FAKE_CODE)
|
||||
if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER &&
|
||||
dcb->session != NULL)
|
||||
{
|
||||
if (dcb_isclient(dcb) && fail_next_client_fd) {
|
||||
dcb_fake_write_errno[dcb->fd] = 32;
|
||||
dcb_fake_write_ev[dcb->fd] = 29;
|
||||
fail_next_client_fd = false;
|
||||
} else if (!dcb_isclient(dcb) &&
|
||||
fail_next_backend_fd)
|
||||
{
|
||||
dcb_fake_write_errno[dcb->fd] = 32;
|
||||
dcb_fake_write_ev[dcb->fd] = 29;
|
||||
fail_next_backend_fd = false;
|
||||
}
|
||||
}
|
||||
dcb_write_fake_code(dcb);
|
||||
#endif /* FAKE_CODE */
|
||||
qlen = GWBUF_LENGTH(queue);
|
||||
do
|
||||
{
|
||||
w = gw_write_SSL(dcb->ssl, GWBUF_DATA(queue), qlen);
|
||||
w = gw_write_SSL(dcb->ssl, GWBUF_DATA(queue), GWBUF_LENGTH(queue));
|
||||
dcb->stats.n_writes++;
|
||||
|
||||
if (w <= 0)
|
||||
{
|
||||
int ssl_errno = SSL_get_error(dcb->ssl,w);
|
||||
int ssl_errno = dcb_write_SSL_error_report (dcb, w);
|
||||
if(ssl_errno != SSL_ERROR_WANT_WRITE)
|
||||
{
|
||||
atomic_add(&dcb->writeqlen, gwbuf_length(queue));
|
||||
dcb->stats.n_buffered++;
|
||||
dcb_write_tidy_up(dcb, below_water);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
} while(w <= 0);
|
||||
|
||||
if (LOG_IS_ENABLED(LOGFILE_DEBUG))
|
||||
{
|
||||
switch(ssl_errno)
|
||||
{
|
||||
case SSL_ERROR_WANT_READ:
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write to dcb "
|
||||
"%p in state %s fd %d failed "
|
||||
"due error SSL_ERROR_WANT_READ",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
break;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write to dcb "
|
||||
"%p in state %s fd %d failed "
|
||||
"due error SSL_ERROR_WANT_WRITE",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
break;
|
||||
default:
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write to dcb "
|
||||
"%p in state %s fd %d failed "
|
||||
"due error %d",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,ssl_errno)));
|
||||
break;
|
||||
}
|
||||
}
|
||||
/** Remove written bytes from the queue */
|
||||
queue = gwbuf_consume(queue, w);
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Wrote %d Bytes to dcb %p in "
|
||||
"state %s fd %d",
|
||||
pthread_self(),
|
||||
w,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
} /*< while (queue != NULL) */
|
||||
/*<
|
||||
* What wasn't successfully written is stored to write queue
|
||||
* for suspended write.
|
||||
*/
|
||||
dcb->writeq = queue;
|
||||
|
||||
if (LOG_IS_ENABLED(LOGFILE_ERROR) && ssl_errno != SSL_ERROR_WANT_WRITE)
|
||||
{
|
||||
if (ssl_errno == -1)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Write to dcb %p in "
|
||||
"state %s fd %d failed due to "
|
||||
"SSL error %d",
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,
|
||||
ssl_errno)));
|
||||
if(ssl_errno == SSL_ERROR_SSL || ssl_errno == SSL_ERROR_SYSCALL)
|
||||
{
|
||||
if(ssl_errno == SSL_ERROR_SYSCALL)
|
||||
{
|
||||
skygw_log_write(LE,"%d:%s",errno,strerror(errno));
|
||||
}
|
||||
do
|
||||
{
|
||||
char errbuf[140];
|
||||
ERR_error_string(ssl_errno,errbuf);
|
||||
skygw_log_write(LE,"%d:%s",ssl_errno,errbuf);
|
||||
}while((ssl_errno = ERR_get_error()) != 0);
|
||||
}
|
||||
}
|
||||
else if(w == 0)
|
||||
{
|
||||
do
|
||||
{
|
||||
char errbuf[140];
|
||||
ERR_error_string(ssl_errno,errbuf);
|
||||
skygw_log_write(LE,"%d:%s",ssl_errno,errbuf);
|
||||
}while((ssl_errno = ERR_get_error()) != 0);
|
||||
}
|
||||
}
|
||||
|
||||
if(ssl_errno != SSL_ERROR_WANT_WRITE)
|
||||
break;
|
||||
}
|
||||
}while(w <= 0);
|
||||
|
||||
if(w <= 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
/** Remove written bytes from the queue */
|
||||
queue = gwbuf_consume(queue, w);
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Wrote %d Bytes to dcb %p in "
|
||||
"state %s fd %d",
|
||||
pthread_self(),
|
||||
w,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
}
|
||||
} /*< while (queue != NULL) */
|
||||
/*<
|
||||
* What wasn't successfully written is stored to write queue
|
||||
* for suspended write.
|
||||
*/
|
||||
dcb->writeq = queue;
|
||||
|
||||
if (queue)
|
||||
{
|
||||
int qlen;
|
||||
|
||||
qlen = gwbuf_length(queue);
|
||||
atomic_add(&dcb->writeqlen, qlen);
|
||||
dcb->stats.n_buffered++;
|
||||
}
|
||||
} /* if (dcb->writeq) */
|
||||
|
||||
spinlock_release(&dcb->writeqlock);
|
||||
|
||||
if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water)
|
||||
{
|
||||
atomic_add(&dcb->stats.n_high_water, 1);
|
||||
dcb_call_callback(dcb, DCB_REASON_HIGH_WATER);
|
||||
}
|
||||
dcb_write_tidy_up(dcb, below_water);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* General purpose routine to write error reports for SSL writes
|
||||
*
|
||||
* @param dcb The DCB of the client
|
||||
* @param ret The SSL operation return code
|
||||
* @return The final SSL error number
|
||||
*/
|
||||
static int
|
||||
dcb_write_SSL_error_report (DCB *dcb, int ret)
|
||||
{
|
||||
int ssl_errno;
|
||||
|
||||
ssl_errno = SSL_get_error(dcb->ssl,ret);
|
||||
|
||||
if (LOG_IS_ENABLED(LOGFILE_DEBUG))
|
||||
{
|
||||
switch(ssl_errno)
|
||||
{
|
||||
case SSL_ERROR_WANT_READ:
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write to dcb "
|
||||
"%p in state %s fd %d failed "
|
||||
"due error SSL_ERROR_WANT_READ",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
break;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write to dcb "
|
||||
"%p in state %s fd %d failed "
|
||||
"due error SSL_ERROR_WANT_WRITE",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
break;
|
||||
default:
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write to dcb "
|
||||
"%p in state %s fd %d failed "
|
||||
"due error %d",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,ssl_errno)));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG_IS_ENABLED(LOGFILE_ERROR) && ssl_errno != SSL_ERROR_WANT_WRITE)
|
||||
{
|
||||
if (ssl_errno == -1)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Write to dcb %p in "
|
||||
"state %s fd %d failed due to "
|
||||
"SSL error %d",
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,
|
||||
ssl_errno)));
|
||||
if(ssl_errno == SSL_ERROR_SSL || ssl_errno == SSL_ERROR_SYSCALL)
|
||||
{
|
||||
if(ssl_errno == SSL_ERROR_SYSCALL)
|
||||
{
|
||||
skygw_log_write(LE,"%d:%s",errno,strerror(errno));
|
||||
}
|
||||
do
|
||||
{
|
||||
char errbuf[140];
|
||||
ERR_error_string(ssl_errno,errbuf);
|
||||
skygw_log_write(LE,"%d:%s",ssl_errno,errbuf);
|
||||
} while((ssl_errno = ERR_get_error()) != 0);
|
||||
}
|
||||
}
|
||||
else if(ret == 0)
|
||||
{
|
||||
do
|
||||
{
|
||||
char errbuf[140];
|
||||
ERR_error_string(ssl_errno,errbuf);
|
||||
skygw_log_write(LE,"%d:%s",ssl_errno,errbuf);
|
||||
} while((ssl_errno = ERR_get_error()) != 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Drain the write queue of a DCB. This is called as part of the EPOLLOUT handling
|
||||
* of a socket and will try to send any buffered data from the write queue
|
||||
|
Loading…
x
Reference in New Issue
Block a user