From 4e223adeff8d8840173cb6f10ad048d67c7452e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 21 Feb 2017 10:27:07 +0200 Subject: [PATCH] Simplify DCB writing code Due to the changes in the threading model, the DCB write code can be simplified by a great amount. Since only one thread can write to a DCB, it's safe to assume that no new data is added to the write queue of a DCB while it is being drained. This removes the need for the code that tracks whether a concurrent DCB write attempt was made. Because the high and low water callbacks weren't used by any module, it is safe to remove them. They offer no real benefits over the drain callback. --- include/maxscale/dcb.h | 4 - server/core/dcb.c | 224 ++++++++--------------------------------- 2 files changed, 41 insertions(+), 187 deletions(-) diff --git a/include/maxscale/dcb.h b/include/maxscale/dcb.h index 994508011..7171e3457 100644 --- a/include/maxscale/dcb.h +++ b/include/maxscale/dcb.h @@ -187,8 +187,6 @@ typedef struct dcb skygw_chk_t dcb_chk_top; bool dcb_errhandle_called; /*< this can be called only once */ bool dcb_is_zombie; /**< Whether the DCB is in the zombie list */ - bool draining_flag; /**< Set while write queue is drained */ - bool drain_called_while_busy; /**< Set as described */ dcb_role_t dcb_role; DCBEVENTQ evq; /**< The event queue for this DCB */ int fd; /**< The descriptor */ @@ -221,8 +219,6 @@ typedef struct dcb DCBMM memdata; /**< The data related to DCB memory management */ DCB_CALLBACK *callbacks; /**< The list of callbacks for the DCB */ long last_read; /*< Last time the DCB received data */ - int high_water; /**< High water mark */ - int low_water; /**< Low water mark */ struct server *server; /**< The associated backend server */ SSL* ssl; /*< SSL struct for connection */ bool ssl_read_want_read; /*< Flag */ diff --git a/server/core/dcb.c b/server/core/dcb.c index f0b9802b3..943350804 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -148,7 +148,6 @@ static int dcb_read_SSL(DCB *dcb, GWBUF **head); static GWBUF *dcb_basic_read(DCB *dcb, int bytesavailable, int maxbytes, int nreadtotal, int *nsingleread); static GWBUF *dcb_basic_read_SSL(DCB *dcb, int *nsingleread); static void dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno); -static inline void dcb_write_tidy_up(DCB *dcb, bool below_water); static int gw_write(DCB *dcb, GWBUF *writeq, bool *stop_writing); static int gw_write_SSL(DCB *dcb, GWBUF *writeq, bool *stop_writing); static int dcb_log_errors_SSL (DCB *dcb, const char *called_by, int ret); @@ -158,7 +157,6 @@ static int dcb_listen_create_socket_unix(const char *config_bind); static int dcb_set_socket_option(int sockfd, int level, int optname, void *optval, socklen_t optlen); static void dcb_add_to_all_list(DCB *dcb); static DCB *dcb_find_free(); -static GWBUF *dcb_grab_writeq(DCB *dcb, bool first_time); static void dcb_remove_from_list(DCB *dcb); size_t dcb_get_session_id( @@ -1203,39 +1201,15 @@ dcb_log_errors_SSL (DCB *dcb, const char *called_by, int ret) int dcb_write(DCB *dcb, GWBUF *queue) { - bool empty_queue; - bool below_water; - - below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water); // The following guarantees that queue is not NULL if (!dcb_write_parameter_check(dcb, queue)) { return 0; } - empty_queue = (dcb->writeq == NULL); - /* - * Add our data to the write queue. If the queue already had data, - * then there will be an EPOLLOUT event to drain what is already queued. - * If it did not already have data, we call the drain write queue - * function immediately to attempt to write the data. - */ - dcb->writeqlen += gwbuf_length(queue); dcb->writeq = gwbuf_append(dcb->writeq, queue); dcb->stats.n_buffered++; - - MXS_DEBUG("%lu [dcb_write] Append to writequeue. %d writes " - "buffered for dcb %p in state %s fd %d", - pthread_self(), - dcb->stats.n_buffered, - dcb, - STRDCBSTATE(dcb->state), - dcb->fd); - if (empty_queue) - { - dcb_drain_writeq(dcb); - } - dcb_write_tidy_up(dcb, below_water); + dcb_drain_writeq(dcb); return 1; } @@ -1368,180 +1342,64 @@ dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno) } /** - * Last few things to do at end of a write + * @brief Drain the write queue of a DCB * - * @param dcb The DCB of the client - * @param below_water A boolean - */ -static inline void -dcb_write_tidy_up(DCB *dcb, bool below_water) -{ - if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water) - { - atomic_add(&dcb->stats.n_high_water, 1); - dcb_call_callback(dcb, DCB_REASON_HIGH_WATER); - } -} - -/** - * Drain the write queue of a DCB. This is called as part of the EPOLLOUT handling - * of a socket and will try to send any buffered data from the write queue - * up until the point the write would block. + * This is called as part of the EPOLLOUT handling of a socket and will try to + * send any buffered data from the write queue up until the point the write would block. * - * @param dcb DCB to drain the write queue of + * @param dcb DCB to drain * @return The number of bytes written */ -int -dcb_drain_writeq(DCB *dcb) +int dcb_drain_writeq(DCB *dcb) { - int total_written = 0; - GWBUF *local_writeq; - bool above_water; - /* - * Loop over the buffer chain in the pending writeq - * Send as much of the data in that chain as possible and - * leave any balance on the write queue. - * - * Note that dcb_grab_writeq will set a flag (dcb->draining_flag) to prevent - * this function being entered a second time (by another thread) while - * processing is continuing. If the flag is already set, the return from - * dcb_grab_writeq will be NULL and so the outer while loop will not - * execute. The value of total_written will therefore remain zero and - * the nothing will happen in the wrap up code. - * - * @note The callback DCB_REASON_DRAINED is misleading. It is triggered - * pretty much every time there is an EPOLLOUT event and also when a - * write occurs while draining is still in progress. It is used only in - * the binlog router, which cannot function without the callback. The - * callback does not mean that a non-empty queue has been drained, or even - * that the queue is presently empty. - */ - local_writeq = dcb_grab_writeq(dcb, true); - if (NULL == local_writeq) + if (dcb->ssl_read_want_write) { - dcb_call_callback(dcb, DCB_REASON_DRAINED); - return 0; + /** The SSL library needs to write more data */ + poll_fake_read_event(dcb); } - above_water = (dcb->low_water && gwbuf_length(local_writeq) > dcb->low_water); - do - { - /* - * Process the list of buffers taken from dcb->writeq - */ - while (local_writeq != NULL) - { - bool stop_writing = false; - int written; - /* The value put into written will be >= 0 */ - if (dcb->ssl) - { - written = gw_write_SSL(dcb, local_writeq, &stop_writing); - } - else - { - written = gw_write(dcb, local_writeq, &stop_writing); - } - /* - * If the stop_writing boolean is set, writing has become blocked, - * so the remaining data is put back at the front of the write - * queue. - * - * However, if we have been called while processing the queue, it - * is possible that writing has blocked and then become unblocked. - * So an attempt is made to put the write queue into the local list - * and loop again. - */ - if (stop_writing) - { - dcb->writeq = gwbuf_append(local_writeq, dcb->writeq); - if (dcb->drain_called_while_busy) - { - local_writeq = dcb->writeq; - dcb->writeq = NULL; - dcb->drain_called_while_busy = false; - continue; - } - else - { - dcb->draining_flag = false; - goto wrap_up; - } - } - /* - * Consume the bytes we have written from the list of buffers, - * and increment the total bytes written. - */ + int total_written = 0; + GWBUF *local_writeq = dcb->writeq; + dcb->writeq = NULL; + + while (local_writeq) + { + int written; + bool stop_writing = false; + /* The value put into written will be >= 0 */ + if (dcb->ssl) + { + written = gw_write_SSL(dcb, local_writeq, &stop_writing); + } + else + { + written = gw_write(dcb, local_writeq, &stop_writing); + } + /* + * If the stop_writing boolean is set, writing has become blocked, + * so the remaining data is put back at the front of the write + * queue. + */ + if (stop_writing) + { + dcb->writeq = gwbuf_append(local_writeq, dcb->writeq); + local_writeq = NULL; + } + else + { + /** Consume the bytes we have written from the list of buffers, + * and increment the total bytes written. */ local_writeq = gwbuf_consume(local_writeq, written); total_written += written; } } - while ((local_writeq = dcb_grab_writeq(dcb, false)) != NULL); + /* The write queue has drained, potentially need to call a callback function */ dcb_call_callback(dcb, DCB_REASON_DRAINED); -wrap_up: - - /* - * If nothing has been written, the callback events cannot have occurred - * and there is no need to adjust the length of the write queue. - */ - if (total_written) - { - dcb->writeqlen -= total_written; - - /* Check if the draining has taken us from above water to below water */ - if (above_water && dcb->writeqlen < dcb->low_water) - { - atomic_add(&dcb->stats.n_low_water, 1); - dcb_call_callback(dcb, DCB_REASON_LOW_WATER); - } - - } return total_written; } -/** - * @brief If draining is not already under way, extracts the write queue - * - * Since we are intending to manipulate the write queue (a linked list) and - * possibly adjust some DCB flags, a spinlock is required. If we are already - * draining the queue, the flag is set to indicate a call while draining and - * null return is made. - * - * Otherwise, the DCB write queue is transferred into a local variable which - * will be returned to the caller, and the pointer in the DCB set to NULL. - * If the list to be returned is empty, we are stopping draining, otherwise - * we are engaged in draining. - * - * @param dcb Request handler DCB whose write queue is being drained - * @param first_time Set to true only on the first call in dcb_drain_writeq - * @return A local list of buffers taken from the DCB write queue - */ -static GWBUF * -dcb_grab_writeq(DCB *dcb, bool first_time) -{ - GWBUF *local_writeq = NULL; - - if (first_time && dcb->ssl_read_want_write) - { - poll_fake_read_event(dcb); - } - - if (first_time && dcb->draining_flag) - { - dcb->drain_called_while_busy = true; - } - else - { - local_writeq = dcb->writeq; - dcb->draining_flag = local_writeq ? true : false; - dcb->writeq = NULL; - } - - return local_writeq; -} - static void log_illegal_dcb(DCB *dcb) { const char *connected_to;