MaxScale/include/maxscale/routingworker.hh
2022-01-04 15:47:38 +02:00

839 lines
25 KiB
C++

/*
* Copyright (c) 2018 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2026-01-04
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#include <maxscale/ccdefs.hh>
#include <unordered_map>
#include <vector>
#include <mutex>
#include <type_traits>
#include <atomic>
#include <maxbase/atomic.hh>
#include <maxbase/semaphore.hh>
#include <maxbase/worker.hh>
#include <maxbase/stopwatch.hh>
#include <maxscale/poll.hh>
#include <maxscale/query_classifier.hh>
#include <maxscale/session.hh>
MXS_BEGIN_DECLS
// The worker ID of the "main" thread
#define MXS_RWORKER_MAIN -1
/**
 * Return the routing worker associated with the provided worker id.
 *
 * @param worker_id  A worker id. If MXS_RWORKER_MAIN is used, the
 *                   routing worker running in the main thread will
 *                   be returned.
 *
 * @return The corresponding routing worker instance, or NULL if the
 *         id does not correspond to a routing worker.
 */
MXB_WORKER* mxs_rworker_get(int worker_id);
/**
 * Return the current routing worker.
 *
 * @return A routing worker, or NULL if there is no current routing worker.
 */
MXB_WORKER* mxs_rworker_get_current();
/**
 * Return the id of the current routing worker.
 *
 * @return The id of the routing worker, or -1 if there is no current
 *         routing worker.
 */
int mxs_rworker_get_current_id();
/**
 * Broadcast a message to all routing workers.
 *
 * @param msg_id  The message id.
 * @param arg1    Message specific first argument.
 * @param arg2    Message specific second argument.
 *
 * @return The number of messages posted; if less than the number of workers
 *         then some postings failed.
 *
 * @attention The return value tells *only* whether the message could be posted,
 *            *not* that it has reached the worker.
 *
 * @attention Exactly the same arguments are passed to all workers. Take that
 *            into account if the passed data must be freed.
 *
 * @attention This function is signal safe.
 */
size_t mxs_rworker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2);
/**
 * Call a function on all workers
 *
 * A convenience function for executing simple tasks on all workers. The task
 * will be executed immediately on the current worker and thus recursive calls
 * into functions should not be done.
 *
 * @param cb    Callback to call
 * @param data  Data passed to the callback
 *
 * @return The number of messages posted; if less than the number of workers
 *         then some postings failed.
 */
size_t mxs_rworker_broadcast(void (* cb)(void* data), void* data);
/**
 * Add a session to the current routing worker's session container. Currently
 * only required for some special commands e.g. "KILL <process_id>" to work.
 *
 * @param session  Session to add.
 * @return true if successful, false if id already existed in map.
 */
bool mxs_rworker_register_session(MXS_SESSION* session);
/**
 * Remove a session from the current routing worker's session container. Does
 * not actually remove anything from an epoll-set or affect the session in any
 * way.
 *
 * @param id  Which id to remove.
 * @return A boolean result; the removed session itself is not returned.
 *         NOTE(review): presumably true if a session with @c id was found and
 *         removed and false otherwise - confirm against the implementation.
 */
bool mxs_rworker_deregister_session(uint64_t id);
/**
 * Find a session in the current routing worker's session container.
 *
 * @param id  Which id to find.
 * @return The found session or NULL if not found.
 */
MXS_SESSION* mxs_rworker_find_session(uint64_t id);
/**
 * Worker local storage
 */
/**
 * Initialize a globally unique data identifier
 *
 * The value returned by this function is used with the other data commands.
 * The value is a unique handle to thread-local storage.
 *
 * @return The data identifier usable for worker local data storage
 */
uint64_t mxs_rworker_create_key();
/**
 * Set local worker data on current worker
 *
 * @param key       Key acquired with mxs_rworker_create_key
 * @param data      Data to store
 * @param callback  Callback used to delete the data, NULL if no deletion is
 *                  required. This function is called by mxs_rworker_delete_data
 *                  when the data is deleted.
 */
void mxs_rworker_set_data(uint64_t key, void* data, void (* callback)(void*));
/**
 * Get local data from current worker
 *
 * @param key  Key to use
 *
 * @return Data previously stored or NULL if no data was previously stored
 */
void* mxs_rworker_get_data(uint64_t key);
/**
 * Deletes local data from all workers
 *
 * The key must not be used again after deletion.
 *
 * @param key  Key to remove
 */
void mxs_rworker_delete_data(uint64_t key);
MXS_END_DECLS
namespace maxscale
{
/**
 * A worker (derived from mxb::Worker) that additionally keeps a registry of
 * sessions, a list of DCBs awaiting deletion and a table of worker-local data
 * indexed by keys obtained from create_key().
 */
class RoutingWorker : public mxb::Worker
                    , private MXB_POLL_DATA
{
    RoutingWorker(const RoutingWorker&) = delete;
    RoutingWorker& operator=(const RoutingWorker&) = delete;

public:
    enum
    {
        MAIN = -1   /**< Id denoting the routing worker running in the main thread. */
    };

    using SessionsById = Registry<MXS_SESSION>;
    using Zombies = std::vector<DCB*>;
    using LocalData = std::vector<void*>;
    using DataDeleters = std::vector<void (*)(void*)>;

    /**
     * Initialize the routing worker mechanism.
     *
     * To be called once at process startup. This will cause as many workers
     * to be created as the number of threads defined.
     *
     * @return True if the initialization succeeded, false otherwise.
     */
    static bool init();

    /**
     * Finalize the worker mechanism.
     *
     * To be called once at process shutdown. This will cause all workers
     * to be destroyed. When the function is called, no worker should be
     * running anymore.
     */
    static void finish();

    /**
     * Add a file descriptor to the epoll instance shared between all workers.
     * Events occurring on the provided file descriptor will be handled by all
     * workers. This is primarily intended for listening sockets where the
     * only event is EPOLLIN, signaling that accept() can be used on the listening
     * socket for creating a connected socket to a client.
     *
     * @param fd      The file descriptor to be added.
     * @param events  Mask of epoll event types.
     * @param pData   The poll data associated with the descriptor:
     *
     *                  data->handler  : Handler that knows how to deal with events
     *                                   for this particular type of 'struct mxs_poll_data'.
     *                  data->thread.id: 0
     *
     * @return True, if the descriptor could be added, false otherwise.
     */
    static bool add_shared_fd(int fd, uint32_t events, MXB_POLL_DATA* pData);

    /**
     * Remove a file descriptor from the epoll instance shared between all workers.
     *
     * @param fd  The file descriptor to be removed.
     *
     * @return True on success, false on failure.
     */
    static bool remove_shared_fd(int fd);

    /**
     * Returns the id of the routing worker
     *
     * @return The id of the routing worker.
     */
    int id() const override
    {
        return m_id;
    }

    /**
     * Register zombie for later deletion.
     *
     * @param pZombie  DCB that will be deleted at end of event loop.
     *
     * @note The DCB must be owned by this worker.
     */
    void register_zombie(DCB* pZombie);

    /**
     * Return a reference to the session registry of this worker.
     *
     * @return Session registry.
     */
    SessionsById& session_registry();

    /**
     * Return the worker associated with the provided worker id.
     *
     * @param worker_id  A worker id. By specifying MAIN, the routing worker
     *                   running in the main thread will be returned.
     *
     * @return The corresponding worker instance, or NULL if the id does
     *         not correspond to a worker.
     */
    static RoutingWorker* get(int worker_id);

    /**
     * Return the worker associated with the current thread.
     *
     * @return The worker instance, or NULL if the current thread does not have a worker.
     */
    static RoutingWorker* get_current();

    /**
     * Return the worker id associated with the current thread.
     *
     * @return The worker id, or -1 if the current thread does not have a worker.
     */
    static int get_current_id();

    /**
     * Starts all routing workers.
     *
     * @return True, if all workers could be started.
     */
    static bool start_workers();

    /**
     * Waits for all routing workers.
     */
    static void join_workers();

    /**
     * Deprecated
     */
    static void set_nonblocking_polls(unsigned int nbpolls);

    /**
     * Deprecated
     */
    static void set_maxwait(unsigned int maxwait);

    /**
     * Posts a task to all workers for execution.
     *
     * @param pTask  The task to be executed.
     * @param pSem   If non-NULL, will be posted once per worker when the task's
     *               `execute` returns.
     *
     * @return How many workers the task was posted to.
     *
     * @attention The very same task will be posted to all workers. The task
     *            should either not have any sharable data or then it should
     *            have data specific to each worker that can be accessed
     *            without locks.
     *
     * @attention The task will be posted to each routing worker using the
     *            EXECUTE_AUTO execution mode. That is, if the calling thread
     *            is that of a routing worker, then the task will be executed
     *            directly without going through the message loop of the worker,
     *            otherwise the task is delivered via the message loop.
     */
    static size_t broadcast(Task* pTask, mxb::Semaphore* pSem = nullptr);

    /**
     * Posts a task to all workers for execution.
     *
     * @param sTask  The task to be executed.
     *
     * @return How many workers the task was posted to.
     *
     * @attention The very same task will be posted to all workers. The task
     *            should either not have any sharable data or then it should
     *            have data specific to each worker that can be accessed
     *            without locks.
     *
     * @attention Once the task has been executed by all workers, it will
     *            be deleted.
     *
     * @attention The task will be posted to each routing worker using the
     *            EXECUTE_AUTO execution mode. That is, if the calling thread
     *            is that of a routing worker, then the task will be executed
     *            directly without going through the message loop of the worker,
     *            otherwise the task is delivered via the message loop.
     */
    static size_t broadcast(std::unique_ptr<DisposableTask> sTask);

    /**
     * Posts a function to all workers for execution.
     *
     * @param func  The function to be executed.
     * @param pSem  If non-NULL, will be posted once the task's `execute` returns.
     * @param mode  Execution mode
     *
     * @return How many workers the task was posted to.
     */
    static size_t broadcast(std::function<void ()> func, mxb::Semaphore* pSem, execute_mode_t mode);

    /**
     * Posts a function to all workers for execution, without a semaphore.
     *
     * Forwards to the three-argument overload with a null semaphore.
     *
     * @param func  The function to be executed.
     * @param mode  Execution mode
     *
     * @return How many workers the task was posted to.
     */
    static size_t broadcast(std::function<void ()> func, execute_mode_t mode)
    {
        return broadcast(func, nullptr, mode);
    }

    /**
     * Executes a task on all workers in serial mode (the task is executed
     * on at most one worker thread at a time). When the function returns
     * the task has been executed on all workers.
     *
     * @param task  The task to be executed.
     *
     * @return How many workers the task was posted to.
     *
     * @warning This function is extremely inefficient and will be slow compared
     * to the other functions. Only use this function when printing thread-specific
     * data to stdout.
     *
     * @attention The task will be posted to each routing worker using the
     *            EXECUTE_AUTO execution mode. That is, if the calling thread
     *            is that of a routing worker, then the task will be executed
     *            directly without going through the message loop of the worker,
     *            otherwise the task is delivered via the message loop.
     */
    static size_t execute_serially(Task& task);
    static size_t execute_serially(std::function<void()> func);

    /**
     * Executes a task on all workers concurrently and waits until all workers
     * are done. That is, when the function returns the task has been executed
     * by all workers.
     *
     * @param task  The task to be executed.
     *
     * @return How many workers the task was posted to.
     *
     * @attention The task will be posted to each routing worker using the
     *            EXECUTE_AUTO execution mode. That is, if the calling thread
     *            is that of a routing worker, then the task will be executed
     *            directly without going through the message loop of the worker,
     *            otherwise the task is delivered via the message loop.
     */
    static size_t execute_concurrently(Task& task);
    static size_t execute_concurrently(std::function<void()> func);

    /**
     * Broadcast a message to all workers.
     *
     * @param msg_id  The message id.
     * @param arg1    Message specific first argument.
     * @param arg2    Message specific second argument.
     *
     * @return The number of messages posted; if less than the number of workers
     *         then some postings failed.
     *
     * @attention The return value tells *only* whether the message could be posted,
     *            *not* that it has reached the worker.
     *
     * @attention Exactly the same arguments are passed to all workers. Take that
     *            into account if the passed data must be freed.
     *
     * @attention This function is signal safe.
     */
    static size_t broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2);

    /**
     * Initiate shutdown of all workers.
     *
     * @attention A call to this function will only initiate the shutdown,
     *            the workers will not have shut down when the function returns.
     *
     * @attention This function is signal safe.
     */
    static void shutdown_all();

    /**
     * Returns statistics for all workers.
     *
     * @return Combined statistics.
     *
     * @attention The statistics may no longer be accurate by the time it has
     *            been returned. The returned values may also not represent a
     *            100% consistent set.
     */
    static STATISTICS get_statistics();

    /**
     * Return a specific combined statistic value.
     *
     * @param what  What to return.
     *
     * @return The corresponding value.
     */
    static int64_t get_one_statistic(POLL_STAT what);

    /**
     * Get next worker
     *
     * @return The worker where work should be assigned
     */
    static RoutingWorker* pick_worker();

    /**
     * Worker local storage
     */

    /**
     * Initialize a globally unique data identifier
     *
     * Keys are handed out from a process-wide atomic counter starting at 0.
     *
     * @return The data identifier usable for worker local data storage
     */
    static uint64_t create_key()
    {
        static std::atomic<uint64_t> id_generator {0};
        return id_generator.fetch_add(1, std::memory_order_relaxed);
    }

    /**
     * Set local data
     *
     * Any data previously stored under @c key is overwritten without being
     * deleted; call delete_data() first if it must be released.
     *
     * @param key       Key acquired with create_key
     * @param data      Data to store
     * @param callback  Function that delete_data() calls with @c data, or null
     *                  if no deletion is required.
     */
    void set_data(uint64_t key, void* data, void (* callback)(void*))
    {
        if (m_local_data.size() <= key)
        {
            // Both tables are indexed by key and always grown together.
            m_local_data.resize(key + 1, nullptr);
            m_data_deleters.resize(key + 1, nullptr);
        }

        // Always store the deleter, also when it is null. Otherwise a deleter left
        // over from a previous value of this key would later be invoked on data
        // that was registered as not needing deletion.
        m_data_deleters[key] = callback;
        m_local_data[key] = data;
    }

    /**
     * Get local data
     *
     * @param key  Key to use
     *
     * @return Data previously stored, or null if nothing is stored for the key
     */
    void* get_data(uint64_t key) const
    {
        return key < m_local_data.size() ? m_local_data[key] : nullptr;
    }

    /**
     * Deletes local data
     *
     * If a callback was passed when the data was set, it will be called.
     * Afterwards both the data and the callback of the key are reset to null.
     * Keys beyond the current table size are ignored.
     *
     * @param key  Key to remove
     */
    void delete_data(uint64_t key)
    {
        if (key < m_local_data.size())
        {
            if (auto deleter = m_data_deleters[key])
            {
                deleter(m_local_data[key]);
            }

            m_data_deleters[key] = nullptr;
            m_local_data[key] = nullptr;
        }
    }

    /**
     * Provides QC statistics of one worker
     *
     * @param id[in]       Id of worker.
     * @param pStats[out]  The QC statistics of that worker.
     *
     * @return True, if @c id referred to a worker, false otherwise.
     */
    static bool get_qc_stats(int id, QC_CACHE_STATS* pStats);

    /**
     * Provides QC statistics of all workers
     *
     * @param all_stats  Vector that on return will contain the statistics of all workers.
     */
    static void get_qc_stats(std::vector<QC_CACHE_STATS>& all_stats);

    /**
     * Provides QC statistics of all workers as a Json object for use in the REST-API.
     *
     * @param zHost  The name of the MaxScale host.
     */
    static std::unique_ptr<json_t> get_qc_stats_as_json(const char* zHost);

    /**
     * Provides QC statistics of one worker as a Json object for use in the REST-API.
     *
     * @param zHost  The name of the MaxScale host.
     * @param id     An id of a worker.
     *
     * @return A json object if @c id refers to a worker, NULL otherwise.
     */
    static std::unique_ptr<json_t> get_qc_stats_as_json(const char* zHost, int id);

    /**
     * To be called from the initial (parent) thread if the systemd watchdog is on.
     *
     * @param microseconds  The watchdog interval.
     */
    static void set_watchdog_interval(uint64_t microseconds);

    class WatchdogWorkaround;
    friend WatchdogWorkaround;

    /**
     * @class WatchdogWorkaround
     *
     * RAII-class using which the systemd watchdog notification can be
     * handled during synchronous worker activity that causes the epoll
     * event handling to be stalled.
     *
     * The constructor turns on the workaround and the destructor
     * turns it off. Instances cannot be copied.
     */
    class WatchdogWorkaround
    {
        WatchdogWorkaround(const WatchdogWorkaround&) = delete;
        WatchdogWorkaround& operator=(const WatchdogWorkaround&) = delete;

    public:
        /**
         * Turns on the watchdog workaround for a specific worker.
         *
         * @param pWorker  The worker for which the systemd notification
         *                 should be arranged. Need not be the calling worker.
         *                 Must not be null.
         */
        WatchdogWorkaround(RoutingWorker* pWorker)
            : m_pWorker(pWorker)
        {
            mxb_assert(pWorker);
            m_pWorker->start_watchdog_workaround();
        }

        /**
         * Turns on the watchdog workaround for the calling worker.
         *
         * The calling thread must have a current routing worker.
         */
        WatchdogWorkaround()
            : WatchdogWorkaround(RoutingWorker::get_current())
        {
        }

        /**
         * Turns off the watchdog workaround.
         */
        ~WatchdogWorkaround()
        {
            m_pWorker->stop_watchdog_workaround();
        }

    private:
        RoutingWorker* m_pWorker;   /**< The worker whose workaround is active. */
    };

private:
    class WatchdogNotifier;
    friend WatchdogNotifier;

    const int    m_id;              /**< The id of the worker. */
    SessionsById m_sessions;        /**< A mapping of session_id->MXS_SESSION. The map
                                     *   should contain sessions exclusive to this
                                     *   worker and not e.g. listener sessions. For now,
                                     *   it's up to the protocol to decide whether a new
                                     *   session is added to the map. */
    Zombies      m_zombies;         /**< DCBs to be deleted. */
    LocalData    m_local_data;      /**< Data local to this worker */
    DataDeleters m_data_deleters;   /**< Delete functions for the local data */

    RoutingWorker();
    virtual ~RoutingWorker();

    static RoutingWorker* create(int epoll_listener_fd);

    bool pre_run();     // override
    void post_run();    // override
    void epoll_tick();  // override

    void delete_zombies();
    void check_systemd_watchdog();
    void start_watchdog_workaround();
    void stop_watchdog_workaround();

    static uint32_t epoll_instance_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_t events);
    uint32_t        handle_epoll_events(uint32_t events);

    static maxbase::Duration  s_watchdog_interval;      /**< Duration between notifications, if any. */
    static maxbase::TimePoint s_watchdog_next_check;    /**< Next time to notify systemd. */
    std::atomic<bool>         m_alive;                  /**< Set to true in epoll_tick(), false on
                                                         *   notification. */
    WatchdogNotifier*         m_pWatchdog_notifier;     /**< Watchdog notifier, if systemd enabled. */
};

// Shorthand so that the workaround can be named without the enclosing class.
using WatchdogWorkaround = RoutingWorker::WatchdogWorkaround;
/**
 * Data local to a routing worker
 *
 * Holds one "master" value plus a lazily created copy of it for each routing
 * worker that accesses the object. The copies are stored in the worker's local
 * data table under a key created in the constructor.
 *
 * Except for construction, destruction and assign(), the members must be used
 * on a routing worker thread, since they operate on the copy belonging to
 * RoutingWorker::get_current().
 */
template<class T>
class rworker_local
{
public:
    rworker_local(const rworker_local&) = delete;
    rworker_local& operator=(const rworker_local&) = delete;

    // Default initialized. The master value is value-initialized so that types
    // without a user-provided constructor (int, pointers, plain structs) start out
    // zeroed instead of indeterminate; the per-worker copies are made from it.
    rworker_local()
        : m_handle(mxs_rworker_create_key())
        , m_value()
    {
    }

    // Copy-constructed
    rworker_local(const T& t)
        : m_handle(mxs_rworker_create_key())
        , m_value(t)
    {
    }

    // Releases the key's data via mxs_rworker_delete_data
    ~rworker_local()
    {
        mxs_rworker_delete_data(m_handle);
    }

    // Converts to a T reference
    operator T&() const
    {
        return *get_local_value();
    }

    // Arrow operator
    T* operator->() const
    {
        return get_local_value();
    }

    // Dereference operator
    T& operator*()
    {
        return *get_local_value();
    }

    // Const version of dereference operator
    const T& operator*() const
    {
        return *get_local_value();
    }

    /**
     * Assign a value
     *
     * Sets the master value and triggers an update on all workers. The value is updated instantly
     * if the calling thread is a worker thread.
     *
     * @param t The new value to assign
     */
    void assign(const T& t)
    {
        {
            // Only the master value needs the lock. It must be released before the
            // broadcast, because the update may run directly on this thread and takes
            // the same lock.
            std::lock_guard<std::mutex> guard(m_lock);
            m_value = t;
        }

        // Update the value on all workers
        mxs_rworker_broadcast(update_value, this);
    }

    /**
     * Get all local values
     *
     * Note: this method must be called from the main worker thread.
     *
     * @return A vector containing the individual values for each worker
     */
    std::vector<T> values() const
    {
        mxb_assert_message(RoutingWorker::get_current() == RoutingWorker::get(RoutingWorker::MAIN),
                           "this method must be called from the main worker thread");
        std::vector<T> rval;
        std::mutex lock;    // Serializes the workers' appends to rval
        mxb::Semaphore sem;

        auto n = RoutingWorker::broadcast([&]() {
                                              std::lock_guard<std::mutex> guard(lock);
                                              rval.push_back(*get_local_value());
                                          },
                                          &sem,
                                          RoutingWorker::EXECUTE_AUTO);

        // The lambda refers to locals of this function, so wait until every worker
        // that received it has finished before returning.
        sem.wait_n(n);
        return rval;
    }

private:
    uint64_t                                    m_handle;   // The handle to the worker local data
    typename std::remove_const<T>::type         m_value;    // The master value, never used directly
    mutable std::mutex                          m_lock;     // Protects the master value

private:
    // Returns the calling worker's copy, creating it from the master value on first use.
    T* get_local_value() const
    {
        auto worker = RoutingWorker::get_current();
        // get_current() returns null when the thread has no worker; catch that in
        // debug builds instead of dereferencing a null pointer below.
        mxb_assert_message(worker, "worker local data accessed from a thread without a routing worker");
        T* my_value = static_cast<T*>(worker->get_data(m_handle));

        if (my_value == nullptr)
        {
            // First time we get the local value, allocate it from the master value
            std::unique_lock<std::mutex> guard(m_lock);
            my_value = new T(m_value);
            guard.unlock();

            // The worker stores the copy; destroy_value releases it when the data is deleted.
            worker->set_data(m_handle, my_value, destroy_value);
        }

        mxb_assert(my_value);
        return my_value;
    }

    // Copies the master value into the calling worker's copy.
    void update_local_value()
    {
        // As get_local_value can cause a lock to be taken, we need the pointer to our value before
        // we lock the master value for the updating of our value.
        T* my_value = get_local_value();

        std::lock_guard<std::mutex> guard(m_lock);
        *my_value = m_value;
    }

    // Broadcast callback: @c data is the rworker_local instance to refresh.
    static void update_value(void* data)
    {
        static_cast<rworker_local<T>*>(data)->update_local_value();
    }

    // Deleter registered with the worker: @c data is a T allocated in get_local_value.
    static void destroy_value(void* data)
    {
        delete static_cast<T*>(data);
    }
};
}
/**
 * @brief Convert a routing worker to JSON format
 *
 * @param host  Hostname of this server
 * @param id    ID of the worker
 *
 * @return JSON resource representing the worker
 *
 * NOTE(review): behaviour for an @c id that matches no worker, and who owns
 * the returned object, are not visible in this header - confirm against the
 * implementation.
 */
json_t* mxs_rworker_to_json(const char* host, int id);
/**
 * @brief Convert routing workers into JSON format
 *
 * @param host  Hostname of this server
 *
 * @return A JSON resource collection of workers
 *
 * @see mxs_json_resource()
 */
json_t* mxs_rworker_list_to_json(const char* host);
/**
 * @brief MaxScale worker watchdog
 *
 * If this function returns, then MaxScale is alive. If not,
 * then some thread is dead.
 */
void mxs_rworker_watchdog();