/*
 * Copyright (c) 2018 MariaDB Corporation Ab
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 *
 * Change Date: 2026-01-04
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2 or later of the General
 * Public License.
 */
#pragma once

#include <maxscale/ccdefs.hh>

#include <unordered_map>
#include <vector>
#include <mutex>
#include <type_traits>
#include <atomic>

#include <maxbase/atomic.hh>
#include <maxbase/semaphore.hh>
#include <maxbase/worker.hh>
#include <maxbase/stopwatch.hh>
#include <maxscale/poll.hh>
#include <maxscale/query_classifier.hh>
#include <maxscale/session.hh>

MXS_BEGIN_DECLS

// The worker ID of the "main" thread
#define MXS_RWORKER_MAIN -1

/**
 * Return the routing worker associated with the provided worker id.
 *
 * @param worker_id  A worker id. If MXS_RWORKER_MAIN is used, the
 *                   routing worker running in the main thread will
 *                   be returned.
 *
 * @return The corresponding routing worker instance, or NULL if the
 *         id does not correspond to a routing worker.
 */
MXB_WORKER* mxs_rworker_get(int worker_id);

/**
 * Return the current routing worker.
 *
 * @return A routing worker, or NULL if there is no current routing worker.
 */
MXB_WORKER* mxs_rworker_get_current();

/**
 * Return the id of the current routing worker.
 *
 * @return The id of the routing worker, or -1 if there is no current
 *         routing worker.
 */
int mxs_rworker_get_current_id();

/**
 * Broadcast a message to all routing workers.
 *
 * @param msg_id  The message id.
 * @param arg1    Message specific first argument.
 * @param arg2    Message specific second argument.
 *
 * @return The number of messages posted; if less than the number of workers,
 *         then some postings failed.
 *
 * @attention The return value tells *only* whether the message could be posted,
 *            *not* that it has reached the worker.
 *
 * @attention Exactly the same arguments are passed to all workers. Take that
 *            into account if the passed data must be freed.
 *
 * @attention This function is signal safe.
 */
size_t mxs_rworker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2);

/**
 * Call a function on all workers
 *
 * A convenience function for executing simple tasks on all workers. The task
 * will be executed immediately on the current worker, so recursive calls
 * into functions should not be made.
 *
 * @param cb    Callback to call
 * @param data  Data passed to the callback
 *
 * @return The number of messages posted; if less than the number of workers,
 *         then some postings failed.
 */
size_t mxs_rworker_broadcast(void (* cb)(void* data), void* data);
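
// A minimal usage sketch (illustration only, not part of the API declared here):
// broadcasting a simple callback to every routing worker. The callback and the
// counter are hypothetical.
//
//     static void bump_counter(void* data)
//     {
//         static_cast<std::atomic<int>*>(data)->fetch_add(1, std::memory_order_relaxed);
//     }
//
//     static std::atomic<int> counter {0};
//     size_t n = mxs_rworker_broadcast(bump_counter, &counter);
//     // n is the number of workers the callback was posted to.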

/**
 * Add a session to the current routing worker's session container. Currently
 * only required for some special commands, e.g. "KILL <process_id>", to work.
 *
 * @param session  Session to add.
 *
 * @return True if successful, false if the id already existed in the map.
 */
bool mxs_rworker_register_session(MXS_SESSION* session);

/**
 * Remove a session from the current routing worker's session container. Does
 * not actually remove anything from an epoll-set or affect the session in any
 * way.
 *
 * @param id  Which id to remove.
 *
 * @return True if the session was found and removed, false otherwise.
 */
bool mxs_rworker_deregister_session(uint64_t id);

/**
 * Find a session in the current routing worker's session container.
 *
 * @param id  Which id to find.
 *
 * @return The found session, or NULL if not found.
 */
MXS_SESSION* mxs_rworker_find_session(uint64_t id);
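
// A minimal sketch (illustration only) of how these functions fit together; pSession
// is a hypothetical MXS_SESSION owned by the current routing worker and session_id
// is its unique id.
//
//     if (mxs_rworker_register_session(pSession))
//     {
//         // Later, e.g. while handling "KILL <session_id>":
//         MXS_SESSION* pFound = mxs_rworker_find_session(session_id);
//
//         // When the session ends:
//         mxs_rworker_deregister_session(session_id);
//     }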

/**
 * Worker local storage
 */

/**
 * Initialize a globally unique data identifier
 *
 * The value returned by this function is used with the other data commands.
 * The value is a unique handle to thread-local storage.
 *
 * @return The data identifier usable for worker local data storage
 */
uint64_t mxs_rworker_create_key();

/**
 * Set local worker data on the current worker
 *
 * @param key       Key acquired with mxs_rworker_create_key
 * @param data      Data to store
 * @param callback  Callback used to delete the data, NULL if no deletion is
 *                  required. This function is called by mxs_rworker_delete_data
 *                  when the data is deleted.
 */
void mxs_rworker_set_data(uint64_t key, void* data, void (* callback)(void*));

/**
 * Get local data from the current worker
 *
 * @param key  Key to use
 *
 * @return Data previously stored, or NULL if no data was previously stored
 */
void* mxs_rworker_get_data(uint64_t key);

/**
 * Deletes local data from all workers
 *
 * The key must not be used again after deletion.
 *
 * @param key  Key to remove
 */
void mxs_rworker_delete_data(uint64_t key);
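
// A minimal sketch (illustration only) of the worker local storage C API above.
// The Counters struct and the key handling are hypothetical.
//
//     struct Counters { int queries = 0; };
//
//     static void delete_counters(void* data)
//     {
//         delete static_cast<Counters*>(data);
//     }
//
//     static uint64_t key = mxs_rworker_create_key();   // created once, shared by all workers
//
//     // On a routing worker thread:
//     auto* pCounters = static_cast<Counters*>(mxs_rworker_get_data(key));
//     if (pCounters == nullptr)
//     {
//         pCounters = new Counters;
//         mxs_rworker_set_data(key, pCounters, delete_counters);
//     }
//     ++pCounters->queries;
//
//     // At shutdown, to free the data on all workers:
//     mxs_rworker_delete_data(key);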

MXS_END_DECLS

namespace maxscale
{

class RoutingWorker : public mxb::Worker
                    , private MXB_POLL_DATA
{
    RoutingWorker(const RoutingWorker&) = delete;
    RoutingWorker& operator=(const RoutingWorker&) = delete;

public:
    enum
    {
        MAIN = -1
    };

    typedef Registry<MXS_SESSION> SessionsById;
    typedef std::vector<DCB*>     Zombies;

    typedef std::vector<void*>           LocalData;
    typedef std::vector<void (*)(void*)> DataDeleters;

    /**
     * Initialize the routing worker mechanism.
     *
     * To be called once at process startup. This will cause as many workers
     * to be created as the number of threads defined.
     *
     * @return True if the initialization succeeded, false otherwise.
     */
    static bool init();

    /**
     * Finalize the worker mechanism.
     *
     * To be called once at process shutdown. This will cause all workers
     * to be destroyed. When the function is called, no worker should be
     * running anymore.
     */
    static void finish();

    /**
     * Add a file descriptor to the epoll instance shared between all workers.
     * Events occurring on the provided file descriptor will be handled by all
     * workers. This is primarily intended for listening sockets where the
     * only event is EPOLLIN, signaling that accept() can be used on the listening
     * socket for creating a connected socket to a client.
     *
     * @param fd      The file descriptor to be added.
     * @param events  Mask of epoll event types.
     * @param pData   The poll data associated with the descriptor:
     *
     *                data->handler  : Handler that knows how to deal with events
     *                                 for this particular type of 'struct mxs_poll_data'.
     *                data->thread.id: 0
     *
     * @return True, if the descriptor could be added, false otherwise.
     */
    static bool add_shared_fd(int fd, uint32_t events, MXB_POLL_DATA* pData);

    /**
     * Remove a file descriptor from the epoll instance shared between all workers.
     *
     * @param fd  The file descriptor to be removed.
     *
     * @return True on success, false on failure.
     */
    static bool remove_shared_fd(int fd);

    /**
     * Returns the id of the routing worker.
     *
     * @return The id of the routing worker.
     */
    int id() const override
    {
        return m_id;
    }

    /**
     * Register a zombie for later deletion.
     *
     * @param pZombie  DCB that will be deleted at the end of the event loop.
     *
     * @note The DCB must be owned by this worker.
     */
    void register_zombie(DCB* pZombie);

    /**
     * Return a reference to the session registry of this worker.
     *
     * @return Session registry.
     */
    SessionsById& session_registry();

    /**
     * Return the worker associated with the provided worker id.
     *
     * @param worker_id  A worker id. By specifying MAIN, the routing worker
     *                   running in the main thread will be returned.
     *
     * @return The corresponding worker instance, or NULL if the id does
     *         not correspond to a worker.
     */
    static RoutingWorker* get(int worker_id);

    /**
     * Return the worker associated with the current thread.
     *
     * @return The worker instance, or NULL if the current thread does not have a worker.
     */
    static RoutingWorker* get_current();

    /**
     * Return the worker id associated with the current thread.
     *
     * @return The worker id, or -1 if the current thread does not have a worker.
     */
    static int get_current_id();

    /**
     * Starts all routing workers.
     *
     * @return True, if all workers could be started.
     */
    static bool start_workers();

    /**
     * Waits for all routing workers.
     */
    static void join_workers();

    /**
     * Deprecated
     */
    static void set_nonblocking_polls(unsigned int nbpolls);

    /**
     * Deprecated
     */
    static void set_maxwait(unsigned int maxwait);

    /**
     * Posts a task to all workers for execution.
     *
     * @param pTask  The task to be executed.
     * @param pSem   If non-NULL, will be posted once per worker when the task's
     *               `execute` returns.
     *
     * @return How many workers the task was posted to.
     *
     * @attention The very same task will be posted to all workers. The task
     *            should either not have any shareable data, or it should
     *            have data specific to each worker that can be accessed
     *            without locks.
     *
     * @attention The task will be posted to each routing worker using the
     *            EXECUTE_AUTO execution mode. That is, if the calling thread
     *            is that of a routing worker, then the task will be executed
     *            directly without going through the message loop of the worker,
     *            otherwise the task is delivered via the message loop.
     */
    static size_t broadcast(Task* pTask, mxb::Semaphore* pSem = NULL);
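
    // A minimal sketch (an assumption based on mxb::Worker::Task from <maxbase/worker.hh>,
    // not a verbatim reference): broadcast a task and wait until every worker has run it.
    //
    //     class PingTask : public mxb::Worker::Task
    //     {
    //     public:
    //         void execute(mxb::Worker& worker) override
    //         {
    //             // Runs once on every routing worker.
    //         }
    //     };
    //
    //     PingTask task;
    //     mxb::Semaphore sem;
    //     size_t n = RoutingWorker::broadcast(&task, &sem);
    //     sem.wait_n(n);   // blocks until the task has executed on all workers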

    /**
     * Posts a task to all workers for execution.
     *
     * @param sTask  The task to be executed.
     *
     * @return How many workers the task was posted to.
     *
     * @attention The very same task will be posted to all workers. The task
     *            should either not have any shareable data, or it should
     *            have data specific to each worker that can be accessed
     *            without locks.
     *
     * @attention Once the task has been executed by all workers, it will
     *            be deleted.
     *
     * @attention The task will be posted to each routing worker using the
     *            EXECUTE_AUTO execution mode. That is, if the calling thread
     *            is that of a routing worker, then the task will be executed
     *            directly without going through the message loop of the worker,
     *            otherwise the task is delivered via the message loop.
     */
    static size_t broadcast(std::unique_ptr<DisposableTask> sTask);

    /**
     * Posts a function to all workers for execution.
     *
     * @param func  The function to be executed.
     * @param pSem  If non-NULL, will be posted once per worker when the function
     *              has been executed.
     * @param mode  Execution mode
     *
     * @return How many workers the function was posted to.
     */
    static size_t broadcast(std::function<void ()> func, mxb::Semaphore* pSem, execute_mode_t mode);

    static size_t broadcast(std::function<void ()> func, enum execute_mode_t mode)
    {
        return broadcast(func, NULL, mode);
    }
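
    // A minimal sketch (illustration only): broadcast a lambda and wait until each
    // routing worker has executed it.
    //
    //     mxb::Semaphore sem;
    //     size_t n = RoutingWorker::broadcast([]() {
    //                                             // per-worker work here
    //                                         },
    //                                         &sem,
    //                                         RoutingWorker::EXECUTE_AUTO);
    //     sem.wait_n(n);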

    /**
     * Executes a task on all workers in serial mode (the task is executed
     * on at most one worker thread at a time). When the function returns
     * the task has been executed on all workers.
     *
     * @param task  The task to be executed.
     *
     * @return How many workers the task was posted to.
     *
     * @warning This function is extremely inefficient and will be slow compared
     *          to the other functions. Only use this function when printing
     *          thread-specific data to stdout.
     *
     * @attention The task will be posted to each routing worker using the
     *            EXECUTE_AUTO execution mode. That is, if the calling thread
     *            is that of a routing worker, then the task will be executed
     *            directly without going through the message loop of the worker,
     *            otherwise the task is delivered via the message loop.
     */
    static size_t execute_serially(Task& task);
    static size_t execute_serially(std::function<void()> func);

    /**
     * Executes a task on all workers concurrently and waits until all workers
     * are done. That is, when the function returns the task has been executed
     * by all workers.
     *
     * @param task  The task to be executed.
     *
     * @return How many workers the task was posted to.
     *
     * @attention The task will be posted to each routing worker using the
     *            EXECUTE_AUTO execution mode. That is, if the calling thread
     *            is that of a routing worker, then the task will be executed
     *            directly without going through the message loop of the worker,
     *            otherwise the task is delivered via the message loop.
     */
    static size_t execute_concurrently(Task& task);
    static size_t execute_concurrently(std::function<void()> func);
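
    // A minimal sketch (illustration only): run a function on every worker and return
    // only once all of them have finished.
    //
    //     std::atomic<int> ran {0};
    //     RoutingWorker::execute_concurrently([&ran]() {
    //                                             ran.fetch_add(1, std::memory_order_relaxed);
    //                                         });
    //     // Here `ran` equals the number of workers the function was posted to.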

    /**
     * Broadcast a message to all workers.
     *
     * @param msg_id  The message id.
     * @param arg1    Message specific first argument.
     * @param arg2    Message specific second argument.
     *
     * @return The number of messages posted; if less than the number of workers,
     *         then some postings failed.
     *
     * @attention The return value tells *only* whether the message could be posted,
     *            *not* that it has reached the worker.
     *
     * @attention Exactly the same arguments are passed to all workers. Take that
     *            into account if the passed data must be freed.
     *
     * @attention This function is signal safe.
     */
    static size_t broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2);

    /**
     * Initiate shutdown of all workers.
     *
     * @attention A call to this function will only initiate the shutdown,
     *            the workers will not have shut down when the function returns.
     *
     * @attention This function is signal safe.
     */
    static void shutdown_all();

    /**
     * Returns statistics for all workers.
     *
     * @return Combined statistics.
     *
     * @attention The statistics may no longer be accurate by the time they have
     *            been returned. The returned values may also not represent a
     *            100% consistent set.
     */
    static STATISTICS get_statistics();

    /**
     * Return a specific combined statistic value.
     *
     * @param what  What to return.
     *
     * @return The corresponding value.
     */
    static int64_t get_one_statistic(POLL_STAT what);

    /**
     * Get the next worker.
     *
     * @return The worker where work should be assigned.
     */
    static RoutingWorker* pick_worker();
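
    // A minimal sketch (illustration only): ask which worker new work, e.g. a new
    // client connection, should be assigned to.
    //
    //     RoutingWorker* pWorker = RoutingWorker::pick_worker();
    //     int worker_id = pWorker->id();   // id of the chosen worker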

    /**
     * Worker local storage
     */

    /**
     * Initialize a globally unique data identifier
     *
     * @return The data identifier usable for worker local data storage
     */
    static uint64_t create_key()
    {
        static std::atomic<uint64_t> id_generator {0};
        return id_generator.fetch_add(1, std::memory_order_relaxed);
    }

    /**
     * Set local data
     *
     * @param key       Key acquired with create_key
     * @param data      Data to store
     * @param callback  Callback used to delete the data, or nullptr if no deletion is required
     */
    void set_data(uint64_t key, void* data, void (* callback)(void*))
    {
        if (m_local_data.size() <= key)
        {
            m_local_data.resize(key + 1, nullptr);
            m_data_deleters.resize(key + 1, nullptr);
        }

        if (callback)
        {
            m_data_deleters[key] = callback;
        }

        m_local_data[key] = data;
    }

    /**
     * Get local data
     *
     * @param key  Key to use
     *
     * @return Data previously stored
     */
    void* get_data(uint64_t key) const
    {
        return key < m_local_data.size() ? m_local_data[key] : nullptr;
    }

    /**
     * Deletes local data
     *
     * If a callback was passed when the data was set, it will be called.
     *
     * @param key  Key to remove
     */
    void delete_data(uint64_t key)
    {
        if (key < m_local_data.size())
        {
            if (auto deleter = m_data_deleters[key])
            {
                deleter(m_local_data[key]);
            }

            m_data_deleters[key] = nullptr;
            m_local_data[key] = nullptr;
        }
    }

    /**
     * Provides QC statistics of one worker.
     *
     * @param[in]  id      Id of the worker.
     * @param[out] pStats  The QC statistics of that worker.
     *
     * @return True, if @c id referred to a worker, false otherwise.
     */
    static bool get_qc_stats(int id, QC_CACHE_STATS* pStats);

    /**
     * Provides QC statistics of all workers.
     *
     * @param all_stats  Vector that on return will contain the statistics of all workers.
     */
    static void get_qc_stats(std::vector<QC_CACHE_STATS>& all_stats);

    /**
     * Provides QC statistics of all workers as a JSON object for use in the REST-API.
     */
    static std::unique_ptr<json_t> get_qc_stats_as_json(const char* zHost);

    /**
     * Provides QC statistics of one worker as a JSON object for use in the REST-API.
     *
     * @param zHost  The name of the MaxScale host.
     * @param id     The id of a worker.
     *
     * @return A json object if @c id refers to a worker, NULL otherwise.
     */
    static std::unique_ptr<json_t> get_qc_stats_as_json(const char* zHost, int id);

    /**
     * To be called from the initial (parent) thread if the systemd watchdog is on.
     */
    static void set_watchdog_interval(uint64_t microseconds);

    class WatchdogWorkaround;
    friend WatchdogWorkaround;

    /**
     * @class WatchdogWorkaround
     *
     * An RAII class with which the systemd watchdog notification can be
     * handled during synchronous worker activity that stalls the epoll
     * event handling.
     *
     * The constructor turns on the workaround and the destructor
     * turns it off.
     */
    class WatchdogWorkaround
    {
        WatchdogWorkaround(const WatchdogWorkaround&);
        WatchdogWorkaround& operator=(const WatchdogWorkaround&);

    public:
        /**
         * Turns on the watchdog workaround for a specific worker.
         *
         * @param pWorker  The worker for which the systemd notification
         *                 should be arranged. Need not be the calling worker.
         */
        WatchdogWorkaround(RoutingWorker* pWorker)
            : m_pWorker(pWorker)
        {
            mxb_assert(pWorker);
            m_pWorker->start_watchdog_workaround();
        }

        /**
         * Turns on the watchdog workaround for the calling worker.
         */
        WatchdogWorkaround()
            : WatchdogWorkaround(RoutingWorker::get_current())
        {
        }

        /**
         * Turns off the watchdog workaround.
         */
        ~WatchdogWorkaround()
        {
            m_pWorker->stop_watchdog_workaround();
        }

    private:
        RoutingWorker* m_pWorker;
    };
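
    // A minimal sketch (illustration only): keep systemd watchdog notifications going
    // while the calling worker performs long, blocking work. The function name is
    // hypothetical.
    //
    //     void do_long_blocking_work()
    //     {
    //         RoutingWorker::WatchdogWorkaround workaround;   // workaround turned on
    //         // ... synchronous work that stalls epoll event handling ...
    //     }   // workaround turned off when the guard goes out of scope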

private:
    class WatchdogNotifier;
    friend WatchdogNotifier;

    const int    m_id;              /*< The id of the worker. */
    SessionsById m_sessions;        /*< A mapping of session_id->MXS_SESSION. The map
                                     *  should contain sessions exclusive to this
                                     *  worker and not e.g. listener sessions. For now,
                                     *  it's up to the protocol to decide whether a new
                                     *  session is added to the map. */
    Zombies      m_zombies;         /*< DCBs to be deleted. */
    LocalData    m_local_data;      /*< Data local to this worker */
    DataDeleters m_data_deleters;   /*< Delete functions for the local data */

    RoutingWorker();
    virtual ~RoutingWorker();

    static RoutingWorker* create(int epoll_listener_fd);

    bool pre_run();     // override
    void post_run();    // override
    void epoll_tick();  // override

    void delete_zombies();
    void check_systemd_watchdog();
    void start_watchdog_workaround();
    void stop_watchdog_workaround();

    static uint32_t epoll_instance_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_t events);
    uint32_t        handle_epoll_events(uint32_t events);

    static maxbase::Duration  s_watchdog_interval;   /*< Duration between notifications, if any. */
    static maxbase::TimePoint s_watchdog_next_check; /*< Next time to notify systemd. */
    std::atomic<bool>         m_alive;               /*< Set to true in epoll_tick(), false on
                                                      *  notification. */
    WatchdogNotifier*         m_pWatchdog_notifier;  /*< Watchdog notifier, if systemd enabled. */
};

using WatchdogWorkaround = RoutingWorker::WatchdogWorkaround;

// Data local to a routing worker
template<class T>
class rworker_local
{
public:

    rworker_local(const rworker_local&) = delete;
    rworker_local& operator=(const rworker_local&) = delete;

    // Default initialized
    rworker_local()
        : m_handle(mxs_rworker_create_key())
    {
    }

    // Copy-constructed
    rworker_local(const T& t)
        : m_handle(mxs_rworker_create_key())
        , m_value(t)
    {
    }

    ~rworker_local()
    {
        mxs_rworker_delete_data(m_handle);
    }

    // Converts to a T reference
    operator T&() const
    {
        return *get_local_value();
    }

    // Arrow operator
    T* operator->() const
    {
        return get_local_value();
    }

    // Dereference operator
    T& operator*()
    {
        return *get_local_value();
    }

    // Const version of dereference operator
    const T& operator*() const
    {
        return *get_local_value();
    }

    /**
     * Assign a value
     *
     * Sets the master value and triggers an update on all workers. The value is updated instantly
     * if the calling thread is a worker thread.
     *
     * @param t  The new value to assign
     */
    void assign(const T& t)
    {
        std::unique_lock<std::mutex> guard(m_lock);
        m_value = t;
        guard.unlock();

        // Update the value on all workers
        mxs_rworker_broadcast(update_value, this);
    }

    /**
     * Get all local values
     *
     * Note: this method must be called from the main worker thread.
     *
     * @return A vector containing the individual values for each worker
     */
    std::vector<T> values() const
    {
        mxb_assert_message(RoutingWorker::get_current() == RoutingWorker::get(RoutingWorker::MAIN),
                           "this method must be called from the main worker thread");
        std::vector<T> rval;
        std::mutex lock;
        mxb::Semaphore sem;

        auto n = RoutingWorker::broadcast([&]() {
                                              std::lock_guard<std::mutex> guard(lock);
                                              rval.push_back(*get_local_value());
                                          },
                                          &sem,
                                          RoutingWorker::EXECUTE_AUTO);

        sem.wait_n(n);
        return rval;
    }

private:

    uint64_t                            m_handle;   // The handle to the worker local data
    typename std::remove_const<T>::type m_value;    // The master value, never used directly
    mutable std::mutex                  m_lock;     // Protects the master value

private:

    T* get_local_value() const
    {
        auto worker = RoutingWorker::get_current();
        T* my_value = static_cast<T*>(worker->get_data(m_handle));

        if (my_value == nullptr)
        {
            // First time we get the local value, allocate it from the master value
            std::unique_lock<std::mutex> guard(m_lock);
            my_value = new T(m_value);
            guard.unlock();

            worker->set_data(m_handle, my_value, destroy_value);
        }

        mxb_assert(my_value);
        return my_value;
    }

    void update_local_value()
    {
        // As get_local_value can cause a lock to be taken, we need the pointer to our value before
        // we lock the master value for the updating of our value.
        T* my_value = get_local_value();

        std::lock_guard<std::mutex> guard(m_lock);
        *my_value = m_value;
    }

    static void update_value(void* data)
    {
        static_cast<rworker_local<T>*>(data)->update_local_value();
    }

    static void destroy_value(void* data)
    {
        delete static_cast<T*>(data);
    }
};
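
// A minimal sketch (illustration only) of rworker_local<T>: a shared master value
// with an independent, lazily created copy on each routing worker. The variable
// names are hypothetical.
//
//     static maxscale::rworker_local<int> limit {10};
//
//     // On any thread: update the master value and push it to all workers.
//     limit.assign(20);
//
//     // On a routing worker thread: read this worker's own copy.
//     int my_limit = *limit;
//
//     // On the main worker thread: collect every worker's current copy.
//     std::vector<int> all = limit.values();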
}

/**
 * @brief Convert a routing worker to JSON format
 *
 * @param host  Hostname of this server
 * @param id    ID of the worker
 *
 * @return JSON resource representing the worker
 */
json_t* mxs_rworker_to_json(const char* host, int id);

/**
 * Convert routing workers into JSON format
 *
 * @param host  Hostname of this server
 *
 * @return A JSON resource collection of workers
 *
 * @see mxs_json_resource()
 */
json_t* mxs_rworker_list_to_json(const char* host);

/**
 * @brief MaxScale worker watchdog
 *
 * If this function returns, then MaxScale is alive. If not,
 * then some thread is dead.
 */
void mxs_rworker_watchdog();