From 46da2d3ad2a361ec8d4e404c4907546529aa0165 Mon Sep 17 00:00:00 2001 From: Esa Korhonen Date: Fri, 28 Apr 2017 15:42:58 +0300 Subject: [PATCH 1/4] Add entry on proxy protocol to release notes --- Documentation/Release-Notes/MaxScale-2.2.0-Release-Notes.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Documentation/Release-Notes/MaxScale-2.2.0-Release-Notes.md b/Documentation/Release-Notes/MaxScale-2.2.0-Release-Notes.md index f1f2d8c2e..b7caa39a6 100644 --- a/Documentation/Release-Notes/MaxScale-2.2.0-Release-Notes.md +++ b/Documentation/Release-Notes/MaxScale-2.2.0-Release-Notes.md @@ -55,6 +55,12 @@ The Avrorouter now supports the `deflate` compression method. This allows the stored Avro format files to be compressed on disk. For more information, refer to the [Avrorouter](../Routers/Avrorouter.md) documentation. +### Preliminary proxy protocol support + +The MySQL backend protocol module now supports sending a proxy protocol header +to the server. For more information, see the server section in the +[Configuration guide](../Getting-Started/Configuration-Guide.md). + ## Bug fixes [Here is a list of bugs fixed since the release of MaxScale 2.1.X.]() From bfd94c2b31db3dafb4001684236fa299900c0ba3 Mon Sep 17 00:00:00 2001 From: Esa Korhonen Date: Thu, 27 Apr 2017 18:09:06 +0300 Subject: [PATCH 2/4] KILL [CONNECTION | QUERY] support, part1 Preparation for adding KILL syntax support. Session id changed to uint32 everywhere. Added atomic op. Session id can be acquired before session_alloc(). Added session_alloc_with_id(), which is given a session id number. Worker object has a session_id->SESSION* mapping, not used yet. --- include/maxscale/atomic.h | 1 + include/maxscale/dcb.h | 2 +- include/maxscale/protocol/mysql.h | 2 +- include/maxscale/session.h | 34 +++++++- include/maxscale/worker.h | 5 ++ server/core/atomic.cc | 9 ++ server/core/dcb.cc | 6 +- server/core/maxscale/worker.hh | 8 ++ server/core/modulecmd.cc | 2 +- server/core/session.cc | 85 +++++++++++-------- .../protocol/MySQL/MySQLClient/mysql_client.c | 11 ++- server/modules/routing/debugcli/debugcmd.c | 2 +- .../schemarouter/schemaroutersession.cc | 6 +- 13 files changed, 120 insertions(+), 53 deletions(-) diff --git a/include/maxscale/atomic.h b/include/maxscale/atomic.h index bfcd1067d..72172c270 100644 --- a/include/maxscale/atomic.h +++ b/include/maxscale/atomic.h @@ -44,6 +44,7 @@ MXS_BEGIN_DECLS * @return The value of variable before the add occurred */ int atomic_add(int *variable, int value); +uint32_t atomic_add_uint32(uint32_t *variable, int32_t value); int64_t atomic_add_int64(int64_t *variable, int64_t value); uint64_t atomic_add_uint64(uint64_t *variable, int64_t value); diff --git a/include/maxscale/dcb.h b/include/maxscale/dcb.h index c39265d26..fa551ce3a 100644 --- a/include/maxscale/dcb.h +++ b/include/maxscale/dcb.h @@ -313,7 +313,7 @@ int dcb_isvalid(DCB *); /* Check the DCB is in the linked li int dcb_count_by_usage(DCB_USAGE); /* Return counts of DCBs */ int dcb_persistent_clean_count(DCB *, int, bool); /* Clean persistent and return count */ void dcb_hangup_foreach (struct server* server); -size_t dcb_get_session_id(DCB* dcb); +uint32_t dcb_get_session_id(DCB* dcb); char *dcb_role_name(DCB *); /* Return the name of a role */ int dcb_accept_SSL(DCB* dcb); int dcb_connect_SSL(DCB* dcb); diff --git a/include/maxscale/protocol/mysql.h b/include/maxscale/protocol/mysql.h index 6a33eeffe..a07262dae 100644 --- a/include/maxscale/protocol/mysql.h +++ b/include/maxscale/protocol/mysql.h @@ -320,7 +320,7 @@ typedef struct uint32_t server_capabilities; /*< server capabilities, created or received */ uint32_t client_capabilities; /*< client capabilities, created or received */ uint32_t extra_capabilities; /*< MariaDB 10.2 capabilities */ - unsigned long tid; /*< MySQL Thread ID, in handshake */ + uint32_t tid; /*< MySQL Thread ID, in handshake */ unsigned int charset; /*< MySQL character set at connect time */ int ignore_replies; /*< How many replies should be discarded */ GWBUF* stored_query; /*< Temporarily stored queries */ diff --git a/include/maxscale/session.h b/include/maxscale/session.h index 78e9b4ad6..dc93b8342 100644 --- a/include/maxscale/session.h +++ b/include/maxscale/session.h @@ -133,7 +133,7 @@ typedef struct session { skygw_chk_t ses_chk_top; mxs_session_state_t state; /*< Current descriptor state */ - size_t ses_id; /*< Unique session identifier */ + uint32_t ses_id; /*< Unique session identifier */ struct dcb *client_dcb; /*< The client connection */ struct mxs_router_session *router_session; /*< The router instance data */ MXS_SESSION_STATS stats; /*< Session statistics */ @@ -171,7 +171,30 @@ typedef struct session ((sess)->tail.clientReply)((sess)->tail.instance, \ (sess)->tail.session, (buf)) +/** + * Allocate a new session for a new client of the specified service. + * + * Create the link to the router session by calling the newSession + * entry point of the router using the router instance of the + * service this session is part of. + * + * @param service The service this connection was established by + * @param client_dcb The client side DCB + * @return The newly created session or NULL if an error occurred + */ MXS_SESSION *session_alloc(struct service *, struct dcb *); + +/** + * A version of session_alloc() which takes the session id number as parameter. + * The id should have been generated with session_get_next_id(). + * + * @param service The service this connection was established by + * @param client_dcb The client side DCB + * @param id Id for the new session. + * @return The newly created session or NULL if an error occurred + */ +MXS_SESSION *session_alloc_with_id(struct service *, struct dcb *, uint32_t); + MXS_SESSION *session_set_dummy(struct dcb *); const char *session_get_remote(const MXS_SESSION *); @@ -328,7 +351,14 @@ static inline bool session_set_autocommit(MXS_SESSION* ses, bool autocommit) * * @note The caller must free the session reference by calling session_put_ref */ -MXS_SESSION* session_get_by_id(int id); +MXS_SESSION* session_get_by_id(uint32_t id); + +/** + * Get the next available unique (assuming no overflow) session id number. + * + * @return An unused session id. + */ +uint32_t session_get_next_id(); /** * @brief Close a session diff --git a/include/maxscale/worker.h b/include/maxscale/worker.h index ef66c57fc..9c0619afe 100644 --- a/include/maxscale/worker.h +++ b/include/maxscale/worker.h @@ -116,4 +116,9 @@ bool mxs_worker_post_message(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1, */ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); +// These automatically act on the currently executing worker thread. Not implemented yet. +void mxs_add_to_session_map(uint32_t id, MXS_SESSION* session); +void mxs_remove_from_session_map(uint32_t id); +MXS_SESSION* mxs_find_in_session_map(uint32_t id); + MXS_END_DECLS diff --git a/server/core/atomic.cc b/server/core/atomic.cc index cfbab7115..d2a6cac40 100644 --- a/server/core/atomic.cc +++ b/server/core/atomic.cc @@ -26,6 +26,15 @@ int atomic_add(int *variable, int value) #endif } +uint32_t atomic_add_uint32(uint32_t *variable, int32_t value) +{ +#ifdef MXS_USE_ATOMIC_BUILTINS + return __atomic_fetch_add(variable, value, __ATOMIC_SEQ_CST); +#else + return __sync_fetch_and_add(variable, value); +#endif +} + int64_t atomic_add_int64(int64_t *variable, int64_t value) { #ifdef MXS_USE_ATOMIC_BUILTINS diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 149e75d51..1bc04d195 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -162,8 +163,7 @@ static uint32_t dcb_process_poll_events(DCB *dcb, int thread_id, uint32_t ev); static void dcb_process_fake_events(DCB *dcb, int thread_id); static bool dcb_session_check(DCB *dcb, const char *); -size_t dcb_get_session_id( - DCB *dcb) +uint32_t dcb_get_session_id(DCB *dcb) { return (dcb && dcb->session) ? dcb->session->ses_id : 0; } @@ -1673,7 +1673,7 @@ dprintDCB(DCB *pdcb, DCB *dcb) if (dcb->session && dcb->session->state != SESSION_STATE_DUMMY) { - dcb_printf(pdcb, "\tOwning Session: %lu\n", dcb->session->ses_id); + dcb_printf(pdcb, "\tOwning Session: %" PRIu32 "\n", dcb->session->ses_id); } if (dcb->writeq) diff --git a/server/core/maxscale/worker.hh b/server/core/maxscale/worker.hh index a9e0c13c1..e2fa1c72b 100644 --- a/server/core/maxscale/worker.hh +++ b/server/core/maxscale/worker.hh @@ -14,7 +14,9 @@ #include #include +#include #include +#include #include "messagequeue.hh" #include "poll.h" #include "worker.h" @@ -67,6 +69,7 @@ public: typedef WORKER_STATISTICS STATISTICS; typedef WorkerTask Task; typedef WorkerDisposableTask DisposableTask; + typedef std::tr1::unordered_map SessionsById; enum state_t { @@ -449,6 +452,11 @@ private: bool m_started; /*< Whether the thread has been started or not. */ bool m_should_shutdown; /*< Whether shutdown should be performed. */ bool m_shutdown_initiated; /*< Whether shutdown has been initated. */ + SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map + * should contain sessions exclusive to this + * worker and not e.g. listener sessions. For now, + * it's up to the protocol to decide whether a new + * session is added to the map. */ }; } diff --git a/server/core/modulecmd.cc b/server/core/modulecmd.cc index ffa993891..456e8f57e 100644 --- a/server/core/modulecmd.cc +++ b/server/core/modulecmd.cc @@ -297,7 +297,7 @@ static bool process_argument(const MODULECMD *cmd, modulecmd_arg_type_t *type, c break; case MODULECMD_ARG_SESSION: - if ((arg->value.session = session_get_by_id(atoi((const char*)value)))) + if ((arg->value.session = session_get_by_id(strtoul((const char*)value, NULL, 0)))) { arg->type.type = MODULECMD_ARG_SESSION; } diff --git a/server/core/session.cc b/server/core/session.cc index 879aa66dd..989151b4c 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -29,6 +29,7 @@ */ #include +#include #include #include #include @@ -48,8 +49,10 @@ #include "maxscale/session.h" #include "maxscale/filter.h" -/** Global session id; updated safely by use of atomic_add */ -static int session_id; +/** Global session id counter. Must be updated atomically. Value 0 is reserved for + * dummy/unused sessions. + */ +static uint32_t next_session_id = 1; static struct session session_dummy_struct; @@ -59,7 +62,8 @@ static void session_simple_free(MXS_SESSION *session, DCB *dcb); static void session_add_to_all_list(MXS_SESSION *session); static MXS_SESSION *session_find_free(); static void session_final_free(MXS_SESSION *session); - +static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb, + MXS_SESSION* session); /** * The clientReply of the session. * @@ -87,30 +91,35 @@ session_initialize(MXS_SESSION *session) session->ses_chk_tail = CHK_NUM_SESSION; } -/** - * Allocate a new session for a new client of the specified service. - * - * Create the link to the router session by calling the newSession - * entry point of the router using the router instance of the - * service this session is part of. - * - * @param service The service this connection was established by - * @param client_dcb The client side DCB - * @return The newly created session or NULL if an error occured - */ -MXS_SESSION * -session_alloc(SERVICE *service, DCB *client_dcb) +MXS_SESSION* session_alloc(SERVICE *service, DCB *client_dcb) { MXS_SESSION *session = (MXS_SESSION *)(MXS_MALLOC(sizeof(*session))); - if (NULL == session) { return NULL; } - session_initialize(session); - /** Assign a session id and increase */ - session->ses_id = (size_t)atomic_add(&session_id, 1) + 1; + session_initialize(session); + session->ses_id = session_get_next_id(); + return session_alloc_body(service, client_dcb, session); +} + +MXS_SESSION* session_alloc_with_id(SERVICE *service, DCB *client_dcb, uint32_t id) +{ + MXS_SESSION *session = (MXS_SESSION *)(MXS_MALLOC(sizeof(*session))); + if (session == NULL) + { + return NULL; + } + + session_initialize(session); + session->ses_id = id; + return session_alloc_body(service, client_dcb, session); +} + +static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb, + MXS_SESSION* session) +{ session->ses_is_child = (bool) DCB_IS_CLONE(client_dcb); session->service = service; session->client_dcb = client_dcb; @@ -134,14 +143,13 @@ session_alloc(SERVICE *service, DCB *client_dcb) session->trx_state = SESSION_TRX_INACTIVE; session->autocommit = true; /* - * Only create a router session if we are not the listening - * DCB or an internal DCB. Creating a router session may create a connection to a - * backend server, depending upon the router module implementation - * and should be avoided for the listener session + * Only create a router session if we are not the listening DCB or an + * internal DCB. Creating a router session may create a connection to + * a backend server, depending upon the router module implementation + * and should be avoided for a listener session. * * Router session creation may create other DCBs that link to the - * session, therefore it is important that the session lock is - * relinquished before the router call. + * session. */ if (client_dcb->state != DCB_STATE_LISTENING && client_dcb->dcb_role != DCB_ROLE_INTERNAL) @@ -163,7 +171,7 @@ session_alloc(SERVICE *service, DCB *client_dcb) * protocol end of the chain. */ // NOTE: Here we cast the router instance into a MXS_FILTER and - // NOTE: and the router session into a MXS_FILTER_SESSION and + // NOTE: the router session into a MXS_FILTER_SESSION and // NOTE: the router routeQuery into a filter routeQuery. That // NOTE: is in order to be able to treat the router as the first // NOTE: filter. @@ -197,13 +205,13 @@ session_alloc(SERVICE *service, DCB *client_dcb) if (session->client_dcb->user == NULL) { - MXS_INFO("Started session [%lu] for %s service ", + MXS_INFO("Started session [%" PRIu32 "] for %s service ", session->ses_id, service->name); } else { - MXS_INFO("Started %s client session [%lu] for '%s' from %s", + MXS_INFO("Started %s client session [%" PRIu32 "] for '%s' from %s", service->name, session->ses_id, session->client_dcb->user, @@ -212,7 +220,7 @@ session_alloc(SERVICE *service, DCB *client_dcb) } else { - MXS_INFO("Start %s client session [%lu] for '%s' from %s failed, will be " + MXS_INFO("Start %s client session [%" PRIu32 "] for '%s' from %s failed, will be " "closed as soon as all related DCBs have been closed.", service->name, session->ses_id, @@ -224,7 +232,7 @@ session_alloc(SERVICE *service, DCB *client_dcb) CHK_SESSION(session); client_dcb->session = session; - return SESSION_STATE_TO_BE_FREED == session->state ? NULL : session; + return (session->state == SESSION_STATE_TO_BE_FREED) ? NULL : session; } /** @@ -382,7 +390,7 @@ static void session_free(MXS_SESSION *session) MXS_FREE(session->filters); } - MXS_INFO("Stopped %s client session [%lu]", session->service->name, session->ses_id); + MXS_INFO("Stopped %s client session [%" PRIu32 "]", session->service->name, session->ses_id); /** If session doesn't have parent referencing to it, it can be freed */ if (!session->ses_is_child) @@ -494,7 +502,7 @@ dprintSession(DCB *dcb, MXS_SESSION *print_session) char buf[30]; int i; - dcb_printf(dcb, "Session %lu\n", print_session->ses_id); + dcb_printf(dcb, "Session %" PRIu32 "\n", print_session->ses_id); dcb_printf(dcb, "\tState: %s\n", session_state(print_session->state)); dcb_printf(dcb, "\tService: %s\n", print_session->service->name); @@ -534,7 +542,7 @@ bool dListSessions_cb(DCB *dcb, void *data) { DCB *out_dcb = (DCB*)data; MXS_SESSION *session = dcb->session; - dcb_printf(out_dcb, "%-16lu | %-15s | %-14s | %s\n", session->ses_id, + dcb_printf(out_dcb, "%-16" PRIu32 " | %-15s | %-14s | %s\n", session->ses_id, session->client_dcb && session->client_dcb->remote ? session->client_dcb->remote : "", session->service && session->service->name ? @@ -898,7 +906,7 @@ static bool ses_find_id(DCB *dcb, void *data) { void **params = (void**)data; MXS_SESSION **ses = (MXS_SESSION**)params[0]; - size_t *id = (size_t*)params[1]; + uint32_t *id = (uint32_t*)params[1]; bool rval = true; if (dcb->session->ses_id == *id) @@ -910,7 +918,7 @@ static bool ses_find_id(DCB *dcb, void *data) return rval; } -MXS_SESSION* session_get_by_id(int id) +MXS_SESSION* session_get_by_id(uint32_t id) { MXS_SESSION *session = NULL; void *params[] = {&session, &id}; @@ -981,3 +989,8 @@ void session_clear_stmt(MXS_SESSION *session) session->stmt.buffer = NULL; session->stmt.target = NULL; } + +uint32_t session_get_next_id() +{ + return atomic_add_uint32(&next_session_id, 1); +} \ No newline at end of file diff --git a/server/modules/protocol/MySQL/MySQLClient/mysql_client.c b/server/modules/protocol/MySQL/MySQLClient/mysql_client.c index 4e48e2075..a1f335387 100644 --- a/server/modules/protocol/MySQL/MySQLClient/mysql_client.c +++ b/server/modules/protocol/MySQL/MySQLClient/mysql_client.c @@ -229,7 +229,6 @@ int MySQLSendHandshake(DCB* dcb) char server_scramble[GW_MYSQL_SCRAMBLE_SIZE + 1] = ""; char *version_string; int len_version_string = 0; - int id_num; bool is_maria = false; @@ -275,10 +274,9 @@ int MySQLSendHandshake(DCB* dcb) memcpy(mysql_filler_ten + 6, &new_flags, sizeof(new_flags)); } - // thread id, now put thePID - id_num = getpid() + dcb->fd; - gw_mysql_set_byte4(mysql_thread_id_num, id_num); - + // Get the equivalent of the server process id. + protocol->tid = session_get_next_id(); + gw_mysql_set_byte4(mysql_thread_id_num, protocol->tid); memcpy(mysql_scramble_buf, server_scramble, 8); memcpy(mysql_plugin_data, server_scramble + 8, 12); @@ -702,7 +700,8 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read) * is changed so that future data will go through the * normal data handling function instead of this one. */ - MXS_SESSION *session = session_alloc(dcb->service, dcb); + MXS_SESSION *session = + session_alloc_with_id(dcb->service, dcb, protocol->tid); if (session != NULL) { diff --git a/server/modules/routing/debugcli/debugcmd.c b/server/modules/routing/debugcli/debugcmd.c index 90848a699..1ec2e4576 100644 --- a/server/modules/routing/debugcli/debugcmd.c +++ b/server/modules/routing/debugcli/debugcmd.c @@ -1724,7 +1724,7 @@ convert_arg(char *arg, int arg_type) break; case ARG_TYPE_SESSION: - rval = (unsigned long)session_get_by_id(strtol(arg, NULL, 0)); + rval = (unsigned long)session_get_by_id(strtoul(arg, NULL, 0)); break; case ARG_TYPE_MONITOR: diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index d20ab64dc..64a10bd61 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -13,6 +13,8 @@ #include "schemarouter.hh" +#include + #include #include #include @@ -361,7 +363,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) if (m_config->debug) { sprintf(errbuf + strlen(errbuf), - " ([%lu]: DB change failed)", + " ([%" PRIu32 "]: DB change failed)", m_client->session->ses_id); } @@ -989,7 +991,7 @@ bool SchemaRouterSession::handle_default_db() sprintf(errmsg, "Unknown database '%s'", m_connect_db.c_str()); if (m_config->debug) { - sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)", + sprintf(errmsg + strlen(errmsg), " ([%" PRIu32 "]: DB not found on connect)", m_client->session->ses_id); } write_error_to_client(m_client, From 1b58a75f42110e95804d50ef2eba9c0c93984dc0 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Sat, 29 Apr 2017 08:22:32 +0300 Subject: [PATCH 3/4] Add concurrent execution helper to Worker Concurrently executing a task on all workers *and* waiting until all workers have executed the task seems to be common enough to warrant a helper function for that purpose. --- server/core/maxscale/worker.hh | 10 ++++++++++ server/core/worker.cc | 8 ++++++++ 2 files changed, 18 insertions(+) diff --git a/server/core/maxscale/worker.hh b/server/core/maxscale/worker.hh index e2fa1c72b..859ae9847 100644 --- a/server/core/maxscale/worker.hh +++ b/server/core/maxscale/worker.hh @@ -338,6 +338,16 @@ public: */ static size_t execute_on_all_serially(Task* pTask); + /** + * Executes a task on all workers concurrently and waits until + * all workers are done. + * + * @param pTask The task to be executed. + * + * @return How many workers the task was posted to. + */ + static size_t execute_on_all_concurrently(Task* pTask); + /** * Post a message to a worker. * diff --git a/server/core/worker.cc b/server/core/worker.cc index 4b3e5ac1f..5f5ec135c 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -618,6 +618,7 @@ size_t Worker::execute_on_all(std::auto_ptr sTask) return n; } + //static size_t Worker::execute_on_all_serially(Task* pTask) { @@ -638,6 +639,13 @@ size_t Worker::execute_on_all_serially(Task* pTask) return n; } +//static +size_t Worker::execute_on_all_concurrently(Task* pTask) +{ + Semaphore sem; + return sem.wait_n(Worker::execute_on_all(pTask, &sem)); +} + bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { // NOTE: No logging here, this function must be signal safe. From 00b6c10089d1d14dbe2b1e6e7285f3996a50876e Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Tue, 2 May 2017 11:22:37 +0300 Subject: [PATCH 4/4] Adjust Worker terminology - Posting a task to a worker for execution (without implicit wait) is called "post". - Posting a task to every worker for execution (without implicit wait) is called "broadcast". In these cases the task must be provided as a pointer or auto_ptr, to indicate that the provided pointer must remain alive for longer than the duration of the function call. - Posting a task to a worker for execution *and* waiting for all workers to have executed the task is called "execute" and the two variants are now called "execute_concurrently" and "execute_serially". In these cases the task is provided as a reference, since the functions will return only when all workers have (in concurrent or serial fashion) executed the task. That is, it need not remain alive for longer than the duration of the function call. --- server/core/dcb.cc | 5 ++--- server/core/maxscale/worker.hh | 41 +++++++++++++++++----------------- server/core/worker.cc | 27 ++++++++++++---------- 3 files changed, 38 insertions(+), 35 deletions(-) diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 1bc04d195..af83c99f7 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -3069,7 +3069,7 @@ private: bool dcb_foreach(bool(*func)(DCB *dcb, void *data), void *data) { SerialDcbTask task(func, data); - Worker::execute_on_all_serially(&task); + Worker::execute_serially(task); return task.more(); } @@ -3104,9 +3104,8 @@ private: void dcb_foreach_parallel(bool(*func)(DCB *dcb, void *data), void **data) { - Semaphore sem; ParallelDcbTask task(func, data); - sem.wait_n(Worker::execute_on_all(&task, &sem)); + Worker::execute_concurrently(task); } int dcb_get_port(const DCB *dcb) diff --git a/server/core/maxscale/worker.hh b/server/core/maxscale/worker.hh index 859ae9847..35056713c 100644 --- a/server/core/maxscale/worker.hh +++ b/server/core/maxscale/worker.hh @@ -254,12 +254,12 @@ public: } /** - * Executes a task in the context of a Worker. + * Posts a task to a worker for execution. * * @param pTask The task to be executed. * @param pSem If non-NULL, will be posted once the task's `execute` return. * - * @return True if the task could be *posted*, false otherwise. + * @return True if the task could be posted (i.e. not executed), false otherwise. * * @attention The instance must remain valid for as long as it takes for the * task to be transferred to the worker and its `execute` function @@ -277,21 +277,21 @@ public: * MyResult& result = task.result(); * @endcode */ - bool execute(Task* pTask, Semaphore* pSem = NULL); + bool post(Task* pTask, Semaphore* pSem = NULL); /** - * Executes a disposable task in the context of a Worker. + * Posts a task to a worker for execution. * * @param pTask The task to be executed. * - * @return True if the task could be *posted*, false otherwise. + * @return True if the task could be posted (i.e. not executed), false otherwise. * * @attention Once the task has been executed, it will be deleted. */ - bool execute(std::auto_ptr sTask); + bool post(std::auto_ptr sTask); /** - * Executes a task on all workers. + * Posts a task to all workers for execution. * * @param pTask The task to be executed. * @param pSem If non-NULL, will be posted once per worker when the task's @@ -304,10 +304,10 @@ public: * have data specific to each worker that can be accessed * without locks. */ - static size_t execute_on_all(Task* pTask, Semaphore* pSem = NULL); + static size_t broadcast(Task* pTask, Semaphore* pSem = NULL); /** - * Executes a task on all workers. + * Posts a task to all workers for execution. * * @param pTask The task to be executed. * @@ -321,14 +321,14 @@ public: * @attention Once the task has been executed by all workers, it will * be deleted. */ - static size_t execute_on_all(std::auto_ptr sTask); + static size_t broadcast(std::auto_ptr sTask); /** - * Executes a task on all workers in serial mode. + * Executes a task on all workers in serial mode (the task is executed + * on at most one worker thread at a time). When the function returns + * the task has been executed on all workers. * - * The task is executed on at most one worker thread at a time. - * - * @param pTask The task to be executed. + * @param task The task to be executed. * * @return How many workers the task was posted to. * @@ -336,17 +336,18 @@ public: * to the other functions. Only use this function when printing thread-specific * data to stdout. */ - static size_t execute_on_all_serially(Task* pTask); + static size_t execute_serially(Task& task); /** - * Executes a task on all workers concurrently and waits until - * all workers are done. + * Executes a task on all workers concurrently and waits until all workers + * are done. That is, when the function returns the task has been executed + * by all workers. * - * @param pTask The task to be executed. + * @param task The task to be executed. * * @return How many workers the task was posted to. */ - static size_t execute_on_all_concurrently(Task* pTask); + static size_t execute_concurrently(Task& task); /** * Post a message to a worker. @@ -441,7 +442,7 @@ private: static Worker* create(int id, int epoll_listener_fd); - bool execute_disposable(DisposableTask* pTask); + bool post_disposable(DisposableTask* pTask); void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override diff --git a/server/core/worker.cc b/server/core/worker.cc index 5f5ec135c..11967f480 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -548,21 +548,23 @@ void Worker::set_maxwait(unsigned int maxwait) this_unit.max_poll_sleep = maxwait; } -bool Worker::execute(Task* pTask, Semaphore* pSem) +bool Worker::post(Task* pTask, Semaphore* pSem) { + // No logging here, function must be signal safe. intptr_t arg1 = reinterpret_cast(pTask); intptr_t arg2 = reinterpret_cast(pSem); return post_message(MXS_WORKER_MSG_TASK, arg1, arg2); } -bool Worker::execute(std::auto_ptr sTask) +bool Worker::post(std::auto_ptr sTask) { - return execute_disposable(sTask.release()); + // No logging here, function must be signal safe. + return post_disposable(sTask.release()); } // private -bool Worker::execute_disposable(DisposableTask* pTask) +bool Worker::post_disposable(DisposableTask* pTask) { pTask->inc_ref(); @@ -579,15 +581,16 @@ bool Worker::execute_disposable(DisposableTask* pTask) } //static -size_t Worker::execute_on_all(Task* pTask, Semaphore* pSem) +size_t Worker::broadcast(Task* pTask, Semaphore* pSem) { + // No logging here, function must be signal safe. size_t n = 0; for (int i = 0; i < this_unit.n_workers; ++i) { Worker* pWorker = this_unit.ppWorkers[i]; - if (pWorker->execute(pTask, pSem)) + if (pWorker->post(pTask, pSem)) { ++n; } @@ -597,7 +600,7 @@ size_t Worker::execute_on_all(Task* pTask, Semaphore* pSem) } //static -size_t Worker::execute_on_all(std::auto_ptr sTask) +size_t Worker::broadcast(std::auto_ptr sTask) { DisposableTask* pTask = sTask.release(); pTask->inc_ref(); @@ -608,7 +611,7 @@ size_t Worker::execute_on_all(std::auto_ptr sTask) { Worker* pWorker = this_unit.ppWorkers[i]; - if (pWorker->execute_disposable(pTask)) + if (pWorker->post_disposable(pTask)) { ++n; } @@ -620,7 +623,7 @@ size_t Worker::execute_on_all(std::auto_ptr sTask) } //static -size_t Worker::execute_on_all_serially(Task* pTask) +size_t Worker::execute_serially(Task& task) { Semaphore sem; size_t n = 0; @@ -629,7 +632,7 @@ size_t Worker::execute_on_all_serially(Task* pTask) { Worker* pWorker = this_unit.ppWorkers[i]; - if (pWorker->execute(pTask, &sem)) + if (pWorker->post(&task, &sem)) { sem.wait(); ++n; @@ -640,10 +643,10 @@ size_t Worker::execute_on_all_serially(Task* pTask) } //static -size_t Worker::execute_on_all_concurrently(Task* pTask) +size_t Worker::execute_concurrently(Task& task) { Semaphore sem; - return sem.wait_n(Worker::execute_on_all(pTask, &sem)); + return sem.wait_n(Worker::broadcast(&task, &sem)); } bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2)