MXS-1690: Need Backpressure mechanism (#169)
* implication of backpressure * fix typo and refactor * misc refactor * misc fix * add function session_unlink_backend_dcb and more comments * misc fix * refactor, move all throttling logic into dcb.cc * misc fix * misc fix
This commit is contained in:
@ -168,6 +168,8 @@ extern const char CN_USER[];
|
||||
extern const char CN_USERS[];
|
||||
extern const char CN_VERSION_STRING[];
|
||||
extern const char CN_WEIGHTBY[];
|
||||
extern const char CN_WRITEQ_HIGH_WATER[];
|
||||
extern const char CN_WRITEQ_LOW_WATER[];
|
||||
|
||||
/**
|
||||
* The config parameter
|
||||
@ -233,6 +235,8 @@ typedef struct
|
||||
bool substitute_variables; /**< Should environment variables be substituted */
|
||||
char* local_address; /**< Local address to use when connecting */
|
||||
time_t users_refresh_time; /**< How often the users can be refreshed */
|
||||
uint32_t writeq_high_water; /**< High water mark of dcb write queue */
|
||||
uint32_t writeq_low_water; /**< Low water mark of dcb write queue */
|
||||
} MXS_CONFIG;
|
||||
|
||||
/**
|
||||
@ -503,4 +507,18 @@ bool config_reload(void);
|
||||
*/
|
||||
json_t* config_maxscale_to_json(const char* host);
|
||||
|
||||
/**
|
||||
* @brief Get DCB write queue high water mark
|
||||
*
|
||||
* @return Number of high water mark in bytes
|
||||
*/
|
||||
uint32_t config_writeq_high_water();
|
||||
|
||||
/**
|
||||
* @brief Get DCB write queue low water mark
|
||||
*
|
||||
* @return @return Number of low water mark in bytes
|
||||
*/
|
||||
uint32_t config_writeq_low_water();
|
||||
|
||||
MXS_END_DECLS
|
||||
|
@ -151,7 +151,9 @@ typedef struct dcb
|
||||
struct servlistener *listener; /**< For a client DCB, the listener data */
|
||||
MXS_PROTOCOL func; /**< The protocol functions for this descriptor */
|
||||
MXS_AUTHENTICATOR authfunc; /**< The authenticator functions for this descriptor */
|
||||
int writeqlen; /**< Current number of byes in the write queue */
|
||||
uint32_t writeqlen; /**< Current number of byes in the write queue */
|
||||
uint32_t high_water; /**< High water mark of write queue */
|
||||
uint32_t low_water; /**< Low water mark of write queue */
|
||||
GWBUF *writeq; /**< Write Data Queue */
|
||||
GWBUF *delayq; /**< Delay Backend Write Data Queue */
|
||||
GWBUF *readq; /**< Read queue for storing incomplete reads */
|
||||
@ -176,6 +178,7 @@ typedef struct dcb
|
||||
bool ssl_write_want_read; /*< Flag */
|
||||
bool ssl_write_want_write; /*< Flag */
|
||||
bool was_persistent; /**< Whether this DCB was in the persistent pool */
|
||||
bool high_water_reached; /** High water mark reached, to determine whether need release throttle */
|
||||
struct
|
||||
{
|
||||
struct dcb *next; /**< Next DCB in owning thread's list */
|
||||
@ -206,7 +209,7 @@ typedef enum
|
||||
#define DCB_SET_HIGH_WATER(x, hi) (x)->low_water = (hi);
|
||||
#define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water)
|
||||
#define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water)
|
||||
|
||||
#define DCB_THROTTLING_ENABLED(x) ((x)->high_water && (x)->low_water)
|
||||
/**
|
||||
* @brief DCB system initialization function
|
||||
*
|
||||
|
@ -136,6 +136,7 @@ typedef struct session
|
||||
mxs_session_state_t state; /*< Current descriptor state */
|
||||
uint64_t ses_id; /*< Unique session identifier */
|
||||
struct dcb *client_dcb; /*< The client connection */
|
||||
|
||||
struct mxs_router_session *router_session; /*< The router instance data */
|
||||
MXS_SESSION_STATS stats; /*< Session statistics */
|
||||
struct service *service; /*< The service this session is using */
|
||||
|
@ -150,6 +150,8 @@ const char CN_USERS_REFRESH_TIME[] = "users_refresh_time";
|
||||
const char CN_VERSION_STRING[] = "version_string";
|
||||
const char CN_WEIGHTBY[] = "weightby";
|
||||
const char CN_SESSION_TRACK_TRX_STATE[] = "session_track_trx_state";
|
||||
const char CN_WRITEQ_HIGH_WATER[] = "writeq_high_water";
|
||||
const char CN_WRITEQ_LOW_WATER[] = "writeq_low_water";
|
||||
|
||||
typedef struct duplicate_context
|
||||
{
|
||||
@ -593,6 +595,14 @@ static bool config_load_single_file(const char* file,
|
||||
}
|
||||
}
|
||||
|
||||
/* Check this after reading config is finished */
|
||||
if ((gateway.writeq_high_water || gateway.writeq_low_water) &&
|
||||
gateway.writeq_high_water <= gateway.writeq_low_water)
|
||||
{
|
||||
rval = -1;
|
||||
MXS_ERROR("Invaild configuration, writeq_high_water should be greater than writeq_low_water");
|
||||
}
|
||||
|
||||
return rval == 0;
|
||||
}
|
||||
|
||||
@ -1356,6 +1366,15 @@ config_nbpolls()
|
||||
return gateway.n_nbpoll;
|
||||
}
|
||||
|
||||
uint32_t config_writeq_high_water()
|
||||
{
|
||||
return gateway.writeq_high_water;
|
||||
}
|
||||
|
||||
uint32_t config_writeq_low_water()
|
||||
{
|
||||
return gateway.writeq_low_water;
|
||||
}
|
||||
/**
|
||||
* Return the configured number of milliseconds for which we wait when we do
|
||||
* a blocking poll call.
|
||||
@ -1700,6 +1719,28 @@ handle_global_item(const char *name, const char *value)
|
||||
|
||||
gateway.users_refresh_time = users_refresh_time;
|
||||
}
|
||||
else if (strcmp(name, CN_WRITEQ_HIGH_WATER) == 0)
|
||||
{
|
||||
gateway.writeq_high_water = get_suffixed_size(value);
|
||||
if (gateway.writeq_high_water < MIN_WRITEQ_HIGH_WATER)
|
||||
{
|
||||
MXS_WARNING("The specified writeq high water mark %d, is smaller than the minimum allowed size %d. Changing to minimum.",
|
||||
gateway.writeq_high_water, MIN_WRITEQ_HIGH_WATER);
|
||||
gateway.writeq_high_water = MIN_WRITEQ_HIGH_WATER;
|
||||
}
|
||||
MXS_NOTICE("Writeq high water mark set to: %d", gateway.writeq_high_water);
|
||||
}
|
||||
else if (strcmp(name, CN_WRITEQ_LOW_WATER) == 0)
|
||||
{
|
||||
gateway.writeq_low_water = get_suffixed_size(value);
|
||||
if (gateway.writeq_low_water < MIN_WRITEQ_LOW_WATER)
|
||||
{
|
||||
MXS_WARNING("The specified writeq low water mark %d, is smaller than the minimum allowed size %d. Changing to minimum.",
|
||||
gateway.writeq_low_water, MIN_WRITEQ_LOW_WATER);
|
||||
gateway.writeq_low_water = MIN_WRITEQ_LOW_WATER;
|
||||
}
|
||||
MXS_NOTICE("Writeq low water mark set to: %d", gateway.writeq_low_water);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("%s is an invalid value for '%s', using default %d instead.",
|
||||
@ -1913,6 +1954,8 @@ void config_set_global_defaults()
|
||||
gateway.promoted_at = 0;
|
||||
|
||||
gateway.thread_stack_size = 0;
|
||||
gateway.writeq_high_water = 0;
|
||||
gateway.writeq_low_water = 0;
|
||||
pthread_attr_t attr;
|
||||
if (pthread_attr_init(&attr) == 0)
|
||||
{
|
||||
@ -4051,6 +4094,8 @@ json_t* config_maxscale_to_json(const char* host)
|
||||
json_object_set_new(param, "connector_plugindir", json_string(get_connector_plugindir()));
|
||||
json_object_set_new(param, CN_THREADS, json_integer(config_threadcount()));
|
||||
json_object_set_new(param, CN_THREAD_STACK_SIZE, json_integer(config_thread_stack_size()));
|
||||
json_object_set_new(param, CN_WRITEQ_HIGH_WATER, json_integer(config_writeq_high_water()));
|
||||
json_object_set_new(param, CN_WRITEQ_LOW_WATER, json_integer(config_writeq_low_water()));
|
||||
|
||||
MXS_CONFIG* cnf = config_get_global_options();
|
||||
|
||||
|
@ -120,6 +120,8 @@ static void dcb_remove_from_list(DCB *dcb);
|
||||
static uint32_t dcb_poll_handler(MXS_POLL_DATA *data, int thread_id, uint32_t events);
|
||||
static uint32_t dcb_process_poll_events(DCB *dcb, uint32_t ev);
|
||||
static bool dcb_session_check(DCB *dcb, const char *);
|
||||
static int upstream_throttle_callback(DCB *dcb, DCB_REASON reason, void *userdata);
|
||||
static int downstream_throttle_callback(DCB *dcb, DCB_REASON reason, void *userdata);
|
||||
|
||||
void dcb_global_init()
|
||||
{
|
||||
@ -128,6 +130,9 @@ void dcb_global_init()
|
||||
this_unit.dcb_initialized.state = DCB_STATE_ALLOC;
|
||||
this_unit.dcb_initialized.ssl_state = SSL_HANDSHAKE_UNKNOWN;
|
||||
this_unit.dcb_initialized.poll.handler = dcb_poll_handler;
|
||||
this_unit.dcb_initialized.high_water_reached = false;
|
||||
this_unit.dcb_initialized.low_water = config_writeq_low_water();
|
||||
this_unit.dcb_initialized.high_water = config_writeq_high_water();
|
||||
this_unit.dcb_initialized.dcb_chk_tail = CHK_NUM_DCB;
|
||||
|
||||
int nthreads = config_threadcount();
|
||||
@ -508,6 +513,13 @@ dcb_connect(SERVER *server, MXS_SESSION *session, const char *protocol)
|
||||
dcb_free_all_memory(dcb);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Register upstream throttling callbacks */
|
||||
if (DCB_THROTTLING_ENABLED(dcb))
|
||||
{
|
||||
dcb_add_callback(dcb, DCB_REASON_HIGH_WATER, upstream_throttle_callback, NULL);
|
||||
dcb_add_callback(dcb, DCB_REASON_LOW_WATER, upstream_throttle_callback, NULL);
|
||||
}
|
||||
/**
|
||||
* The dcb will be addded into poll set by dcb->func.connect
|
||||
*/
|
||||
@ -878,6 +890,7 @@ dcb_log_errors_SSL(DCB *dcb, int ret)
|
||||
int
|
||||
dcb_write(DCB *dcb, GWBUF *queue)
|
||||
{
|
||||
dcb->writeqlen += gwbuf_length(queue);
|
||||
// The following guarantees that queue is not NULL
|
||||
if (!dcb_write_parameter_check(dcb, queue))
|
||||
{
|
||||
@ -888,6 +901,13 @@ dcb_write(DCB *dcb, GWBUF *queue)
|
||||
dcb->stats.n_buffered++;
|
||||
dcb_drain_writeq(dcb);
|
||||
|
||||
if (DCB_ABOVE_HIGH_WATER(dcb) && !dcb->high_water_reached)
|
||||
{
|
||||
dcb_call_callback(dcb, DCB_REASON_HIGH_WATER);
|
||||
dcb->high_water_reached = true;
|
||||
dcb->stats.n_high_water++;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -1016,6 +1036,16 @@ int dcb_drain_writeq(DCB *dcb)
|
||||
dcb_call_callback(dcb, DCB_REASON_DRAINED);
|
||||
}
|
||||
|
||||
ss_dassert(dcb->writeqlen >= total_written);
|
||||
dcb->writeqlen -= total_written;
|
||||
|
||||
if (dcb->high_water_reached && DCB_BELOW_LOW_WATER(dcb))
|
||||
{
|
||||
dcb_call_callback(dcb, DCB_REASON_LOW_WATER);
|
||||
dcb->high_water_reached = false;
|
||||
dcb->stats.n_low_water++;
|
||||
}
|
||||
|
||||
return total_written;
|
||||
}
|
||||
|
||||
@ -2453,6 +2483,13 @@ dcb_accept(DCB *dcb)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Register downstream throttling callbacks */
|
||||
if (DCB_THROTTLING_ENABLED(dcb))
|
||||
{
|
||||
dcb_add_callback(client_dcb, DCB_REASON_HIGH_WATER, downstream_throttle_callback, NULL);
|
||||
dcb_add_callback(client_dcb, DCB_REASON_LOW_WATER, downstream_throttle_callback, NULL);
|
||||
}
|
||||
|
||||
if (client_dcb->service->max_connections &&
|
||||
client_dcb->service->client_count >= client_dcb->service->max_connections)
|
||||
{
|
||||
@ -3510,3 +3547,80 @@ DCB* dcb_get_current()
|
||||
{
|
||||
return this_thread.current_dcb;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief DCB callback for upstream throtting
|
||||
* Called by any backend dcb when its writeq is above high water mark or
|
||||
* it has reached high water mark and now it is below low water mark,
|
||||
* Calling `poll_remove_dcb` or `poll_add_dcb' on client dcb to throttle
|
||||
* network traffic from client to mxs.
|
||||
*
|
||||
* @param dcb Backend dcb
|
||||
* @param reason Why the callback was called
|
||||
* @param userdata Data provided when the callback was added
|
||||
* @return Always 0
|
||||
*/
|
||||
static int upstream_throttle_callback(DCB *dcb, DCB_REASON reason, void *userdata)
|
||||
{
|
||||
DCB *client_dcb = dcb->session->client_dcb;
|
||||
if (reason == DCB_REASON_HIGH_WATER)
|
||||
{
|
||||
poll_remove_dcb(client_dcb);
|
||||
}
|
||||
else if (reason == DCB_REASON_LOW_WATER)
|
||||
{
|
||||
poll_add_dcb(client_dcb);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool backend_dcb_remove_func(DCB *dcb, void *data)
|
||||
{
|
||||
MXS_SESSION* session = (MXS_SESSION*)data;
|
||||
|
||||
if (dcb->session == session && dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
|
||||
{
|
||||
poll_remove_dcb(dcb);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool backend_dcb_add_func(DCB *dcb, void *data)
|
||||
{
|
||||
MXS_SESSION* session = (MXS_SESSION*)data;
|
||||
|
||||
if (dcb->session == session && dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER)
|
||||
{
|
||||
poll_add_dcb(dcb);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief DCB callback for downstream throtting
|
||||
* Called by client dcb when its writeq is above high water mark or
|
||||
* it has reached high water mark and now it is below low water mark,
|
||||
* Calling `poll_remove_dcb` or `poll_add_dcb' on all backend dcbs to
|
||||
* throttle network traffic from server to mxs.
|
||||
*
|
||||
* @param dcb client dcb
|
||||
* @param reason Why the callback was called
|
||||
* @param userdata Data provided when the callback was added
|
||||
* @return Always 0
|
||||
*/
|
||||
static int downstream_throttle_callback(DCB *dcb, DCB_REASON reason, void *userdata)
|
||||
{
|
||||
if (reason == DCB_REASON_HIGH_WATER)
|
||||
{
|
||||
dcb_foreach(backend_dcb_remove_func, dcb->session);
|
||||
}
|
||||
else if (reason == DCB_REASON_LOW_WATER)
|
||||
{
|
||||
dcb_foreach(backend_dcb_add_func, dcb->session);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -28,6 +28,8 @@ MXS_BEGIN_DECLS
|
||||
#define DEFAULT_NTHREADS 1 /**< Default number of polling threads */
|
||||
#define DEFAULT_QUERY_RETRIES 0 /**< Number of retries for interrupted queries */
|
||||
#define DEFAULT_QUERY_RETRY_TIMEOUT 5 /**< Timeout for query retries */
|
||||
#define MIN_WRITEQ_HIGH_WATER 4096 /**< Min high water mark of dcb write queue */
|
||||
#define MIN_WRITEQ_LOW_WATER 512 /**< Min low water mark of dcb write queue */
|
||||
|
||||
/**
|
||||
* Maximum length for configuration parameter value.
|
||||
|
Reference in New Issue
Block a user