MXS-2220 Move persistent DCB settings and handling to the private Server-class

This commit is contained in:
Esa Korhonen
2018-12-12 15:52:45 +02:00
parent 5a9e84d39a
commit 31ceee6d22
8 changed files with 92 additions and 66 deletions

View File

@ -39,8 +39,6 @@ const int MAINTENANCE_ON = 100;
const int MAINTENANCE_FLAG_NOCHECK = 0; const int MAINTENANCE_FLAG_NOCHECK = 0;
const int MAINTENANCE_FLAG_CHECK = -1; const int MAINTENANCE_FLAG_CHECK = -1;
struct DCB;
/* Custom server parameters. These can be used by modules for e.g. weighting routing decisions. */ /* Custom server parameters. These can be used by modules for e.g. weighting routing decisions. */
struct SERVER_PARAM struct SERVER_PARAM
{ {
@ -119,8 +117,6 @@ public:
// Other settings // Other settings
char monuser[MAX_MONUSER_LEN] = {'\0'}; /**< Monitor username, overrides monitor setting */ char monuser[MAX_MONUSER_LEN] = {'\0'}; /**< Monitor username, overrides monitor setting */
char monpw[MAX_MONPW_LEN] = {'\0'}; /**< Monitor password, overrides monitor setting */ char monpw[MAX_MONPW_LEN] = {'\0'}; /**< Monitor password, overrides monitor setting */
long persistpoolmax = 0; /**< Maximum size of persistent connections pool */
long persistmaxtime = 0; /**< Maximum number of seconds connection can live */
bool proxy_protocol = false; /**< Send proxy-protocol header to backends when connecting bool proxy_protocol = false; /**< Send proxy-protocol header to backends when connecting
* routing sessions. */ * routing sessions. */
SERVER_PARAM* parameters = nullptr; /**< Additional custom parameters which may affect routing SERVER_PARAM* parameters = nullptr; /**< Additional custom parameters which may affect routing
@ -129,7 +125,6 @@ public:
bool is_active = false; /**< Server is active and has not been "destroyed" */ bool is_active = false; /**< Server is active and has not been "destroyed" */
void* auth_instance = nullptr; /**< Authenticator instance data */ void* auth_instance = nullptr; /**< Authenticator instance data */
SSL_LISTENER* server_ssl = nullptr; /**< SSL data */ SSL_LISTENER* server_ssl = nullptr; /**< SSL data */
DCB** persistent = nullptr; /**< List of unused persistent connections to the server */
uint8_t charset = DEFAULT_CHARSET;/**< Character set. Read from backend and sent to client. */ uint8_t charset = DEFAULT_CHARSET;/**< Character set. Read from backend and sent to client. */
// Statistics and events // Statistics and events
@ -180,6 +175,13 @@ public:
*/ */
virtual void set_disk_space_limits(const MxsDiskSpaceThreshold& new_limits) = 0; virtual void set_disk_space_limits(const MxsDiskSpaceThreshold& new_limits) = 0;
/**
* Is persistent connection pool enabled.
*
* @return True if enabled
*/
virtual bool persistent_conns_enabled() const = 0;
protected: protected:
SERVER() SERVER()
{ {
@ -478,11 +480,6 @@ extern void server_transfer_status(SERVER* dest_server, const SERVER* source_
extern void server_add_mon_user(SERVER* server, const char* user, const char* passwd); extern void server_add_mon_user(SERVER* server, const char* user, const char* passwd);
extern size_t server_get_parameter(const SERVER* server, const char* name, char* out, size_t size); extern size_t server_get_parameter(const SERVER* server, const char* name, char* out, size_t size);
extern void server_update_credentials(SERVER* server, const char* user, const char* passwd); extern void server_update_credentials(SERVER* server, const char* user, const char* passwd);
extern DCB* server_get_persistent(SERVER* server,
const char* user,
const char* ip,
const char* protocol,
int id);
extern void server_update_address(SERVER* server, const char* address); extern void server_update_address(SERVER* server, const char* address);
extern void server_update_port(SERVER* server, unsigned short port); extern void server_update_port(SERVER* server, unsigned short port);
extern void server_update_extra_port(SERVER* server, unsigned short port); extern void server_update_extra_port(SERVER* server, unsigned short port);

View File

@ -447,7 +447,7 @@ bool param_is_valid(const MXS_MODULE_PARAM* basic,
|| config_param_is_valid(module, key, value, NULL); || config_param_is_valid(module, key, value, NULL);
} }
bool runtime_alter_server(SERVER* server, const char* key, const char* value) bool runtime_alter_server(Server* server, const char* key, const char* value)
{ {
if (!value[0]) if (!value[0])
{ {
@ -505,14 +505,14 @@ bool runtime_alter_server(SERVER* server, const char* key, const char* value)
{ {
if (is_valid_integer(value)) if (is_valid_integer(value))
{ {
server->persistpoolmax = atoi(value); server->set_persistpoolmax(atoi(value));
} }
} }
else if (strcmp(key, CN_PERSISTMAXTIME) == 0) else if (strcmp(key, CN_PERSISTMAXTIME) == 0)
{ {
if (is_valid_integer(value)) if (is_valid_integer(value))
{ {
server->persistmaxtime = atoi(value); server->set_persistmaxtime(atoi(value));
} }
} }
else else
@ -1901,7 +1901,7 @@ bool server_to_object_relations(SERVER* server, json_t* old_json, json_t* new_js
return rval; return rval;
} }
bool runtime_alter_server_from_json(SERVER* server, json_t* new_json) bool runtime_alter_server_from_json(Server* server, json_t* new_json)
{ {
bool rval = false; bool rval = false;
std::unique_ptr<json_t> old_json(server_to_json(server, "")); std::unique_ptr<json_t> old_json(server_to_json(server, ""));

View File

@ -46,12 +46,12 @@
#include <maxscale/listener.hh> #include <maxscale/listener.hh>
#include <maxscale/poll.hh> #include <maxscale/poll.hh>
#include <maxscale/router.hh> #include <maxscale/router.hh>
#include <maxscale/server.hh>
#include <maxscale/service.hh> #include <maxscale/service.hh>
#include <maxscale/utils.h> #include <maxscale/utils.h>
#include <maxscale/routingworker.hh> #include <maxscale/routingworker.hh>
#include "internal/modules.hh" #include "internal/modules.hh"
#include "internal/server.hh"
#include "internal/session.hh" #include "internal/session.hh"
using maxscale::RoutingWorker; using maxscale::RoutingWorker;
@ -307,28 +307,25 @@ static void dcb_stop_polling_and_shutdown(DCB* dcb)
* If successful the new dcb will be put in * If successful the new dcb will be put in
* epoll set by dcb->func.connect * epoll set by dcb->func.connect
* *
* @param server The server to connect to * @param srv The server to connect to
* @param session The session this connection is being made for * @param session The session this connection is being made for
* @param protocol The protocol module to use * @param protocol The protocol module to use
* @return The new allocated dcb or NULL if the DCB was not connected * @return The new allocated dcb or NULL if the DCB was not connected
*/ */
DCB* dcb_connect(SERVER* server, MXS_SESSION* session, const char* protocol) DCB* dcb_connect(SERVER* srv, MXS_SESSION* session, const char* protocol)
{ {
DCB* dcb; DCB* dcb;
MXS_PROTOCOL* funcs; MXS_PROTOCOL* funcs;
int fd; int fd;
int rc; int rc;
const char* user; const char* user;
Server* server = static_cast<Server*>(srv);
user = session_get_user(session); user = session_get_user(session);
if (user && strlen(user)) if (user && strlen(user))
{ {
MXS_DEBUG("Looking for persistent connection DCB user %s protocol %s", user, protocol); MXS_DEBUG("Looking for persistent connection DCB user %s protocol %s", user, protocol);
dcb = server_get_persistent(server, dcb = server->get_persistent_dcb(user, session->client_dcb->remote, protocol,
user, static_cast<RoutingWorker*>(session->client_dcb->owner)->id());
session->client_dcb->remote,
protocol,
static_cast<RoutingWorker*>(session->client_dcb->owner)->id());
if (dcb) if (dcb)
{ {
/** /**
@ -1215,17 +1212,18 @@ void dcb_final_close(DCB* dcb)
static bool dcb_maybe_add_persistent(DCB* dcb) static bool dcb_maybe_add_persistent(DCB* dcb)
{ {
RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->owner); RoutingWorker* owner = static_cast<RoutingWorker*>(dcb->owner);
Server* server = static_cast<Server*>(dcb->server);
if (dcb->user != NULL if (dcb->user != NULL
&& (dcb->func.established == NULL || dcb->func.established(dcb)) && (dcb->func.established == NULL || dcb->func.established(dcb))
&& strlen(dcb->user) && strlen(dcb->user)
&& dcb->server && server
&& dcb->session && dcb->session
&& session_valid_for_pool(dcb->session) && session_valid_for_pool(dcb->session)
&& dcb->server->persistpoolmax && server->persistpoolmax()
&& (dcb->server->status & SERVER_RUNNING) && (server->status & SERVER_RUNNING)
&& !dcb->dcb_errhandle_called && !dcb->dcb_errhandle_called
&& dcb_persistent_clean_count(dcb, owner->id(), false) < dcb->server->persistpoolmax && dcb_persistent_clean_count(dcb, owner->id(), false) < server->persistpoolmax()
&& mxb::atomic::load(&dcb->server->stats.n_persistent) < dcb->server->persistpoolmax) && mxb::atomic::load(&server->stats.n_persistent) < server->persistpoolmax())
{ {
DCB_CALLBACK* loopcallback; DCB_CALLBACK* loopcallback;
MXS_DEBUG("Adding DCB to persistent pool, user %s.", dcb->user); MXS_DEBUG("Adding DCB to persistent pool, user %s.", dcb->user);
@ -1250,8 +1248,8 @@ static bool dcb_maybe_add_persistent(DCB* dcb)
dcb->delayq = NULL; dcb->delayq = NULL;
dcb->writeq = NULL; dcb->writeq = NULL;
dcb->nextpersistent = dcb->server->persistent[owner->id()]; dcb->nextpersistent = server->persistent[owner->id()];
dcb->server->persistent[owner->id()] = dcb; server->persistent[owner->id()] = dcb;
mxb::atomic::add(&dcb->server->stats.n_persistent, 1); mxb::atomic::add(&dcb->server->stats.n_persistent, 1);
mxb::atomic::add(&dcb->server->stats.n_current, -1, mxb::atomic::RELAXED); mxb::atomic::add(&dcb->server->stats.n_current, -1, mxb::atomic::RELAXED);
return true; return true;
@ -1963,7 +1961,7 @@ int dcb_persistent_clean_count(DCB* dcb, int id, bool cleanall)
int count = 0; int count = 0;
if (dcb && dcb->server) if (dcb && dcb->server)
{ {
SERVER* server = dcb->server; Server* server = static_cast<Server*>(dcb->server);
DCB* previousdcb = NULL; DCB* previousdcb = NULL;
DCB* persistentdcb, * nextdcb; DCB* persistentdcb, * nextdcb;
DCB* disposals = NULL; DCB* disposals = NULL;
@ -1974,10 +1972,10 @@ int dcb_persistent_clean_count(DCB* dcb, int id, bool cleanall)
nextdcb = persistentdcb->nextpersistent; nextdcb = persistentdcb->nextpersistent;
if (cleanall if (cleanall
|| persistentdcb->dcb_errhandle_called || persistentdcb->dcb_errhandle_called
|| count >= server->persistpoolmax || count >= server->persistpoolmax()
|| persistentdcb->server == NULL || persistentdcb->server == NULL
|| !(persistentdcb->server->status & SERVER_RUNNING) || !(persistentdcb->server->status & SERVER_RUNNING)
|| (time(NULL) - persistentdcb->persistentstart) > server->persistmaxtime) || (time(NULL) - persistentdcb->persistentstart) > server->persistmaxtime())
{ {
/* Remove from persistent pool */ /* Remove from persistent pool */
if (previousdcb) if (previousdcb)

View File

@ -20,12 +20,12 @@
#include <maxscale/adminusers.h> #include <maxscale/adminusers.h>
#include <maxscale/monitor.hh> #include <maxscale/monitor.hh>
#include <maxscale/server.hh>
#include <maxscale/service.hh> #include <maxscale/service.hh>
#include "service.hh" #include "service.hh"
#include "filter.hh" #include "filter.hh"
class Server;
/** /**
* @brief Log error to be returned to client * @brief Log error to be returned to client
@ -104,7 +104,7 @@ bool runtime_unlink_server(SERVER* server, const char* target);
* @param value New value * @param value New value
* @return True if @c key was one of the supported parameters * @return True if @c key was one of the supported parameters
*/ */
bool runtime_alter_server(SERVER* server, const char* key, const char* value); bool runtime_alter_server(Server* server, const char* key, const char* value);
/** /**
* @brief Enable SSL for a server * @brief Enable SSL for a server
@ -281,7 +281,7 @@ SERVER* runtime_create_server_from_json(json_t* json);
* *
* @return True if the server was successfully modified to represent @c new_json * @return True if the server was successfully modified to represent @c new_json
*/ */
bool runtime_alter_server_from_json(SERVER* server, json_t* new_json); bool runtime_alter_server_from_json(Server* server, json_t* new_json);
/** /**
* @brief Alter server relationships * @brief Alter server relationships

View File

@ -48,6 +48,26 @@ public:
void response_time_add(double ave, int num_samples); void response_time_add(double ave, int num_samples);
long persistpoolmax() const
{
return m_settings.persistpoolmax;
}
void set_persistpoolmax(long persistpoolmax)
{
m_settings.persistpoolmax = persistpoolmax;
}
long persistmaxtime() const
{
return m_settings.persistmaxtime;
}
void set_persistmaxtime(long persistmaxtime)
{
m_settings.persistmaxtime = persistmaxtime;
}
bool have_disk_space_limits() const override bool have_disk_space_limits() const override
{ {
std::lock_guard<std::mutex> guard(m_settings.lock); std::lock_guard<std::mutex> guard(m_settings.lock);
@ -66,6 +86,24 @@ public:
m_settings.disk_space_limits = new_limits; m_settings.disk_space_limits = new_limits;
} }
bool persistent_conns_enabled() const override
{
return m_settings.persistpoolmax > 0;
}
/**
* Get a DCB from the persistent connection pool, if possible
*
* @param user The name of the user needing the connection
* @param ip Client IP address
* @param protocol The name of the protocol needed for the connection
* @param id Thread ID
*
* @return A DCB or NULL if no connection is found
*/
DCB* get_persistent_dcb(const std::string& user, const std::string& ip, const std::string& protocol,
int id);
/** /**
* Print server details to a dcb. * Print server details to a dcb.
* *
@ -131,16 +169,20 @@ public:
static void dListServers(DCB*); static void dListServers(DCB*);
mutable std::mutex m_lock; mutable std::mutex m_lock;
DCB** persistent = nullptr; /**< List of unused persistent connections to the server */
private: private:
struct Settings struct Settings
{ {
mutable std::mutex lock; /**< Protects array-like settings from concurrent access */ mutable std::mutex lock; /**< Protects array-like settings from concurrent access */
MxsDiskSpaceThreshold disk_space_limits; /**< Disk space thresholds */ MxsDiskSpaceThreshold disk_space_limits; /**< Disk space thresholds */
long persistpoolmax = 0; /**< Maximum size of persistent connections pool */
long persistmaxtime = 0; /**< Maximum number of seconds connection can live */
}; };
Settings m_settings; /**< Server settings */
maxbase::EMAverage m_response_time; /**< Response time calculations for this server */ maxbase::EMAverage m_response_time; /**< Response time calculations for this server */
Settings m_settings; /**< Server settings */
}; };
void server_free(Server* server); void server_free(Server* server);

View File

@ -23,7 +23,6 @@
#include <maxscale/jansson.hh> #include <maxscale/jansson.hh>
#include <maxscale/json_api.h> #include <maxscale/json_api.h>
#include <maxscale/modulecmd.hh> #include <maxscale/modulecmd.hh>
#include <maxscale/server.hh>
#include <maxscale/routingworker.hh> #include <maxscale/routingworker.hh>
#include "internal/config_runtime.hh" #include "internal/config_runtime.hh"
@ -33,6 +32,7 @@
#include "internal/modules.hh" #include "internal/modules.hh"
#include "internal/monitor.hh" #include "internal/monitor.hh"
#include "internal/query_classifier.hh" #include "internal/query_classifier.hh"
#include "internal/server.hh"
#include "internal/service.hh" #include "internal/service.hh"
#include "internal/session.hh" #include "internal/session.hh"
@ -299,7 +299,7 @@ HttpResponse cb_create_server(const HttpRequest& request)
HttpResponse cb_alter_server(const HttpRequest& request) HttpResponse cb_alter_server(const HttpRequest& request)
{ {
SERVER* server = server_find_by_unique_name(request.uri_part(1).c_str()); auto server = Server::find_by_unique_name(request.uri_part(1));
mxb_assert(server && request.get_json()); mxb_assert(server && request.get_json());
if (runtime_alter_server_from_json(server, request.get_json())) if (runtime_alter_server_from_json(server, request.get_json()))

View File

@ -153,8 +153,8 @@ Server* Server::server_alloc(const char* name, MXS_CONFIG_PARAMETER* params)
server->extra_port = config_get_integer(params, CN_EXTRA_PORT); server->extra_port = config_get_integer(params, CN_EXTRA_PORT);
server->protocol = my_protocol; server->protocol = my_protocol;
server->authenticator = my_authenticator; server->authenticator = my_authenticator;
server->persistpoolmax = config_get_integer(params, CN_PERSISTPOOLMAX); server->m_settings.persistpoolmax = config_get_integer(params, CN_PERSISTPOOLMAX);
server->persistmaxtime = config_get_integer(params, CN_PERSISTMAXTIME); server->m_settings.persistmaxtime = config_get_integer(params, CN_PERSISTMAXTIME);
server->proxy_protocol = config_get_bool(params, CN_PROXY_PROTOCOL); server->proxy_protocol = config_get_bool(params, CN_PROXY_PROTOCOL);
server->is_active = true; server->is_active = true;
server->auth_instance = auth_instance; server->auth_instance = auth_instance;
@ -218,21 +218,10 @@ void server_free(Server* server)
delete server; delete server;
} }
/** DCB* Server::get_persistent_dcb(const string& user, const string& ip, const string& protocol, int id)
* Get a DCB from the persistent connection pool, if possible
*
* @param server The server to set the name on
* @param user The name of the user needing the connection
* @param ip Client IP address
* @param protocol The name of the protocol needed for the connection
* @param id Thread ID
*
* @return A DCB or NULL if no connection is found
*/
DCB* server_get_persistent(SERVER* server, const char* user, const char* ip, const char* protocol, int id)
{ {
DCB* dcb, * previous = NULL; DCB* dcb, * previous = NULL;
Server* server = this;
if (server->persistent[id] if (server->persistent[id]
&& dcb_persistent_clean_count(server->persistent[id], id, false) && dcb_persistent_clean_count(server->persistent[id], id, false)
&& server->persistent[id] // Check after cleaning && server->persistent[id] // Check after cleaning
@ -245,11 +234,11 @@ DCB* server_get_persistent(SERVER* server, const char* user, const char* ip, con
{ {
if (dcb->user if (dcb->user
&& dcb->remote && dcb->remote
&& ip && !ip.empty()
&& !dcb->dcb_errhandle_called && !dcb->dcb_errhandle_called
&& 0 == strcmp(dcb->user, user) && user == dcb->user
&& 0 == strcmp(dcb->remote, ip) && ip == dcb->remote
&& 0 == strcmp(dcb->server->protocol, protocol)) && protocol == dcb->server->protocol)
{ {
if (NULL == previous) if (NULL == previous)
{ {
@ -420,7 +409,7 @@ void Server::dprintAllServersJson(DCB* dcb)
class CleanupTask : public Worker::Task class CleanupTask : public Worker::Task
{ {
public: public:
CleanupTask(const SERVER* server) CleanupTask(const Server* server)
: m_server(server) : m_server(server)
{ {
} }
@ -435,7 +424,7 @@ public:
} }
private: private:
const SERVER* m_server; /**< Server to clean up */ const Server* m_server; /**< Server to clean up */
}; };
/** /**
@ -445,7 +434,7 @@ private:
* *
* @param server Server to clean up * @param server Server to clean up
*/ */
static void cleanup_persistent_connections(const SERVER* server) static void cleanup_persistent_connections(const Server* server)
{ {
CleanupTask task(server); CleanupTask task(server);
RoutingWorker::execute_concurrently(task); RoutingWorker::execute_concurrently(task);
@ -525,14 +514,14 @@ void Server::print_to_dcb(DCB* dcb) const
} }
dcb_printf(dcb, "\tAdaptive avg. select time: %s\n", ave_os.str().c_str()); dcb_printf(dcb, "\tAdaptive avg. select time: %s\n", ave_os.str().c_str());
if (server->persistpoolmax) if (server->m_settings.persistpoolmax)
{ {
dcb_printf(dcb, "\tPersistent pool size: %d\n", server->stats.n_persistent); dcb_printf(dcb, "\tPersistent pool size: %d\n", server->stats.n_persistent);
cleanup_persistent_connections(server); cleanup_persistent_connections(server);
dcb_printf(dcb, "\tPersistent measured pool size: %d\n", server->stats.n_persistent); dcb_printf(dcb, "\tPersistent measured pool size: %d\n", server->stats.n_persistent);
dcb_printf(dcb, "\tPersistent actual size max: %d\n", server->persistmax); dcb_printf(dcb, "\tPersistent actual size max: %d\n", server->persistmax);
dcb_printf(dcb, "\tPersistent pool size limit: %ld\n", server->persistpoolmax); dcb_printf(dcb, "\tPersistent pool size limit: %ld\n", server->m_settings.persistpoolmax);
dcb_printf(dcb, "\tPersistent max time (secs): %ld\n", server->persistmaxtime); dcb_printf(dcb, "\tPersistent max time (secs): %ld\n", server->m_settings.persistmaxtime);
dcb_printf(dcb, "\tConnections taken from pool: %lu\n", server->stats.n_from_pool); dcb_printf(dcb, "\tConnections taken from pool: %lu\n", server->stats.n_from_pool);
double d = (double)server->stats.n_from_pool / (double)(server->stats.n_connections double d = (double)server->stats.n_from_pool / (double)(server->stats.n_connections
+ server->stats.n_from_pool + 1); + server->stats.n_from_pool + 1);

View File

@ -1234,7 +1234,7 @@ static int gw_MySQLWrite_backend(DCB* dcb, GWBUF* queue)
prepare_for_write(dcb, queue); prepare_for_write(dcb, queue);
if (cmd == MXS_COM_QUIT && dcb->server->persistpoolmax) if (cmd == MXS_COM_QUIT && dcb->server->persistent_conns_enabled())
{ {
/** We need to keep the pooled connections alive so we just ignore the COM_QUIT packet */ /** We need to keep the pooled connections alive so we just ignore the COM_QUIT packet */
gwbuf_free(queue); gwbuf_free(queue);
@ -1432,7 +1432,7 @@ static int backend_write_delayqueue(DCB* dcb, GWBUF* buffer)
int rc = 1; int rc = 1;
if (MYSQL_IS_COM_QUIT(((uint8_t*)GWBUF_DATA(buffer))) && dcb->server->persistpoolmax) if (MYSQL_IS_COM_QUIT(((uint8_t*)GWBUF_DATA(buffer))) && dcb->server->persistent_conns_enabled())
{ {
/** We need to keep the pooled connections alive so we just ignore the COM_QUIT packet */ /** We need to keep the pooled connections alive so we just ignore the COM_QUIT packet */
gwbuf_free(buffer); gwbuf_free(buffer);