From ff07009d8cab41c442f5a3d94900bca7c403ddd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Thu, 26 Jul 2018 10:10:05 +0300 Subject: [PATCH] MXS-1929: Add worker local storage of data Data can now be stored on thread-local storage of the worker. By acquiring a unique handle from the worker, a module can store a thread-local value. This functionality will be used to store configurations that are sometimes updated at runtime but are largely read-only. By avoiding shared data altogether, performance is not affected. The only synchronization that is done is on update. Also added a helper functions for broadcasting tasks on all routing workers. With the old mxs_rworker_broadcast_message function, if a function call was broadcasted it was always queued for execution. The mxs_rworker_broadcast will immediately execute the task on the local worker and queue it for execution of other routing workers. --- include/maxscale/routingworker.h | 58 ++++++++++++++++++++ server/core/internal/routingworker.hh | 78 +++++++++++++++++++++++++++ server/core/routingworker.cc | 51 ++++++++++++++++++ 3 files changed, 187 insertions(+) diff --git a/include/maxscale/routingworker.h b/include/maxscale/routingworker.h index 009932d27..b38ecebdc 100644 --- a/include/maxscale/routingworker.h +++ b/include/maxscale/routingworker.h @@ -68,6 +68,21 @@ int mxs_rworker_get_current_id(); */ size_t mxs_rworker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); +/** + * Call a function on all workers + * + * A convenience function for executing simple tasks on all workers. The task + * will be executed immediately on the current worker and thus recursive calls + * into functions should not be done. + * + * @param cb Callback to call + * @param data Data passed to the callback + * + * @return The number of messages posted; if less that ne number of workers + * then some postings failed. + */ +size_t mxs_rworker_broadcast(void (*cb)(void* data), void* data); + /** * Add a session to the current routing worker's session container. Currently * only required for some special commands e.g. "KILL " to work. @@ -95,4 +110,47 @@ bool mxs_rworker_deregister_session(uint64_t id); */ MXS_SESSION* mxs_rworker_find_session(uint64_t id); +/** + * Worker local storage + */ + +/** + * Initialize a globally unique data identifier + * + * The value returned by this function is used with the other data commands. + * The value is a unique handle to thread-local storage. + * + * @return The data identifier usable for worker local data storage + */ +uint64_t mxs_rworker_create_key(); + +/** + * Set local worker data on current worker + * + * @param key Key acquired with create_data + * @param data Data to store + * @param callback Callback used to delete the data, NULL if no deletion is + * required. This function is called by mxs_rworker_delete_data + * when the data is deleted. + */ +void mxs_rworker_set_data(uint64_t key, void* data, void (*callback)(void*)); + +/** + * Get local data from current worker + * + * @param key Key to use + * + * @return Data previously stored or NULL if no data was previously stored + */ +void* mxs_rworker_get_data(uint64_t key); + +/** + * Deletes local data from all workers + * + * The key must not be used again after deletion. + * + * @param key Key to remove + */ +void mxs_rworker_delete_data(uint64_t key); + MXS_END_DECLS diff --git a/server/core/internal/routingworker.hh b/server/core/internal/routingworker.hh index 02dd350df..5cedc6f66 100644 --- a/server/core/internal/routingworker.hh +++ b/server/core/internal/routingworker.hh @@ -13,8 +13,12 @@ */ #include + +#include + #include #include + #include "session.hh" namespace maxscale @@ -35,6 +39,9 @@ public: typedef Registry SessionsById; typedef std::vector Zombies; + typedef std::unordered_map LocalData; + typedef std::unordered_map DataDeleters; + /** * Initialize the routing worker mechanism. * @@ -278,6 +285,75 @@ public: */ static RoutingWorker* pick_worker(); + /** + * Worker local storage + */ + + /** + * Initialize a globally unique data identifier + * + * @return The data identifier usable for worker local data storage + */ + static uint64_t create_key() + { + static uint64_t id_generator = 0; + return atomic_add_uint64(&id_generator, 1); + } + + /** + * Set local data + * + * @param key Key acquired with create_local_data + * @param data Data to store + */ + void set_data(uint64_t key, void* data, void (*callback)(void*)) + { + if (callback) + { + m_data_deleters[key] = callback; + } + + m_local_data[key] = data; + } + + /** + * Get local data + * + * @param key Key to use + * + * @return Data previously stored + */ + void* get_data(uint64_t key) + { + auto it = m_local_data.find(key); + return it != m_local_data.end() ? it->second : NULL; + } + + /** + * Deletes local data + * + * If a callback was passed when the data was set, it will be called. + * + * @param key Key to remove + */ + void delete_data(uint64_t key) + { + auto data = m_local_data.find(key); + + if (data != m_local_data.end()) + { + auto deleter = m_data_deleters.find(key); + + if (deleter != m_data_deleters.end()) + { + deleter->second(data->second); + m_data_deleters.erase(deleter); + } + + m_local_data.erase(data); + } + } + private: const int m_id; /*< The id of the worker. */ SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map @@ -286,6 +362,8 @@ private: * it's up to the protocol to decide whether a new * session is added to the map. */ Zombies m_zombies; /*< DCBs to be deleted. */ + LocalData m_local_data; /*< Data local to this worker */ + DataDeleters m_data_deleters; /*< Delete functions for the local data */ RoutingWorker(); virtual ~RoutingWorker(); diff --git a/server/core/routingworker.cc b/server/core/routingworker.cc index f6f61cf84..843d6964c 100644 --- a/server/core/routingworker.cc +++ b/server/core/routingworker.cc @@ -997,6 +997,57 @@ private: const char* m_zHost; }; +class FunctionTask: public maxscale::WorkerDisposableTask +{ +public: + FunctionTask(std::function cb): + m_cb(cb) + { + } + + void execute(Worker& worker) + { + m_cb(); + } + +protected: + std::function m_cb; +}; + +} + +size_t mxs_rworker_broadcast(void (*cb)(void* data), void* data) +{ + return RoutingWorker::broadcast(std::auto_ptr(new FunctionTask([&]() + { + cb(data); + }))); +} + +uint64_t mxs_rworker_create_key() +{ + return RoutingWorker::create_key(); +} + +void mxs_rworker_set_data(uint64_t key, void* data, void (*callback)(void*)) +{ + RoutingWorker::get_current()->set_data(key, data, callback); +} + +void* mxs_rworker_get_data(uint64_t key) +{ + return RoutingWorker::get_current()->get_data(key); +} + +void mxs_rworker_delete_data(uint64_t key) +{ + auto func = [key]() + { + RoutingWorker::get_current()->delete_data(key); + }; + + std::auto_ptr task(new FunctionTask(func)); + RoutingWorker::broadcast(task); } json_t* mxs_rworker_to_json(const char* zHost, int id)