MXS-1951: Move worker selection into Listener
The worker to which the client DCB is assigned to is now chosen by the Listener. This makes the protocol code simpler which is always a good thing.
This commit is contained in:
@ -1024,7 +1024,10 @@ uint32_t Listener::poll_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_
|
|||||||
|
|
||||||
while ((client_dcb = listener->accept_one_dcb()))
|
while ((client_dcb = listener->accept_one_dcb()))
|
||||||
{
|
{
|
||||||
|
auto worker = mxs::RoutingWorker::pick_worker();
|
||||||
|
worker->execute([listener, client_dcb]() {
|
||||||
listener->m_proto_func.accept(client_dcb);
|
listener->m_proto_func.accept(client_dcb);
|
||||||
|
}, mxs::RoutingWorker::EXECUTE_AUTO);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -72,7 +72,6 @@ static void mysql_client_auth_error_handling(DCB* dcb, int auth_val, i
|
|||||||
static int gw_read_do_authentication(DCB* dcb, GWBUF* read_buffer, int nbytes_read);
|
static int gw_read_do_authentication(DCB* dcb, GWBUF* read_buffer, int nbytes_read);
|
||||||
static int gw_read_normal_data(DCB* dcb, GWBUF* read_buffer, int nbytes_read);
|
static int gw_read_normal_data(DCB* dcb, GWBUF* read_buffer, int nbytes_read);
|
||||||
static int gw_read_finish_processing(DCB* dcb, GWBUF* read_buffer, uint64_t capabilities);
|
static int gw_read_finish_processing(DCB* dcb, GWBUF* read_buffer, uint64_t capabilities);
|
||||||
static void gw_process_one_new_client(DCB* client_dcb);
|
|
||||||
static spec_com_res_t process_special_commands(DCB* client_dcb, GWBUF* read_buffer, int nbytes_read);
|
static spec_com_res_t process_special_commands(DCB* client_dcb, GWBUF* read_buffer, int nbytes_read);
|
||||||
static spec_com_res_t handle_query_kill(DCB* dcb,
|
static spec_com_res_t handle_query_kill(DCB* dcb,
|
||||||
GWBUF* read_buffer,
|
GWBUF* read_buffer,
|
||||||
@ -1379,59 +1378,32 @@ return_1:
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @node Accept a new connection, using the DCB code for the basic work
|
* @node Accept and prepare a new client connection
|
||||||
*
|
*
|
||||||
* For as long as dcb_accept can return new client DCBs for new connections,
|
* @param dcb The client DCB that was created
|
||||||
* continue to loop. The code will always give a failure return, since it
|
|
||||||
* continues to try to create new connections until a failure occurs.
|
|
||||||
*
|
|
||||||
* @param listener - The Listener DCB that picks up new connection requests
|
|
||||||
* @return 0 in success, 1 in failure
|
|
||||||
*
|
*
|
||||||
|
* @return always 1
|
||||||
*/
|
*/
|
||||||
int gw_MySQLAccept(DCB* client_dcb)
|
int gw_MySQLAccept(DCB* dcb)
|
||||||
{
|
{
|
||||||
gw_process_one_new_client(client_dcb);
|
dcb->protocol = mysql_protocol_init(dcb, dcb->fd);
|
||||||
return 1;
|
MXS_ABORT_IF_NULL(dcb->protocol);
|
||||||
}
|
|
||||||
|
|
||||||
static void gw_process_one_new_client(DCB* client_dcb)
|
if (poll_add_dcb(dcb) == -1)
|
||||||
{
|
{
|
||||||
/**
|
mysql_send_custom_error(dcb, 1, 0,
|
||||||
* The worker who owns the DCB is chosen here, before any epoll events for it can be processed.
|
|
||||||
* This guarantees that the first event for the DCB is processed only after the following
|
|
||||||
* task has been processed by the owning thread.
|
|
||||||
*/
|
|
||||||
mxs::RoutingWorker* worker = mxs::RoutingWorker::pick_worker();
|
|
||||||
|
|
||||||
worker->execute([=]() {
|
|
||||||
client_dcb->protocol = mysql_protocol_init(client_dcb, client_dcb->fd);
|
|
||||||
MXS_ABORT_IF_NULL(client_dcb->protocol);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set new descriptor to event set. At the same time,
|
|
||||||
* change state to DCB_STATE_POLLING so that
|
|
||||||
* thread which wakes up sees correct state.
|
|
||||||
*/
|
|
||||||
if (poll_add_dcb(client_dcb) == -1)
|
|
||||||
{
|
|
||||||
/* Send a custom error as MySQL command reply */
|
|
||||||
mysql_send_custom_error(client_dcb, 1, 0,
|
|
||||||
"MaxScale encountered system limit while "
|
"MaxScale encountered system limit while "
|
||||||
"attempting to register on an epoll instance.");
|
"attempting to register on an epoll instance.");
|
||||||
|
|
||||||
/** close client_dcb */
|
MXS_ERROR("Failed to add dcb %p for fd %d to epoll set.", dcb, dcb->fd);
|
||||||
dcb_close(client_dcb);
|
dcb_close(dcb);
|
||||||
|
|
||||||
/** Previous state is recovered in poll_add_dcb. */
|
|
||||||
MXS_ERROR("Failed to add dcb %p for fd %d to epoll set.",
|
|
||||||
client_dcb, client_dcb->fd);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
MySQLSendHandshake(client_dcb);
|
MySQLSendHandshake(dcb);
|
||||||
}
|
}
|
||||||
}, mxs::RoutingWorker::EXECUTE_AUTO);
|
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int gw_error_client_event(DCB* dcb)
|
static int gw_error_client_event(DCB* dcb)
|
||||||
|
Reference in New Issue
Block a user