Make dcb_foreach thread-safe
The function was no longer thread-safe as it used the obsolete per-thread spinlocks to iterate over the DCBs. Now the function uses the newly added WorkerTask class to iterate over them. Since the new WorkerTask mechanism is far superion to dcb_foreach, the latter is now deprecated.
This commit is contained in:
@ -325,13 +325,32 @@ void dcb_process_idle_sessions(int thr);
|
|||||||
/**
|
/**
|
||||||
* @brief Call a function for each connected DCB
|
* @brief Call a function for each connected DCB
|
||||||
*
|
*
|
||||||
|
* @deprecated You should not use this function, use dcb_foreach_parallel instead
|
||||||
|
*
|
||||||
* @param func Function to call. The function should return @c true to continue iteration
|
* @param func Function to call. The function should return @c true to continue iteration
|
||||||
* and @c false to stop iteration earlier. The first parameter is a DCB and the second
|
* and @c false to stop iteration earlier. The first parameter is a DCB and the second
|
||||||
* is the value of @c data that the user provided.
|
* is the value of @c data that the user provided.
|
||||||
* @param data User provided data passed as the second parameter to @c func
|
* @param data User provided data passed as the second parameter to @c func
|
||||||
* @return True if all DCBs were iterated, false if the callback returned false
|
* @return True if all DCBs were iterated, false if the callback returned false
|
||||||
*/
|
*/
|
||||||
bool dcb_foreach(bool (*func)(DCB *, void *), void *data);
|
bool dcb_foreach(bool (*func)(DCB *dcb, void *data), void *data);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Call a function for each connected DCB
|
||||||
|
*
|
||||||
|
* @note This function can call @c func from multiple thread at one time.
|
||||||
|
*
|
||||||
|
* @param func Function to call. The function should return @c true to continue iteration
|
||||||
|
* and @c false to stop iteration earlier. The first is a DCB and
|
||||||
|
* the second is this thread's value in the @c data array that
|
||||||
|
* the user provided.
|
||||||
|
*
|
||||||
|
* @param data Array of user provided data passed as the second parameter to @c func.
|
||||||
|
* The array must have more space for pointers thann the return
|
||||||
|
* value of `config_threadcount()`. The value passed to @c func will
|
||||||
|
* be the value of the array at the index of the current thread's ID.
|
||||||
|
*/
|
||||||
|
void dcb_foreach_parallel(bool (*func)(DCB *dcb, void *data), void **data);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Return the port number this DCB is connected to
|
* @brief Return the port number this DCB is connected to
|
||||||
|
@ -19,88 +19,51 @@
|
|||||||
* block is the user data that is handled by the epoll system and contains
|
* block is the user data that is handled by the epoll system and contains
|
||||||
* the state data and pointers to other components that relate to the
|
* the state data and pointers to other components that relate to the
|
||||||
* use of a file descriptor.
|
* use of a file descriptor.
|
||||||
*
|
|
||||||
* @verbatim
|
|
||||||
* Revision History
|
|
||||||
*
|
|
||||||
* Date Who Description
|
|
||||||
* 12/06/13 Mark Riddoch Initial implementation
|
|
||||||
* 21/06/13 Massimiliano Pinto free_dcb is used
|
|
||||||
* 25/06/13 Massimiliano Pinto Added checks to session and router_session
|
|
||||||
* 28/06/13 Mark Riddoch Changed the free mechanism to
|
|
||||||
* introduce a zombie state for the
|
|
||||||
* dcb
|
|
||||||
* 02/07/2013 Massimiliano Pinto Addition of delayqlock, delayq and
|
|
||||||
* authlock for handling backend
|
|
||||||
* asynchronous protocol connection
|
|
||||||
* and a generic lock for backend
|
|
||||||
* authentication
|
|
||||||
* 16/07/2013 Massimiliano Pinto Added command type for dcb
|
|
||||||
* 23/07/2013 Mark Riddoch Tidy up logging
|
|
||||||
* 02/09/2013 Massimiliano Pinto Added session refcount
|
|
||||||
* 27/09/2013 Massimiliano Pinto dcb_read returns 0 if ioctl returns no
|
|
||||||
* error and 0 bytes to read.
|
|
||||||
* This fixes a bug with many reads from
|
|
||||||
* backend
|
|
||||||
* 07/05/2014 Mark Riddoch Addition of callback mechanism
|
|
||||||
* 20/06/2014 Mark Riddoch Addition of dcb_clone
|
|
||||||
* 29/05/2015 Markus Makela Addition of dcb_write_SSL
|
|
||||||
* 11/06/2015 Martin Brampton Persistent connnections and tidy up
|
|
||||||
* 07/07/2015 Martin Brampton Merged add to zombieslist into dcb_close,
|
|
||||||
* fixes for various error situations,
|
|
||||||
* remove dcb_set_state etc, simplifications.
|
|
||||||
* 10/07/2015 Martin Brampton Simplify, merge dcb_read and dcb_read_n
|
|
||||||
* 04/09/2015 Martin Brampton Changes to ensure DCB always has session pointer
|
|
||||||
* 28/09/2015 Martin Brampton Add counters, maxima for DCBs and zombies
|
|
||||||
* 29/05/2015 Martin Brampton Impose locking in dcb_call_foreach callbacks
|
|
||||||
* 17/10/2015 Martin Brampton Add hangup for each and bitmask display MaxAdmin
|
|
||||||
* 15/12/2015 Martin Brampton Merge most of SSL write code into non-SSL,
|
|
||||||
* enhance SSL code
|
|
||||||
* 07/02/2016 Martin Brampton Make dcb_read_SSL & dcb_create_SSL internal,
|
|
||||||
* further small SSL logic changes
|
|
||||||
* 31/05/2016 Martin Brampton Implement connection throttling
|
|
||||||
* 27/06/2016 Martin Brampton Implement list manager to manage DCB memory
|
|
||||||
*
|
|
||||||
* @endverbatim
|
|
||||||
*/
|
*/
|
||||||
#include <maxscale/dcb.h>
|
#include <maxscale/dcb.h>
|
||||||
|
|
||||||
|
#include <arpa/inet.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
|
#include <netinet/tcp.h>
|
||||||
|
#include <signal.h>
|
||||||
#include <stdarg.h>
|
#include <stdarg.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <signal.h>
|
|
||||||
#include <time.h>
|
|
||||||
#include <sys/epoll.h>
|
#include <sys/epoll.h>
|
||||||
|
#include <sys/ioctl.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <sys/stat.h>
|
||||||
|
#include <sys/un.h>
|
||||||
|
#include <time.h>
|
||||||
|
|
||||||
#include <maxscale/spinlock.h>
|
#include <maxscale/alloc.h>
|
||||||
|
#include <maxscale/atomic.h>
|
||||||
|
#include <maxscale/atomic.h>
|
||||||
|
#include <maxscale/hashtable.h>
|
||||||
|
#include <maxscale/hk_heartbeat.h>
|
||||||
|
#include <maxscale/limits.h>
|
||||||
|
#include <maxscale/listener.h>
|
||||||
|
#include <maxscale/log_manager.h>
|
||||||
|
#include <maxscale/platform.h>
|
||||||
|
#include <maxscale/poll.h>
|
||||||
|
#include <maxscale/router.h>
|
||||||
|
#include <maxscale/semaphore.hh>
|
||||||
#include <maxscale/server.h>
|
#include <maxscale/server.h>
|
||||||
#include <maxscale/service.h>
|
#include <maxscale/service.h>
|
||||||
#include <maxscale/router.h>
|
#include <maxscale/spinlock.h>
|
||||||
#include <maxscale/poll.h>
|
|
||||||
#include <maxscale/atomic.h>
|
|
||||||
#include <maxscale/limits.h>
|
|
||||||
#include <maxscale/log_manager.h>
|
|
||||||
#include <maxscale/hashtable.h>
|
|
||||||
#include <maxscale/listener.h>
|
|
||||||
#include <maxscale/hk_heartbeat.h>
|
|
||||||
#include <netinet/tcp.h>
|
|
||||||
#include <arpa/inet.h>
|
|
||||||
#include <sys/ioctl.h>
|
|
||||||
#include <sys/stat.h>
|
|
||||||
#include <sys/socket.h>
|
|
||||||
#include <sys/un.h>
|
|
||||||
#include <maxscale/alloc.h>
|
|
||||||
#include <maxscale/utils.h>
|
#include <maxscale/utils.h>
|
||||||
#include <maxscale/platform.h>
|
|
||||||
|
|
||||||
#include "maxscale/session.h"
|
|
||||||
#include "maxscale/modules.h"
|
#include "maxscale/modules.h"
|
||||||
#include "maxscale/queuemanager.h"
|
#include "maxscale/queuemanager.h"
|
||||||
|
#include "maxscale/semaphore.hh"
|
||||||
|
#include "maxscale/session.h"
|
||||||
#include "maxscale/worker.hh"
|
#include "maxscale/worker.hh"
|
||||||
|
#include "maxscale/workertask.hh"
|
||||||
|
|
||||||
using maxscale::Worker;
|
using maxscale::Worker;
|
||||||
|
using maxscale::WorkerTask;
|
||||||
|
using maxscale::Semaphore;
|
||||||
|
|
||||||
/* A DCB with null values, used for initialization */
|
/* A DCB with null values, used for initialization */
|
||||||
static DCB dcb_initialized;
|
static DCB dcb_initialized;
|
||||||
@ -3066,28 +3029,90 @@ void dcb_process_idle_sessions(int thr)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool dcb_foreach(bool(*func)(DCB *, void *), void *data)
|
/** Helper class for serial iteration over all DCBs */
|
||||||
|
class SerialDcbTask : public WorkerTask
|
||||||
{
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
int nthr = config_threadcount();
|
SerialDcbTask(bool(*func)(DCB *, void *), void *data):
|
||||||
bool more = true;
|
m_func(func),
|
||||||
|
m_data(data),
|
||||||
|
m_more(1)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < nthr && more; i++)
|
void execute(Worker& worker)
|
||||||
{
|
{
|
||||||
spinlock_acquire(&all_dcbs_lock[i]);
|
int thread_id = worker.id();
|
||||||
|
|
||||||
for (DCB *dcb = all_dcbs[i]; dcb && more; dcb = dcb->thread.next)
|
for (DCB *dcb = all_dcbs[thread_id]; dcb && atomic_load_int32(&m_more); dcb = dcb->thread.next)
|
||||||
{
|
{
|
||||||
if (!func(dcb, data))
|
if (!m_func(dcb, m_data))
|
||||||
{
|
{
|
||||||
more = false;
|
atomic_store_int32(&m_more, 0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
spinlock_release(&all_dcbs_lock[i]);
|
bool more() const
|
||||||
|
{
|
||||||
|
return m_more;
|
||||||
}
|
}
|
||||||
|
|
||||||
return more;
|
private:
|
||||||
|
bool(*m_func)(DCB *dcb, void *data);
|
||||||
|
void* m_data;
|
||||||
|
int m_more;
|
||||||
|
};
|
||||||
|
|
||||||
|
bool dcb_foreach(bool(*func)(DCB *dcb, void *data), void *data)
|
||||||
|
{
|
||||||
|
SerialDcbTask task(func, data);
|
||||||
|
Worker::execute_on_all_serially(&task);
|
||||||
|
return task.more();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Helper class for parallel iteration over all DCBs */
|
||||||
|
class ParallelDcbTask : public WorkerTask
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
|
ParallelDcbTask(bool(*func)(DCB *, void *), void **data):
|
||||||
|
m_func(func),
|
||||||
|
m_data(data)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void execute(Worker& worker)
|
||||||
|
{
|
||||||
|
int thread_id = worker.id();
|
||||||
|
|
||||||
|
for (DCB *dcb = all_dcbs[thread_id]; dcb; dcb = dcb->thread.next)
|
||||||
|
{
|
||||||
|
if (!m_func(dcb, m_data[thread_id]))
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
bool(*m_func)(DCB *dcb, void *data);
|
||||||
|
void** m_data;
|
||||||
|
};
|
||||||
|
|
||||||
|
void dcb_foreach_parallel(bool(*func)(DCB *dcb, void *data), void **data)
|
||||||
|
{
|
||||||
|
Semaphore sem;
|
||||||
|
ParallelDcbTask task(func, data);
|
||||||
|
size_t n = Worker::execute_on_all(&task, &sem);
|
||||||
|
|
||||||
|
// TODO: Use the multi-wait version of this function
|
||||||
|
for (size_t i = 0; i < n; i++)
|
||||||
|
{
|
||||||
|
sem.wait();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int dcb_get_port(const DCB *dcb)
|
int dcb_get_port(const DCB *dcb)
|
||||||
|
Reference in New Issue
Block a user