MXS-1929: Add worker local storage of data

Data can now be stored on thread-local storage of the worker. By acquiring
a unique handle from the worker, a module can store a thread-local
value.

This functionality will be used to store configurations that are sometimes
updated at runtime but are largely read-only. By avoiding shared data
altogether, performance is not affected. The only synchronization that is
done is on update.

Also added a helper functions for broadcasting tasks on all routing
workers. With the old mxs_rworker_broadcast_message function, if a
function call was broadcasted it was always queued for execution. The
mxs_rworker_broadcast will immediately execute the task on the local
worker and queue it for execution of other routing workers.
This commit is contained in:
Markus Mäkelä
2018-07-26 10:10:05 +03:00
parent a833f39196
commit ff07009d8c
3 changed files with 187 additions and 0 deletions

View File

@ -68,6 +68,21 @@ int mxs_rworker_get_current_id();
*/
size_t mxs_rworker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2);
/**
* Call a function on all workers
*
* A convenience function for executing simple tasks on all workers. The task
* will be executed immediately on the current worker and thus recursive calls
* into functions should not be done.
*
* @param cb Callback to call
* @param data Data passed to the callback
*
* @return The number of messages posted; if less that ne number of workers
* then some postings failed.
*/
size_t mxs_rworker_broadcast(void (*cb)(void* data), void* data);
/**
* Add a session to the current routing worker's session container. Currently
* only required for some special commands e.g. "KILL <process_id>" to work.
@ -95,4 +110,47 @@ bool mxs_rworker_deregister_session(uint64_t id);
*/
MXS_SESSION* mxs_rworker_find_session(uint64_t id);
/**
* Worker local storage
*/
/**
* Initialize a globally unique data identifier
*
* The value returned by this function is used with the other data commands.
* The value is a unique handle to thread-local storage.
*
* @return The data identifier usable for worker local data storage
*/
uint64_t mxs_rworker_create_key();
/**
* Set local worker data on current worker
*
* @param key Key acquired with create_data
* @param data Data to store
* @param callback Callback used to delete the data, NULL if no deletion is
* required. This function is called by mxs_rworker_delete_data
* when the data is deleted.
*/
void mxs_rworker_set_data(uint64_t key, void* data, void (*callback)(void*));
/**
* Get local data from current worker
*
* @param key Key to use
*
* @return Data previously stored or NULL if no data was previously stored
*/
void* mxs_rworker_get_data(uint64_t key);
/**
* Deletes local data from all workers
*
* The key must not be used again after deletion.
*
* @param key Key to remove
*/
void mxs_rworker_delete_data(uint64_t key);
MXS_END_DECLS

View File

@ -13,8 +13,12 @@
*/
#include <maxscale/cppdefs.hh>
#include <unordered_map>
#include <maxscale/routingworker.h>
#include <maxscale/worker.hh>
#include "session.hh"
namespace maxscale
@ -35,6 +39,9 @@ public:
typedef Registry<MXS_SESSION> SessionsById;
typedef std::vector<DCB*> Zombies;
typedef std::unordered_map<uint64_t, void*> LocalData;
typedef std::unordered_map<uint64_t, void(*)(void*)> DataDeleters;
/**
* Initialize the routing worker mechanism.
*
@ -278,6 +285,75 @@ public:
*/
static RoutingWorker* pick_worker();
/**
* Worker local storage
*/
/**
* Initialize a globally unique data identifier
*
* @return The data identifier usable for worker local data storage
*/
static uint64_t create_key()
{
static uint64_t id_generator = 0;
return atomic_add_uint64(&id_generator, 1);
}
/**
* Set local data
*
* @param key Key acquired with create_local_data
* @param data Data to store
*/
void set_data(uint64_t key, void* data, void (*callback)(void*))
{
if (callback)
{
m_data_deleters[key] = callback;
}
m_local_data[key] = data;
}
/**
* Get local data
*
* @param key Key to use
*
* @return Data previously stored
*/
void* get_data(uint64_t key)
{
auto it = m_local_data.find(key);
return it != m_local_data.end() ? it->second : NULL;
}
/**
* Deletes local data
*
* If a callback was passed when the data was set, it will be called.
*
* @param key Key to remove
*/
void delete_data(uint64_t key)
{
auto data = m_local_data.find(key);
if (data != m_local_data.end())
{
auto deleter = m_data_deleters.find(key);
if (deleter != m_data_deleters.end())
{
deleter->second(data->second);
m_data_deleters.erase(deleter);
}
m_local_data.erase(data);
}
}
private:
const int m_id; /*< The id of the worker. */
SessionsById m_sessions; /*< A mapping of session_id->MXS_SESSION. The map
@ -286,6 +362,8 @@ private:
* it's up to the protocol to decide whether a new
* session is added to the map. */
Zombies m_zombies; /*< DCBs to be deleted. */
LocalData m_local_data; /*< Data local to this worker */
DataDeleters m_data_deleters; /*< Delete functions for the local data */
RoutingWorker();
virtual ~RoutingWorker();

View File

@ -997,6 +997,57 @@ private:
const char* m_zHost;
};
class FunctionTask: public maxscale::WorkerDisposableTask
{
public:
FunctionTask(std::function<void ()> cb):
m_cb(cb)
{
}
void execute(Worker& worker)
{
m_cb();
}
protected:
std::function<void ()> m_cb;
};
}
size_t mxs_rworker_broadcast(void (*cb)(void* data), void* data)
{
return RoutingWorker::broadcast(std::auto_ptr<FunctionTask>(new FunctionTask([&]()
{
cb(data);
})));
}
uint64_t mxs_rworker_create_key()
{
return RoutingWorker::create_key();
}
void mxs_rworker_set_data(uint64_t key, void* data, void (*callback)(void*))
{
RoutingWorker::get_current()->set_data(key, data, callback);
}
void* mxs_rworker_get_data(uint64_t key)
{
return RoutingWorker::get_current()->get_data(key);
}
void mxs_rworker_delete_data(uint64_t key)
{
auto func = [key]()
{
RoutingWorker::get_current()->delete_data(key);
};
std::auto_ptr<FunctionTask> task(new FunctionTask(func));
RoutingWorker::broadcast(task);
}
json_t* mxs_rworker_to_json(const char* zHost, int id)