diff --git a/include/maxscale/dcb.h b/include/maxscale/dcb.h index 994508011..7171e3457 100644 --- a/include/maxscale/dcb.h +++ b/include/maxscale/dcb.h @@ -187,8 +187,6 @@ typedef struct dcb skygw_chk_t dcb_chk_top; bool dcb_errhandle_called; /*< this can be called only once */ bool dcb_is_zombie; /**< Whether the DCB is in the zombie list */ - bool draining_flag; /**< Set while write queue is drained */ - bool drain_called_while_busy; /**< Set as described */ dcb_role_t dcb_role; DCBEVENTQ evq; /**< The event queue for this DCB */ int fd; /**< The descriptor */ @@ -221,8 +219,6 @@ typedef struct dcb DCBMM memdata; /**< The data related to DCB memory management */ DCB_CALLBACK *callbacks; /**< The list of callbacks for the DCB */ long last_read; /*< Last time the DCB received data */ - int high_water; /**< High water mark */ - int low_water; /**< Low water mark */ struct server *server; /**< The associated backend server */ SSL* ssl; /*< SSL struct for connection */ bool ssl_read_want_read; /*< Flag */ diff --git a/server/core/dcb.c b/server/core/dcb.c index f0b9802b3..943350804 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -148,7 +148,6 @@ static int dcb_read_SSL(DCB *dcb, GWBUF **head); static GWBUF *dcb_basic_read(DCB *dcb, int bytesavailable, int maxbytes, int nreadtotal, int *nsingleread); static GWBUF *dcb_basic_read_SSL(DCB *dcb, int *nsingleread); static void dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno); -static inline void dcb_write_tidy_up(DCB *dcb, bool below_water); static int gw_write(DCB *dcb, GWBUF *writeq, bool *stop_writing); static int gw_write_SSL(DCB *dcb, GWBUF *writeq, bool *stop_writing); static int dcb_log_errors_SSL (DCB *dcb, const char *called_by, int ret); @@ -158,7 +157,6 @@ static int dcb_listen_create_socket_unix(const char *config_bind); static int dcb_set_socket_option(int sockfd, int level, int optname, void *optval, socklen_t optlen); static void dcb_add_to_all_list(DCB *dcb); static DCB *dcb_find_free(); -static GWBUF *dcb_grab_writeq(DCB *dcb, bool first_time); static void dcb_remove_from_list(DCB *dcb); size_t dcb_get_session_id( @@ -1203,39 +1201,15 @@ dcb_log_errors_SSL (DCB *dcb, const char *called_by, int ret) int dcb_write(DCB *dcb, GWBUF *queue) { - bool empty_queue; - bool below_water; - - below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water); // The following guarantees that queue is not NULL if (!dcb_write_parameter_check(dcb, queue)) { return 0; } - empty_queue = (dcb->writeq == NULL); - /* - * Add our data to the write queue. If the queue already had data, - * then there will be an EPOLLOUT event to drain what is already queued. - * If it did not already have data, we call the drain write queue - * function immediately to attempt to write the data. - */ - dcb->writeqlen += gwbuf_length(queue); dcb->writeq = gwbuf_append(dcb->writeq, queue); dcb->stats.n_buffered++; - - MXS_DEBUG("%lu [dcb_write] Append to writequeue. %d writes " - "buffered for dcb %p in state %s fd %d", - pthread_self(), - dcb->stats.n_buffered, - dcb, - STRDCBSTATE(dcb->state), - dcb->fd); - if (empty_queue) - { - dcb_drain_writeq(dcb); - } - dcb_write_tidy_up(dcb, below_water); + dcb_drain_writeq(dcb); return 1; } @@ -1368,180 +1342,64 @@ dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno) } /** - * Last few things to do at end of a write + * @brief Drain the write queue of a DCB * - * @param dcb The DCB of the client - * @param below_water A boolean - */ -static inline void -dcb_write_tidy_up(DCB *dcb, bool below_water) -{ - if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water) - { - atomic_add(&dcb->stats.n_high_water, 1); - dcb_call_callback(dcb, DCB_REASON_HIGH_WATER); - } -} - -/** - * Drain the write queue of a DCB. This is called as part of the EPOLLOUT handling - * of a socket and will try to send any buffered data from the write queue - * up until the point the write would block. + * This is called as part of the EPOLLOUT handling of a socket and will try to + * send any buffered data from the write queue up until the point the write would block. * - * @param dcb DCB to drain the write queue of + * @param dcb DCB to drain * @return The number of bytes written */ -int -dcb_drain_writeq(DCB *dcb) +int dcb_drain_writeq(DCB *dcb) { - int total_written = 0; - GWBUF *local_writeq; - bool above_water; - /* - * Loop over the buffer chain in the pending writeq - * Send as much of the data in that chain as possible and - * leave any balance on the write queue. - * - * Note that dcb_grab_writeq will set a flag (dcb->draining_flag) to prevent - * this function being entered a second time (by another thread) while - * processing is continuing. If the flag is already set, the return from - * dcb_grab_writeq will be NULL and so the outer while loop will not - * execute. The value of total_written will therefore remain zero and - * the nothing will happen in the wrap up code. - * - * @note The callback DCB_REASON_DRAINED is misleading. It is triggered - * pretty much every time there is an EPOLLOUT event and also when a - * write occurs while draining is still in progress. It is used only in - * the binlog router, which cannot function without the callback. The - * callback does not mean that a non-empty queue has been drained, or even - * that the queue is presently empty. - */ - local_writeq = dcb_grab_writeq(dcb, true); - if (NULL == local_writeq) + if (dcb->ssl_read_want_write) { - dcb_call_callback(dcb, DCB_REASON_DRAINED); - return 0; + /** The SSL library needs to write more data */ + poll_fake_read_event(dcb); } - above_water = (dcb->low_water && gwbuf_length(local_writeq) > dcb->low_water); - do - { - /* - * Process the list of buffers taken from dcb->writeq - */ - while (local_writeq != NULL) - { - bool stop_writing = false; - int written; - /* The value put into written will be >= 0 */ - if (dcb->ssl) - { - written = gw_write_SSL(dcb, local_writeq, &stop_writing); - } - else - { - written = gw_write(dcb, local_writeq, &stop_writing); - } - /* - * If the stop_writing boolean is set, writing has become blocked, - * so the remaining data is put back at the front of the write - * queue. - * - * However, if we have been called while processing the queue, it - * is possible that writing has blocked and then become unblocked. - * So an attempt is made to put the write queue into the local list - * and loop again. - */ - if (stop_writing) - { - dcb->writeq = gwbuf_append(local_writeq, dcb->writeq); - if (dcb->drain_called_while_busy) - { - local_writeq = dcb->writeq; - dcb->writeq = NULL; - dcb->drain_called_while_busy = false; - continue; - } - else - { - dcb->draining_flag = false; - goto wrap_up; - } - } - /* - * Consume the bytes we have written from the list of buffers, - * and increment the total bytes written. - */ + int total_written = 0; + GWBUF *local_writeq = dcb->writeq; + dcb->writeq = NULL; + + while (local_writeq) + { + int written; + bool stop_writing = false; + /* The value put into written will be >= 0 */ + if (dcb->ssl) + { + written = gw_write_SSL(dcb, local_writeq, &stop_writing); + } + else + { + written = gw_write(dcb, local_writeq, &stop_writing); + } + /* + * If the stop_writing boolean is set, writing has become blocked, + * so the remaining data is put back at the front of the write + * queue. + */ + if (stop_writing) + { + dcb->writeq = gwbuf_append(local_writeq, dcb->writeq); + local_writeq = NULL; + } + else + { + /** Consume the bytes we have written from the list of buffers, + * and increment the total bytes written. */ local_writeq = gwbuf_consume(local_writeq, written); total_written += written; } } - while ((local_writeq = dcb_grab_writeq(dcb, false)) != NULL); + /* The write queue has drained, potentially need to call a callback function */ dcb_call_callback(dcb, DCB_REASON_DRAINED); -wrap_up: - - /* - * If nothing has been written, the callback events cannot have occurred - * and there is no need to adjust the length of the write queue. - */ - if (total_written) - { - dcb->writeqlen -= total_written; - - /* Check if the draining has taken us from above water to below water */ - if (above_water && dcb->writeqlen < dcb->low_water) - { - atomic_add(&dcb->stats.n_low_water, 1); - dcb_call_callback(dcb, DCB_REASON_LOW_WATER); - } - - } return total_written; } -/** - * @brief If draining is not already under way, extracts the write queue - * - * Since we are intending to manipulate the write queue (a linked list) and - * possibly adjust some DCB flags, a spinlock is required. If we are already - * draining the queue, the flag is set to indicate a call while draining and - * null return is made. - * - * Otherwise, the DCB write queue is transferred into a local variable which - * will be returned to the caller, and the pointer in the DCB set to NULL. - * If the list to be returned is empty, we are stopping draining, otherwise - * we are engaged in draining. - * - * @param dcb Request handler DCB whose write queue is being drained - * @param first_time Set to true only on the first call in dcb_drain_writeq - * @return A local list of buffers taken from the DCB write queue - */ -static GWBUF * -dcb_grab_writeq(DCB *dcb, bool first_time) -{ - GWBUF *local_writeq = NULL; - - if (first_time && dcb->ssl_read_want_write) - { - poll_fake_read_event(dcb); - } - - if (first_time && dcb->draining_flag) - { - dcb->drain_called_while_busy = true; - } - else - { - local_writeq = dcb->writeq; - dcb->draining_flag = local_writeq ? true : false; - dcb->writeq = NULL; - } - - return local_writeq; -} - static void log_illegal_dcb(DCB *dcb) { const char *connected_to;