diff --git a/include/maxscale/config.h b/include/maxscale/config.h
index cd3f6b40f..d1d72ad36 100644
--- a/include/maxscale/config.h
+++ b/include/maxscale/config.h
@@ -168,6 +168,8 @@ extern const char CN_USER[];
 extern const char CN_USERS[];
 extern const char CN_VERSION_STRING[];
 extern const char CN_WEIGHTBY[];
+extern const char CN_WRITEQ_HIGH_WATER[];
+extern const char CN_WRITEQ_LOW_WATER[];
 
 /**
  * The config parameter
@@ -233,6 +235,8 @@ typedef struct
     bool substitute_variables; /**< Should environment variables be substituted */
     char* local_address; /**< Local address to use when connecting */
     time_t users_refresh_time; /**< How often the users can be refreshed */
+    uint32_t writeq_high_water; /**< High water mark of dcb write queue */
+    uint32_t writeq_low_water; /**< Low water mark of dcb write queue */
 } MXS_CONFIG;
 
 /**
@@ -503,4 +507,18 @@ bool config_reload(void);
  */
 json_t* config_maxscale_to_json(const char* host);
 
+/**
+ * @brief Get DCB write queue high water mark
+ *
+ * @return High water mark in bytes, 0 if it has not been configured
+ */
+uint32_t config_writeq_high_water();
+
+/**
+ * @brief Get DCB write queue low water mark
+ *
+ * @return Low water mark in bytes, 0 if it has not been configured
+ */
+uint32_t config_writeq_low_water();
+
 MXS_END_DECLS
diff --git a/include/maxscale/dcb.h b/include/maxscale/dcb.h
index 7260f9191..2039aeb8a 100644
--- a/include/maxscale/dcb.h
+++ b/include/maxscale/dcb.h
@@ -151,7 +151,9 @@ typedef struct dcb
     struct servlistener *listener; /**< For a client DCB, the listener data */
     MXS_PROTOCOL func; /**< The protocol functions for this descriptor */
     MXS_AUTHENTICATOR authfunc; /**< The authenticator functions for this descriptor */
-    int writeqlen; /**< Current number of byes in the write queue */
+    uint32_t writeqlen; /**< Current number of bytes in the write queue */
+    uint32_t high_water; /**< High water mark of write queue */
+    uint32_t low_water; /**< Low water mark of write queue */
     GWBUF *writeq; /**< Write Data Queue */
     GWBUF *delayq; /**< Delay Backend Write Data Queue */
     GWBUF *readq; /**< Read queue for storing incomplete reads */
@@ -176,6 +178,7 @@ typedef struct dcb
     bool ssl_write_want_read; /*< Flag */
     bool ssl_write_want_write; /*< Flag */
     bool was_persistent; /**< Whether this DCB was in the persistent pool */
+    bool high_water_reached; /**< High water mark reached; tells whether the throttle must be released at low water */
     struct
     {
         struct dcb *next; /**< Next DCB in owning thread's list */
@@ -206,7 +209,7 @@ typedef enum
-#define DCB_SET_HIGH_WATER(x, hi) (x)->low_water = (hi);
+#define DCB_SET_HIGH_WATER(x, hi) (x)->high_water = (hi);
 #define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water)
 #define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water)
-
+#define DCB_THROTTLING_ENABLED(x) ((x)->high_water && (x)->low_water)
 /**
  * @brief DCB system initialization function
  *
diff --git a/include/maxscale/session.h b/include/maxscale/session.h
index 34f05ebd8..606319680 100644
--- a/include/maxscale/session.h
+++ b/include/maxscale/session.h
@@ -136,6 +136,7 @@ typedef struct session
     mxs_session_state_t state; /*< Current descriptor state */
     uint64_t ses_id; /*< Unique session identifier */
     struct dcb *client_dcb; /*< The client connection */
+
     struct mxs_router_session *router_session; /*< The router instance data */
     MXS_SESSION_STATS stats; /*< Session statistics */
     struct service *service; /*< The service this session is using */
diff --git a/server/core/config.cc b/server/core/config.cc
index 9fb4af3c2..9e1db34b2 100644
--- a/server/core/config.cc
+++ b/server/core/config.cc
@@ -150,6 +150,8 @@ const char CN_USERS_REFRESH_TIME[] = "users_refresh_time";
 const char CN_VERSION_STRING[] = "version_string";
 const char CN_WEIGHTBY[] = "weightby";
 const char CN_SESSION_TRACK_TRX_STATE[] = "session_track_trx_state";
+const char CN_WRITEQ_HIGH_WATER[] = "writeq_high_water";
+const char CN_WRITEQ_LOW_WATER[] = "writeq_low_water";
 
 typedef struct duplicate_context
 {
@@ -593,6 +595,14 @@ static bool config_load_single_file(const char* file,
         }
     }
 
+    /* Both marks are only known once the whole file has been read, so validate them here */
+    if ((gateway.writeq_high_water || gateway.writeq_low_water) &&
+        gateway.writeq_high_water <= gateway.writeq_low_water)
+    {
+        rval = -1;
+        MXS_ERROR("Invalid configuration, writeq_high_water should be greater than writeq_low_water");
+    }
+
     return rval == 0;
 }
 
@@ -1356,6 +1366,15 @@ config_nbpolls()
     return gateway.n_nbpoll;
 }
 
+uint32_t config_writeq_high_water()
+{
+    return gateway.writeq_high_water;
+}
+
+uint32_t config_writeq_low_water()
+{
+    return gateway.writeq_low_water;
+}
 /**
  * Return the configured number of milliseconds for which we wait when we do
  * a blocking poll call.
@@ -1700,6 +1719,28 @@ handle_global_item(const char *name, const char *value)
 
         gateway.users_refresh_time = users_refresh_time;
     }
+    else if (strcmp(name, CN_WRITEQ_HIGH_WATER) == 0)
+    {
+        gateway.writeq_high_water = get_suffixed_size(value);
+        if (gateway.writeq_high_water < MIN_WRITEQ_HIGH_WATER)
+        {
+            MXS_WARNING("The specified writeq high water mark %u, is smaller than the minimum allowed size %d. Changing to minimum.",
+                        gateway.writeq_high_water, MIN_WRITEQ_HIGH_WATER);
+            gateway.writeq_high_water = MIN_WRITEQ_HIGH_WATER;
+        }
+        MXS_NOTICE("Writeq high water mark set to: %u", gateway.writeq_high_water);
+    }
+    else if (strcmp(name, CN_WRITEQ_LOW_WATER) == 0)
+    {
+        gateway.writeq_low_water = get_suffixed_size(value);
+        if (gateway.writeq_low_water < MIN_WRITEQ_LOW_WATER)
+        {
+            MXS_WARNING("The specified writeq low water mark %u, is smaller than the minimum allowed size %d. Changing to minimum.",
+                        gateway.writeq_low_water, MIN_WRITEQ_LOW_WATER);
+            gateway.writeq_low_water = MIN_WRITEQ_LOW_WATER;
+        }
+        MXS_NOTICE("Writeq low water mark set to: %u", gateway.writeq_low_water);
+    }
     else
     {
         MXS_ERROR("%s is an invalid value for '%s', using default %d instead.",
@@ -1913,6 +1954,8 @@ void config_set_global_defaults()
     gateway.promoted_at = 0;
 
     gateway.thread_stack_size = 0;
+    gateway.writeq_high_water = 0;
+    gateway.writeq_low_water = 0;
     pthread_attr_t attr;
     if (pthread_attr_init(&attr) == 0)
     {
@@ -4051,6 +4094,8 @@ json_t* config_maxscale_to_json(const char* host)
     json_object_set_new(param, "connector_plugindir", json_string(get_connector_plugindir()));
     json_object_set_new(param, CN_THREADS, json_integer(config_threadcount()));
     json_object_set_new(param, CN_THREAD_STACK_SIZE, json_integer(config_thread_stack_size()));
+    json_object_set_new(param, CN_WRITEQ_HIGH_WATER, json_integer(config_writeq_high_water()));
+    json_object_set_new(param, CN_WRITEQ_LOW_WATER, json_integer(config_writeq_low_water()));
 
     MXS_CONFIG* cnf = config_get_global_options();
 
diff --git a/server/core/dcb.cc b/server/core/dcb.cc
index 4ba29913b..eb16863f5 100644
--- a/server/core/dcb.cc
+++ b/server/core/dcb.cc
@@ -120,6 +120,8 @@ static void dcb_remove_from_list(DCB *dcb);
 static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t events);
 static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t ev);
 static bool dcb_session_check(DCB *dcb, const char *);
+static int upstream_throttle_callback(DCB *dcb, DCB_REASON reason, void *userdata);
+static int downstream_throttle_callback(DCB *dcb, DCB_REASON reason, void *userdata);
 
 void dcb_global_init()
 {
@@ -128,6 +130,9 @@ void dcb_global_init()
     this_unit.dcb_initialized.state = DCB_STATE_ALLOC;
     this_unit.dcb_initialized.ssl_state = SSL_HANDSHAKE_UNKNOWN;
     this_unit.dcb_initialized.poll.handler = dcb_poll_handler;
+    this_unit.dcb_initialized.high_water_reached = false;
+    this_unit.dcb_initialized.low_water = config_writeq_low_water();
+    this_unit.dcb_initialized.high_water = config_writeq_high_water();
     this_unit.dcb_initialized.dcb_chk_tail = CHK_NUM_DCB;
 
     int nthreads = config_threadcount();
@@ -508,6 +513,13 @@ dcb_connect(SERVER *server, MXS_SESSION *session, const char *protocol)
         dcb_free_all_memory(dcb);
         return NULL;
     }
+
+    /* Register upstream throttling callbacks */
+    if (DCB_THROTTLING_ENABLED(dcb))
+    {
+        dcb_add_callback(dcb, DCB_REASON_HIGH_WATER, upstream_throttle_callback, NULL);
+        dcb_add_callback(dcb, DCB_REASON_LOW_WATER, upstream_throttle_callback, NULL);
+    }
     /**
      * The dcb will be addded into poll set by dcb->func.connect
      */
@@ -878,6 +890,7 @@ dcb_log_errors_SSL(DCB *dcb, int ret)
 int
 dcb_write(DCB *dcb, GWBUF *queue)
 {
+    dcb->writeqlen += gwbuf_length(queue);
     // The following guarantees that queue is not NULL
     if (!dcb_write_parameter_check(dcb, queue))
     {
@@ -888,6 +901,13 @@ dcb_write(DCB *dcb, GWBUF *queue)
     dcb->stats.n_buffered++;
     dcb_drain_writeq(dcb);
 
+    if (DCB_ABOVE_HIGH_WATER(dcb) && !dcb->high_water_reached)
+    {
+        dcb_call_callback(dcb, DCB_REASON_HIGH_WATER);
+        dcb->high_water_reached = true;
+        dcb->stats.n_high_water++;
+    }
+
     return 1;
 }
 
@@ -1016,6 +1036,16 @@ int dcb_drain_writeq(DCB *dcb)
         dcb_call_callback(dcb, DCB_REASON_DRAINED);
     }
 
+    ss_dassert(dcb->writeqlen >= total_written);
+    dcb->writeqlen -= total_written;
+
+    if (dcb->high_water_reached && DCB_BELOW_LOW_WATER(dcb))
+    {
+        dcb_call_callback(dcb, DCB_REASON_LOW_WATER);
+        dcb->high_water_reached = false;
+        dcb->stats.n_low_water++;
+    }
+
     return total_written;
 }
 
@@ -2453,6 +2483,13 @@ dcb_accept(DCB *dcb)
                 return NULL;
             }
 
+            /* Register downstream throttling callbacks on the DCB that receives them */
+            if (DCB_THROTTLING_ENABLED(client_dcb))
+            {
+                dcb_add_callback(client_dcb, DCB_REASON_HIGH_WATER, downstream_throttle_callback, NULL);
+                dcb_add_callback(client_dcb, DCB_REASON_LOW_WATER, downstream_throttle_callback, NULL);
+            }
+
             if (client_dcb->service->max_connections &&
                 client_dcb->service->client_count >= client_dcb->service->max_connections)
             {
@@ -3510,3 +3547,94 @@ DCB* dcb_get_current()
 {
     return this_thread.current_dcb;
 }
+
+/**
+ * @brief DCB callback for upstream throttling
+ * Called by any backend dcb when its writeq is above high water mark or
+ * it has reached high water mark and now it is below low water mark,
+ * Calling `poll_remove_dcb` or `poll_add_dcb` on client dcb to throttle
+ * network traffic from client to mxs.
+ *
+ * @param dcb      Backend dcb
+ * @param reason   Why the callback was called
+ * @param userdata Data provided when the callback was added
+ * @return Always 0
+ */
+static int upstream_throttle_callback(DCB *dcb, DCB_REASON reason, void *userdata)
+{
+    DCB *client_dcb = dcb->session->client_dcb;
+    if (reason == DCB_REASON_HIGH_WATER)
+    {
+        poll_remove_dcb(client_dcb);
+    }
+    else if (reason == DCB_REASON_LOW_WATER)
+    {
+        poll_add_dcb(client_dcb);
+    }
+
+    return 0;
+}
+
+/**
+ * @brief Remove a backend DCB of the given session from the poll set
+ *
+ * @param dcb  DCB to examine; acted on only if it is a backend DCB of the session
+ * @param data The MXS_SESSION being throttled
+ * @return Always true
+ */
+bool backend_dcb_remove_func(DCB *dcb, void *data)
+{
+    MXS_SESSION* session = (MXS_SESSION*)data;
+
+    if (dcb->session == session && dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
+    {
+        poll_remove_dcb(dcb);
+    }
+
+    return true;
+}
+
+/**
+ * @brief Add a backend DCB of the given session back to the poll set
+ *
+ * @param dcb  DCB to examine; acted on only if it is a backend DCB of the session
+ * @param data The MXS_SESSION whose throttle is being released
+ * @return Always true
+ */
+bool backend_dcb_add_func(DCB *dcb, void *data)
+{
+    MXS_SESSION* session = (MXS_SESSION*)data;
+
+    if (dcb->session == session && dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
+    {
+        poll_add_dcb(dcb);
+    }
+
+    return true;
+}
+
+/**
+ * @brief DCB callback for downstream throttling
+ * Called by client dcb when its writeq is above high water mark or
+ * it has reached high water mark and now it is below low water mark,
+ * Calling `poll_remove_dcb` or `poll_add_dcb` on all backend dcbs to
+ * throttle network traffic from server to mxs.
+ *
+ * @param dcb      client dcb
+ * @param reason   Why the callback was called
+ * @param userdata Data provided when the callback was added
+ * @return Always 0
+ */
+static int downstream_throttle_callback(DCB *dcb, DCB_REASON reason, void *userdata)
+{
+    if (reason == DCB_REASON_HIGH_WATER)
+    {
+        dcb_foreach(backend_dcb_remove_func, dcb->session);
+    }
+    else if (reason == DCB_REASON_LOW_WATER)
+    {
+        dcb_foreach(backend_dcb_add_func, dcb->session);
+    }
+
+    return 0;
+}
diff --git a/server/core/internal/config.h b/server/core/internal/config.h
index f9f54f841..b32e4ec38 100644
--- a/server/core/internal/config.h
+++ b/server/core/internal/config.h
@@ -28,6 +28,8 @@ MXS_BEGIN_DECLS
 #define DEFAULT_NTHREADS 1 /**< Default number of polling threads */
 #define DEFAULT_QUERY_RETRIES 0 /**< Number of retries for interrupted queries */
 #define DEFAULT_QUERY_RETRY_TIMEOUT 5 /**< Timeout for query retries */
+#define MIN_WRITEQ_HIGH_WATER 4096 /**< Min high water mark of dcb write queue */
+#define MIN_WRITEQ_LOW_WATER 512 /**< Min low water mark of dcb write queue */
 
 /**
  * Maximum length for configuration parameter value.