Simplify DCB writing code

Due to the changes in the threading model, the DCB write code can be
simplified by a great amount.

Since only one thread can write to a DCB, it's safe to assume that no new
data is added to the write queue of a DCB while it is being drained. This
removes the need for the code that tracks whether a concurrent DCB write
attempt was made.

Because the high and low water callbacks weren't used by any module, it is
safe to remove them. They offer no real benefits over the drain callback.
This commit is contained in:
Markus Mäkelä
2017-02-21 10:27:07 +02:00
parent 68f99ae305
commit 4e223adeff
2 changed files with 41 additions and 187 deletions

View File

@ -187,8 +187,6 @@ typedef struct dcb
skygw_chk_t dcb_chk_top;
bool dcb_errhandle_called; /*< this can be called only once */
bool dcb_is_zombie; /**< Whether the DCB is in the zombie list */
bool draining_flag; /**< Set while write queue is drained */
bool drain_called_while_busy; /**< Set as described */
dcb_role_t dcb_role;
DCBEVENTQ evq; /**< The event queue for this DCB */
int fd; /**< The descriptor */
@ -221,8 +219,6 @@ typedef struct dcb
DCBMM memdata; /**< The data related to DCB memory management */
DCB_CALLBACK *callbacks; /**< The list of callbacks for the DCB */
long last_read; /*< Last time the DCB received data */
int high_water; /**< High water mark */
int low_water; /**< Low water mark */
struct server *server; /**< The associated backend server */
SSL* ssl; /*< SSL struct for connection */
bool ssl_read_want_read; /*< Flag */

View File

@ -148,7 +148,6 @@ static int dcb_read_SSL(DCB *dcb, GWBUF **head);
static GWBUF *dcb_basic_read(DCB *dcb, int bytesavailable, int maxbytes, int nreadtotal, int *nsingleread);
static GWBUF *dcb_basic_read_SSL(DCB *dcb, int *nsingleread);
static void dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno);
static inline void dcb_write_tidy_up(DCB *dcb, bool below_water);
static int gw_write(DCB *dcb, GWBUF *writeq, bool *stop_writing);
static int gw_write_SSL(DCB *dcb, GWBUF *writeq, bool *stop_writing);
static int dcb_log_errors_SSL (DCB *dcb, const char *called_by, int ret);
@ -158,7 +157,6 @@ static int dcb_listen_create_socket_unix(const char *config_bind);
static int dcb_set_socket_option(int sockfd, int level, int optname, void *optval, socklen_t optlen);
static void dcb_add_to_all_list(DCB *dcb);
static DCB *dcb_find_free();
static GWBUF *dcb_grab_writeq(DCB *dcb, bool first_time);
static void dcb_remove_from_list(DCB *dcb);
size_t dcb_get_session_id(
@ -1203,39 +1201,15 @@ dcb_log_errors_SSL (DCB *dcb, const char *called_by, int ret)
int
dcb_write(DCB *dcb, GWBUF *queue)
{
bool empty_queue;
bool below_water;
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water);
// The following guarantees that queue is not NULL
if (!dcb_write_parameter_check(dcb, queue))
{
return 0;
}
empty_queue = (dcb->writeq == NULL);
/*
* Add our data to the write queue. If the queue already had data,
* then there will be an EPOLLOUT event to drain what is already queued.
* If it did not already have data, we call the drain write queue
* function immediately to attempt to write the data.
*/
dcb->writeqlen += gwbuf_length(queue);
dcb->writeq = gwbuf_append(dcb->writeq, queue);
dcb->stats.n_buffered++;
MXS_DEBUG("%lu [dcb_write] Append to writequeue. %d writes "
"buffered for dcb %p in state %s fd %d",
pthread_self(),
dcb->stats.n_buffered,
dcb,
STRDCBSTATE(dcb->state),
dcb->fd);
if (empty_queue)
{
dcb_drain_writeq(dcb);
}
dcb_write_tidy_up(dcb, below_water);
dcb_drain_writeq(dcb);
return 1;
}
@ -1368,180 +1342,64 @@ dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno)
}
/**
* Last few things to do at end of a write
* @brief Drain the write queue of a DCB
*
* @param dcb The DCB of the client
* @param below_water A boolean
*/
static inline void
dcb_write_tidy_up(DCB *dcb, bool below_water)
{
if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water)
{
atomic_add(&dcb->stats.n_high_water, 1);
dcb_call_callback(dcb, DCB_REASON_HIGH_WATER);
}
}
/**
* Drain the write queue of a DCB. This is called as part of the EPOLLOUT handling
* of a socket and will try to send any buffered data from the write queue
* up until the point the write would block.
* This is called as part of the EPOLLOUT handling of a socket and will try to
* send any buffered data from the write queue up until the point the write would block.
*
* @param dcb DCB to drain the write queue of
* @param dcb DCB to drain
* @return The number of bytes written
*/
int
dcb_drain_writeq(DCB *dcb)
int dcb_drain_writeq(DCB *dcb)
{
int total_written = 0;
GWBUF *local_writeq;
bool above_water;
/*
* Loop over the buffer chain in the pending writeq
* Send as much of the data in that chain as possible and
* leave any balance on the write queue.
*
* Note that dcb_grab_writeq will set a flag (dcb->draining_flag) to prevent
* this function being entered a second time (by another thread) while
* processing is continuing. If the flag is already set, the return from
* dcb_grab_writeq will be NULL and so the outer while loop will not
* execute. The value of total_written will therefore remain zero and
* the nothing will happen in the wrap up code.
*
* @note The callback DCB_REASON_DRAINED is misleading. It is triggered
* pretty much every time there is an EPOLLOUT event and also when a
* write occurs while draining is still in progress. It is used only in
* the binlog router, which cannot function without the callback. The
* callback does not mean that a non-empty queue has been drained, or even
* that the queue is presently empty.
*/
local_writeq = dcb_grab_writeq(dcb, true);
if (NULL == local_writeq)
if (dcb->ssl_read_want_write)
{
dcb_call_callback(dcb, DCB_REASON_DRAINED);
return 0;
/** The SSL library needs to write more data */
poll_fake_read_event(dcb);
}
above_water = (dcb->low_water && gwbuf_length(local_writeq) > dcb->low_water);
do
{
/*
* Process the list of buffers taken from dcb->writeq
*/
while (local_writeq != NULL)
{
bool stop_writing = false;
int written;
/* The value put into written will be >= 0 */
if (dcb->ssl)
{
written = gw_write_SSL(dcb, local_writeq, &stop_writing);
}
else
{
written = gw_write(dcb, local_writeq, &stop_writing);
}
/*
* If the stop_writing boolean is set, writing has become blocked,
* so the remaining data is put back at the front of the write
* queue.
*
* However, if we have been called while processing the queue, it
* is possible that writing has blocked and then become unblocked.
* So an attempt is made to put the write queue into the local list
* and loop again.
*/
if (stop_writing)
{
dcb->writeq = gwbuf_append(local_writeq, dcb->writeq);
if (dcb->drain_called_while_busy)
{
local_writeq = dcb->writeq;
dcb->writeq = NULL;
dcb->drain_called_while_busy = false;
continue;
}
else
{
dcb->draining_flag = false;
goto wrap_up;
}
}
/*
* Consume the bytes we have written from the list of buffers,
* and increment the total bytes written.
*/
int total_written = 0;
GWBUF *local_writeq = dcb->writeq;
dcb->writeq = NULL;
while (local_writeq)
{
int written;
bool stop_writing = false;
/* The value put into written will be >= 0 */
if (dcb->ssl)
{
written = gw_write_SSL(dcb, local_writeq, &stop_writing);
}
else
{
written = gw_write(dcb, local_writeq, &stop_writing);
}
/*
* If the stop_writing boolean is set, writing has become blocked,
* so the remaining data is put back at the front of the write
* queue.
*/
if (stop_writing)
{
dcb->writeq = gwbuf_append(local_writeq, dcb->writeq);
local_writeq = NULL;
}
else
{
/** Consume the bytes we have written from the list of buffers,
* and increment the total bytes written. */
local_writeq = gwbuf_consume(local_writeq, written);
total_written += written;
}
}
while ((local_writeq = dcb_grab_writeq(dcb, false)) != NULL);
/* The write queue has drained, potentially need to call a callback function */
dcb_call_callback(dcb, DCB_REASON_DRAINED);
wrap_up:
/*
* If nothing has been written, the callback events cannot have occurred
* and there is no need to adjust the length of the write queue.
*/
if (total_written)
{
dcb->writeqlen -= total_written;
/* Check if the draining has taken us from above water to below water */
if (above_water && dcb->writeqlen < dcb->low_water)
{
atomic_add(&dcb->stats.n_low_water, 1);
dcb_call_callback(dcb, DCB_REASON_LOW_WATER);
}
}
return total_written;
}
/**
* @brief If draining is not already under way, extracts the write queue
*
* Since we are intending to manipulate the write queue (a linked list) and
* possibly adjust some DCB flags, a spinlock is required. If we are already
* draining the queue, the flag is set to indicate a call while draining and
* null return is made.
*
* Otherwise, the DCB write queue is transferred into a local variable which
* will be returned to the caller, and the pointer in the DCB set to NULL.
* If the list to be returned is empty, we are stopping draining, otherwise
* we are engaged in draining.
*
* @param dcb Request handler DCB whose write queue is being drained
* @param first_time Set to true only on the first call in dcb_drain_writeq
* @return A local list of buffers taken from the DCB write queue
*/
static GWBUF *
dcb_grab_writeq(DCB *dcb, bool first_time)
{
GWBUF *local_writeq = NULL;
if (first_time && dcb->ssl_read_want_write)
{
poll_fake_read_event(dcb);
}
if (first_time && dcb->draining_flag)
{
dcb->drain_called_while_busy = true;
}
else
{
local_writeq = dcb->writeq;
dcb->draining_flag = local_writeq ? true : false;
dcb->writeq = NULL;
}
return local_writeq;
}
static void log_illegal_dcb(DCB *dcb)
{
const char *connected_to;