MXS-2196: Allocate a session before allocating DCBs
Allocating the session before a DCB guarantees that at no point will a DCB have a null session. This further clarifies the concept of the session and also allows the listener reference to be moved there. Ideally, the session itself would allocate and assign the client DCB but since the Listener is the only one who does it, it's acceptable for now.
This commit is contained in:
@ -147,21 +147,20 @@ static MXB_WORKER* get_dcb_owner(dcb_role_t role)
|
||||
return RoutingWorker::get_current();
|
||||
}
|
||||
|
||||
DCB::DCB(dcb_role_t role, const SListener& listener, SERVICE* service)
|
||||
DCB::DCB(dcb_role_t role, MXS_SESSION* session)
|
||||
: MXB_POLL_DATA{dcb_poll_handler, get_dcb_owner(role)}
|
||||
, dcb_role(role)
|
||||
, session(nullptr)
|
||||
, listener(listener)
|
||||
, session(session)
|
||||
, high_water(config_writeq_high_water())
|
||||
, low_water(config_writeq_low_water())
|
||||
, service(service)
|
||||
, service(session->service)
|
||||
, last_read(mxs_clock())
|
||||
{
|
||||
// TODO: Remove DCB_ROLE_INTERNAL to always have a valid listener
|
||||
if (listener)
|
||||
if (session->listener)
|
||||
{
|
||||
func = listener->protocol_func();
|
||||
authfunc = listener->auth_func();
|
||||
func = session->listener->protocol_func();
|
||||
authfunc = session->listener->auth_func();
|
||||
}
|
||||
|
||||
if (high_water && low_water)
|
||||
@ -221,9 +220,9 @@ DCB::~DCB()
|
||||
*
|
||||
* @return An available DCB or NULL if none could be allocated.
|
||||
*/
|
||||
DCB* dcb_alloc(dcb_role_t role, const SListener& listener, SERVICE* service)
|
||||
DCB* dcb_alloc(dcb_role_t role, MXS_SESSION* session)
|
||||
{
|
||||
return new(std::nothrow) DCB(role, listener, service);
|
||||
return new(std::nothrow) DCB(role, session);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -351,7 +350,7 @@ DCB* dcb_connect(SERVER* server, MXS_SESSION* session, const char* protocol)
|
||||
}
|
||||
}
|
||||
|
||||
if ((dcb = dcb_alloc(DCB_ROLE_BACKEND_HANDLER, NULL, session->service)) == NULL)
|
||||
if ((dcb = dcb_alloc(DCB_ROLE_BACKEND_HANDLER, session)) == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
@ -1280,9 +1279,9 @@ void printDCB(DCB* dcb)
|
||||
{
|
||||
printf("\tUsername: %s\n", dcb->user);
|
||||
}
|
||||
if (dcb->listener)
|
||||
if (dcb->session->listener)
|
||||
{
|
||||
printf("\tProtocol: %s\n", dcb->listener->protocol());
|
||||
printf("\tProtocol: %s\n", dcb->session->listener->protocol());
|
||||
}
|
||||
if (dcb->writeq)
|
||||
{
|
||||
@ -1388,9 +1387,9 @@ void dprintOneDCB(DCB* pdcb, DCB* dcb)
|
||||
"\tUsername: %s\n",
|
||||
dcb->user);
|
||||
}
|
||||
if (dcb->listener)
|
||||
if (dcb->session->listener)
|
||||
{
|
||||
dcb_printf(pdcb, "\tProtocol: %s\n", dcb->listener->protocol());
|
||||
dcb_printf(pdcb, "\tProtocol: %s\n", dcb->session->listener->protocol());
|
||||
}
|
||||
if (dcb->writeq)
|
||||
{
|
||||
@ -1544,9 +1543,9 @@ void dprintDCB(DCB* pdcb, DCB* dcb)
|
||||
"\tUsername: %s\n",
|
||||
dcb->user);
|
||||
}
|
||||
if (dcb->listener)
|
||||
if (dcb->session->listener)
|
||||
{
|
||||
dcb_printf(pdcb, "\tProtocol: %s\n", dcb->listener->protocol());
|
||||
dcb_printf(pdcb, "\tProtocol: %s\n", dcb->session->listener->protocol());
|
||||
}
|
||||
|
||||
if (dcb->session)
|
||||
@ -2122,8 +2121,8 @@ static int dcb_create_SSL(DCB* dcb, SSL_LISTENER* ssl)
|
||||
*/
|
||||
int dcb_accept_SSL(DCB* dcb)
|
||||
{
|
||||
if ((NULL == dcb->listener || NULL == dcb->listener->ssl())
|
||||
|| (NULL == dcb->ssl && dcb_create_SSL(dcb, dcb->listener->ssl()) != 0))
|
||||
if (NULL == dcb->session->listener->ssl()
|
||||
|| (NULL == dcb->ssl && dcb_create_SSL(dcb, dcb->session->listener->ssl()) != 0))
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
@ -2435,8 +2434,7 @@ void dcb_process_idle_sessions(int thr)
|
||||
{
|
||||
if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
|
||||
{
|
||||
mxb_assert(dcb->listener);
|
||||
SERVICE* service = dcb->listener->service();
|
||||
SERVICE* service = dcb->session->service;
|
||||
|
||||
if (service->conn_idle_timeout && dcb->state == DCB_STATE_POLLING)
|
||||
{
|
||||
|
@ -164,9 +164,12 @@ public:
|
||||
|
||||
using FilterList = std::vector<SessionFilter>;
|
||||
|
||||
Session(SERVICE* service, DCB* client_dcb);
|
||||
Session(const SListener& listener);
|
||||
~Session();
|
||||
|
||||
// Links a client DCB to a session
|
||||
void set_client_dcb(DCB* dcb);
|
||||
|
||||
bool setup_filters(Service* service);
|
||||
|
||||
const FilterList& get_filters() const
|
||||
|
@ -38,6 +38,7 @@
|
||||
#include <maxscale/routingworker.hh>
|
||||
|
||||
#include "internal/modules.hh"
|
||||
#include "internal/session.hh"
|
||||
|
||||
static std::list<SListener> all_listeners;
|
||||
static std::mutex listener_lock;
|
||||
@ -675,24 +676,24 @@ const char* Listener::state() const
|
||||
{
|
||||
switch (m_state)
|
||||
{
|
||||
case CREATED:
|
||||
return "Created";
|
||||
case CREATED:
|
||||
return "Created";
|
||||
|
||||
case STARTED:
|
||||
return "Running";
|
||||
case STARTED:
|
||||
return "Running";
|
||||
|
||||
case STOPPED:
|
||||
return "Stopped";
|
||||
case STOPPED:
|
||||
return "Stopped";
|
||||
|
||||
case FAILED:
|
||||
return "Failed";
|
||||
case FAILED:
|
||||
return "Failed";
|
||||
|
||||
case DESTROYED:
|
||||
return "Destroyed";
|
||||
case DESTROYED:
|
||||
return "Destroyed";
|
||||
|
||||
default:
|
||||
mxb_assert(!true);
|
||||
return "Unknown";
|
||||
default:
|
||||
mxb_assert(!true);
|
||||
return "Unknown";
|
||||
}
|
||||
}
|
||||
|
||||
@ -864,17 +865,21 @@ DCB* Listener::accept_one_dcb()
|
||||
{
|
||||
configure_network_socket(c_sock, client_conn.ss_family);
|
||||
|
||||
client_dcb = dcb_alloc(DCB_ROLE_CLIENT_HANDLER, m_self, m_service);
|
||||
mxs::Session* session = new(std::nothrow) mxs::Session(m_self);
|
||||
client_dcb = dcb_alloc(DCB_ROLE_CLIENT_HANDLER, session);
|
||||
|
||||
if (client_dcb == NULL)
|
||||
if (!session || !client_dcb)
|
||||
{
|
||||
MXS_ERROR("Failed to create DCB object for client connection.");
|
||||
MXS_ERROR("Failed to create session objects for client connection.");
|
||||
close(c_sock);
|
||||
delete session;
|
||||
dcb_close(client_dcb);
|
||||
return NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
session->set_client_dcb(client_dcb);
|
||||
client_dcb->fd = c_sock;
|
||||
client_dcb->session = session_alloc(m_service, client_dcb);
|
||||
|
||||
// get client address
|
||||
if (client_conn.ss_family == AF_UNIX)
|
||||
@ -907,25 +912,28 @@ DCB* Listener::accept_one_dcb()
|
||||
}
|
||||
|
||||
/** Allocate DCB specific authentication data */
|
||||
if (client_dcb->authfunc.create
|
||||
&& (client_dcb->authenticator_data =
|
||||
client_dcb->authfunc.create(client_dcb->listener->auth_instance())) == NULL)
|
||||
if (m_auth_func.create
|
||||
&& (client_dcb->authenticator_data = m_auth_func.create(m_auth_instance)) == NULL)
|
||||
{
|
||||
MXS_ERROR("Failed to create authenticator for client DCB");
|
||||
delete session;
|
||||
dcb_close(client_dcb);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (client_dcb->service->max_connections
|
||||
&& client_dcb->service->client_count >= client_dcb->service->max_connections)
|
||||
if (m_service->max_connections && m_service->client_count >= m_service->max_connections)
|
||||
{
|
||||
// TODO: If connections can be queued, this is the place to put the
|
||||
// TODO: connection on that queue.
|
||||
if (client_dcb->func.connlimit)
|
||||
if (m_proto_func.connlimit)
|
||||
{
|
||||
client_dcb->func.connlimit(client_dcb, client_dcb->service->max_connections);
|
||||
m_proto_func.connlimit(client_dcb, m_service->max_connections);
|
||||
}
|
||||
|
||||
// TODO: This is never used as the client connection is not up yet
|
||||
client_dcb->session->close_reason = SESSION_CLOSE_TOO_MANY_CONNECTIONS;
|
||||
|
||||
delete session;
|
||||
dcb_close(client_dcb);
|
||||
client_dcb = NULL;
|
||||
}
|
||||
|
@ -264,13 +264,14 @@ DCB* server_get_persistent(SERVER* server, const char* user, const char* ip, con
|
||||
dcb = server->persistent[id];
|
||||
while (dcb)
|
||||
{
|
||||
// TODO: Fix this, it won't work (DCB in pool has no session)
|
||||
if (dcb->user
|
||||
&& dcb->remote
|
||||
&& ip
|
||||
&& !dcb->dcb_errhandle_called
|
||||
&& 0 == strcmp(dcb->user, user)
|
||||
&& 0 == strcmp(dcb->remote, ip)
|
||||
&& 0 == strcmp(dcb->listener->protocol(), protocol))
|
||||
&& 0 == strcmp(dcb->session->listener->protocol(), protocol))
|
||||
{
|
||||
if (NULL == previous)
|
||||
{
|
||||
|
@ -77,7 +77,7 @@ static void session_simple_free(MXS_SESSION* session, DCB* dcb);
|
||||
static void session_add_to_all_list(MXS_SESSION* session);
|
||||
static MXS_SESSION* session_find_free();
|
||||
static void session_final_free(MXS_SESSION* session);
|
||||
static void session_deliver_response(MXS_SESSION* session);
|
||||
static void session_deliver_response(MXS_SESSION* session);
|
||||
|
||||
/**
|
||||
* The clientReply of the session.
|
||||
@ -88,13 +88,14 @@ static void session_deliver_response(MXS_SESSION* session);
|
||||
*/
|
||||
static int session_reply(MXS_FILTER* inst, MXS_FILTER_SESSION* session, GWBUF* data);
|
||||
|
||||
MXS_SESSION::MXS_SESSION(DCB* client_dcb)
|
||||
MXS_SESSION::MXS_SESSION(const SListener& listener)
|
||||
: state(SESSION_STATE_READY)
|
||||
, ses_id(session_get_next_id())
|
||||
, client_dcb(client_dcb)
|
||||
, client_dcb(nullptr)
|
||||
, listener(listener)
|
||||
, router_session(nullptr)
|
||||
, stats{time(0)}
|
||||
, service(client_dcb->service)
|
||||
, service(listener ? listener->service() : nullptr)
|
||||
, head{}
|
||||
, tail{}
|
||||
, refcount(1)
|
||||
@ -112,11 +113,6 @@ MXS_SESSION::~MXS_SESSION()
|
||||
{
|
||||
}
|
||||
|
||||
MXS_SESSION* session_alloc(SERVICE* service, DCB* client_dcb)
|
||||
{
|
||||
return new(std::nothrow) Session(service, client_dcb);
|
||||
}
|
||||
|
||||
bool session_start(MXS_SESSION* session)
|
||||
{
|
||||
session->router_session = session->service->router->newSession(session->service->router_instance,
|
||||
@ -179,7 +175,6 @@ void session_link_backend_dcb(MXS_SESSION* session, DCB* dcb)
|
||||
|
||||
mxb::atomic::add(&session->refcount, 1);
|
||||
dcb->session = session;
|
||||
dcb->service = session->service;
|
||||
/** Move this DCB under the same thread */
|
||||
dcb->owner = session->client_dcb->owner;
|
||||
|
||||
@ -1097,10 +1092,10 @@ const char* session_get_close_reason(const MXS_SESSION* session)
|
||||
}
|
||||
}
|
||||
|
||||
Session::Session(SERVICE* service, DCB* client_dcb)
|
||||
: MXS_SESSION(client_dcb)
|
||||
Session::Session(const SListener& listener)
|
||||
: MXS_SESSION(listener)
|
||||
{
|
||||
if (service->retain_last_statements != -1) // Explicitly set for the service
|
||||
if (service->retain_last_statements != -1) // Explicitly set for the service
|
||||
{
|
||||
m_retain_last_statements = service->retain_last_statements;
|
||||
}
|
||||
@ -1124,6 +1119,13 @@ Session::~Session()
|
||||
}
|
||||
}
|
||||
|
||||
void Session::set_client_dcb(DCB* dcb)
|
||||
{
|
||||
mxb_assert(client_dcb == nullptr);
|
||||
mxb_assert(dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER);
|
||||
client_dcb = dcb;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
@ -1172,7 +1174,6 @@ bool get_cmd_and_stmt(GWBUF* pBuffer, const char** ppCmd, char** ppStmt, int* pL
|
||||
|
||||
return deallocate;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void Session::dump_statements() const
|
||||
|
@ -54,7 +54,7 @@ int ssl_authenticate_client(DCB* dcb, bool is_capable)
|
||||
const char* remote = dcb->remote ? dcb->remote : "";
|
||||
const char* service = (dcb->service && dcb->service->name) ? dcb->service->name : "";
|
||||
|
||||
if (NULL == dcb->listener || NULL == dcb->listener->ssl())
|
||||
if (NULL == dcb->session->listener || NULL == dcb->session->listener->ssl())
|
||||
{
|
||||
/* Not an SSL connection on account of listener configuration */
|
||||
return SSL_AUTH_CHECKS_OK;
|
||||
@ -133,8 +133,8 @@ bool ssl_is_connection_healthy(DCB* dcb)
|
||||
* then everything is as we wish. Otherwise, either there is a problem or
|
||||
* more to be done.
|
||||
*/
|
||||
return NULL == dcb->listener
|
||||
|| NULL == dcb->listener->ssl()
|
||||
return NULL == dcb->session->listener
|
||||
|| NULL == dcb->session->listener->ssl()
|
||||
|| dcb->ssl_state == SSL_ESTABLISHED;
|
||||
}
|
||||
|
||||
@ -171,7 +171,7 @@ bool ssl_check_data_to_process(DCB* dcb)
|
||||
*/
|
||||
bool ssl_required_by_dcb(DCB* dcb)
|
||||
{
|
||||
return NULL != dcb->listener && NULL != dcb->listener->ssl();
|
||||
return NULL != dcb->session->listener && NULL != dcb->session->listener->ssl();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -186,8 +186,8 @@ bool ssl_required_by_dcb(DCB* dcb)
|
||||
*/
|
||||
bool ssl_required_but_not_negotiated(DCB* dcb)
|
||||
{
|
||||
return NULL != dcb->listener
|
||||
&& NULL != dcb->listener->ssl()
|
||||
return NULL != dcb->session->listener
|
||||
&& NULL != dcb->session->listener->ssl()
|
||||
&& SSL_HANDSHAKE_UNKNOWN == dcb->ssl_state;
|
||||
}
|
||||
|
||||
|
@ -50,7 +50,7 @@ static int test1()
|
||||
DCB* dcb;
|
||||
/* Single buffer tests */
|
||||
fprintf(stderr, "testdcb : creating buffer with type DCB_ROLE_INTERNAL");
|
||||
dcb = dcb_alloc(DCB_ROLE_INTERNAL, nullptr, nullptr);
|
||||
dcb = dcb_alloc(DCB_ROLE_INTERNAL, nullptr);
|
||||
printDCB(dcb);
|
||||
fprintf(stderr, "\t..done\nAllocated dcb.");
|
||||
// TODO: Without running workers, the following will hang. As it does not
|
||||
|
@ -49,15 +49,12 @@ static int test1()
|
||||
DCB* dcb;
|
||||
int eno = 0;
|
||||
|
||||
SERVICE service;
|
||||
service.routerModule = (char*)"required by a check in dcb.cc";
|
||||
|
||||
/* Poll tests */
|
||||
fprintf(stderr,
|
||||
"testpoll : Initialise the polling system.");
|
||||
init_test_env(NULL);
|
||||
fprintf(stderr, "\t..done\nAdd a DCB");
|
||||
dcb = dcb_alloc(DCB_ROLE_CLIENT_HANDLER, nullptr, nullptr);
|
||||
dcb = dcb_alloc(DCB_ROLE_CLIENT_HANDLER, nullptr);
|
||||
|
||||
if (dcb == NULL)
|
||||
{
|
||||
@ -66,7 +63,6 @@ static int test1()
|
||||
}
|
||||
|
||||
dcb->fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||
dcb->service = &service;
|
||||
|
||||
if (dcb->fd < 0)
|
||||
{
|
||||
@ -101,8 +97,6 @@ static int test1()
|
||||
sleep(10);
|
||||
// TODO, fix this for workers: poll_shutdown();
|
||||
fprintf(stderr, "\t..done\nTidy up.");
|
||||
SERVICE my_service = {};
|
||||
dcb->service = &my_service;
|
||||
dcb_close(dcb);
|
||||
fprintf(stderr, "\t..done\n");
|
||||
|
||||
|
Reference in New Issue
Block a user