Merge branch 'develop' into MXS-1209

This commit is contained in:
MassimilianoPinto 2017-04-26 16:21:39 +02:00
commit e10dd1329d
15 changed files with 492 additions and 103 deletions

View File

@ -36,6 +36,32 @@ int atomic_add(int *variable, int value);
int64_t atomic_add_int64(int64_t *variable, int64_t value);
uint64_t atomic_add_uint64(uint64_t *variable, int64_t value);
/**
* Implementation of an atomic load operation for the GCC environment.
*
* Loads a value from the contents of a location pointed to by the first parameter.
* The load operation is atomic and it uses the strongest memory ordering.
*
* @param variable Pointer the the variable to load from
* @return The stored value
*/
int atomic_load_int32(int *variable);
int64_t atomic_load_int64(int64_t *variable);
uint64_t atomic_load_uint64(uint64_t *variable);
/**
* Implementation of an atomic store operation for the GCC environment.
*
* Stores a value to the contents of a location pointed to by the first parameter.
* The store operation is atomic and it uses the strongest memory ordering.
*
* @param variable Pointer the the variable to store to
* @param value Value to be stored
*/
void atomic_store_int32(int *variable, int value);
void atomic_store_int64(int64_t *variable, int64_t value);
void atomic_store_uint64(uint64_t *variable, uint64_t value);
/**
* @brief Impose a full memory barrier
*

View File

@ -325,13 +325,32 @@ void dcb_process_idle_sessions(int thr);
/**
* @brief Call a function for each connected DCB
*
* @deprecated You should not use this function, use dcb_foreach_parallel instead
*
* @param func Function to call. The function should return @c true to continue iteration
* and @c false to stop iteration earlier. The first parameter is a DCB and the second
* is the value of @c data that the user provided.
* @param data User provided data passed as the second parameter to @c func
* @return True if all DCBs were iterated, false if the callback returned false
*/
bool dcb_foreach(bool (*func)(DCB *, void *), void *data);
bool dcb_foreach(bool (*func)(DCB *dcb, void *data), void *data);
/**
* @brief Call a function for each connected DCB
*
* @note This function can call @c func from multiple thread at one time.
*
* @param func Function to call. The function should return @c true to continue iteration
* and @c false to stop iteration earlier. The first is a DCB and
* the second is this thread's value in the @c data array that
* the user provided.
*
* @param data Array of user provided data passed as the second parameter to @c func.
* The array must have more space for pointers thann the return
* value of `config_threadcount()`. The value passed to @c func will
* be the value of the array at the index of the current thread's ID.
*/
void dcb_foreach_parallel(bool (*func)(DCB *dcb, void *data), void **data);
/**
* @brief Return the port number this DCB is connected to

View File

@ -95,7 +95,7 @@ public:
/**
* @brief Waits on the semaphore.
*
* If the semaphore count is greater that zero, decrements the count and
* If the semaphore count is greater than zero, decrements the count and
* returns immediately. Otherwise blocks the caller until someone posts
* the semaphore.
*
@ -120,6 +120,39 @@ public:
return rc == 0;
}
/**
* @brief Waits multiple times on the semaphore.
*
* If the semaphore count is greater than or equal to the specified amount,
* decrements the count and returns immediately. Otherwise blocks the caller
* until the semaphore has been posted the required number of times.
*
* @param n_wait How many times should be waited.
* @param signal_approach Whether signals should be ignored or honoured.
*
* @return How many times the semaphore has been waited on.
*
* @attention The function can return a different number than `n_wait` only
* if `signal_approach` is `HONOUR_SIGNALS`.
*/
size_t wait_n(size_t n_wait,
signal_approach_t signal_approach = IGNORE_SIGNALS) const
{
bool waited = true;
size_t n_waited = 0;
while (waited && n_wait--)
{
waited = wait(signal_approach);
if (waited)
{
++n_waited;
}
}
return n_waited;
}
/**
* @brief Waits on the semaphore.
*
@ -190,6 +223,46 @@ public:
return rc == 0;
}
/**
* @brief Waits on the semaphore.
*
* Waits on the sempahore the specified number of times, at most until the
* specified time.
*
* @param n_wait How many times should be waited.
* @param ts The *absolute* time until which the waiting at
* most is performed.
* @param signal_approach Whether signals should be ignored or honoured.
*
* @return How many times the semaphore has been waited on. If the
* function times out or is interrupted, then the returned
* value will be less than `n_wait`.
*
* @attention If the function returns a value less than `n_count` and
* `signal_approch` is `HONOUR_SIGNALS` then the caller must check
* the value of `errno` to find out whether the call was timed out or
* interrupted. In the former case the value will be `ETIMEDOUT`
* and in the latter `EINTR.
*/
size_t timedwait_n(size_t n_wait,
struct timespec& ts,
signal_approach_t signal_approach = IGNORE_SIGNALS) const
{
bool waited = true;
size_t n_waited = 0;
while (waited && n_wait--)
{
waited = timedwait(ts, signal_approach);
if (waited)
{
++n_waited;
}
}
return n_waited;
}
/**
* @brief Waits on the semaphore.
*
@ -212,7 +285,43 @@ public:
*/
bool timedwait(time_t seconds,
long nseconds,
signal_approach_t signal_approach = IGNORE_SIGNALS) const;
signal_approach_t signal_approach = IGNORE_SIGNALS) const
{
timespec ts;
get_current_timespec(seconds, nseconds, &ts);
return timedwait(ts, signal_approach);
}
/**
* @brief Waits on the semaphore.
*
* Waits on the sempahore the specified number of times at most until the
* specified time.
*
* @param n_wait How many times should be waited.
* @param seconds How many seconds to wait at most.
* @param nseconds How many nanonseconds to wait at most.
* @param signal_approach Whether signals should be ignored or honoured.
*
* @return How many times the semaphore has been waited on. If the
* function times out or is interrupted, then the returned
* value will be less than `n_wait`.
*
* @attention If the function returns a value less than `n_count` and
* `signal_approch` is `HONOUR_SIGNALS` then the caller must check
* the value of `errno` to find out whether the call was timed out or
* interrupted. In the former case the value will be `ETIMEDOUT`
* and in the latter `EINTR.
*/
size_t timedwait_n(size_t n_wait,
time_t seconds,
long nseconds,
signal_approach_t signal_approach = IGNORE_SIGNALS) const
{
timespec ts;
get_current_timespec(seconds, nseconds, &ts);
return timedwait_n(n_wait, ts, signal_approach);
}
/**
* @brief Waits on the semaphore.
@ -237,6 +346,36 @@ public:
return timedwait(seconds, 0, signal_approach);
}
/**
* @brief Waits on the semaphore.
*
* Waits on the sempahore the specified number of times at most until the
* specified time.
*
* @param n_wait How many times should be waited.
* @param seconds How many seconds to wait at most.
* @param signal_approach Whether signals should be ignored or honoured.
*
* @return How many times the semaphore has been waited on. If the
* function times out or is interrupted, then the returned
* value will be less than `n_wait`.
*
* @attention If the function returns a value less than `n_count` and
* `signal_approch` is `HONOUR_SIGNALS` then the caller must check
* the value of `errno` to find out whether the call was timed out or
* interrupted. In the former case the value will be `ETIMEDOUT`
* and in the latter `EINTR.
*/
bool timedwait_n(size_t n_wait,
time_t seconds,
signal_approach_t signal_approach = IGNORE_SIGNALS) const
{
return timedwait_n(n_wait, seconds, 0, signal_approach);
}
private:
static void get_current_timespec(time_t seconds, long nseconds, timespec* pTs);
private:
mutable sem_t m_sem;
};

View File

@ -69,6 +69,15 @@ MXS_WORKER* mxs_worker_get(int worker_id);
*/
int mxs_worker_id(MXS_WORKER* pWorker);
/**
* Return the id of the worker.
*
* @return The id of the worker.
*
* @attention If there is no current worker, then -1 will be returned.
*/
int mxs_worker_get_current_id();
/**
* Post a message to a worker.
*

View File

@ -31,3 +31,33 @@ uint64_t atomic_add_uint64(uint64_t *variable, int64_t value)
{
return __sync_fetch_and_add(variable, value);
}
int atomic_load_int32(int *variable)
{
return __atomic_load_n(variable, __ATOMIC_SEQ_CST);
}
int64_t atomic_load_int64(int64_t *variable)
{
return __atomic_load_n(variable, __ATOMIC_SEQ_CST);
}
uint64_t atomic_load_uint64(uint64_t *variable)
{
return __atomic_load_n(variable, __ATOMIC_SEQ_CST);
}
void atomic_store_int32(int *variable, int value)
{
return __atomic_store_n(variable, value, __ATOMIC_SEQ_CST);
}
void atomic_store_int64(int64_t *variable, int64_t value)
{
return __atomic_store_n(variable, value, __ATOMIC_SEQ_CST);
}
void atomic_store_uint64(uint64_t *variable, uint64_t value)
{
return __atomic_store_n(variable, value, __ATOMIC_SEQ_CST);
}

View File

@ -19,88 +19,51 @@
* block is the user data that is handled by the epoll system and contains
* the state data and pointers to other components that relate to the
* use of a file descriptor.
*
* @verbatim
* Revision History
*
* Date Who Description
* 12/06/13 Mark Riddoch Initial implementation
* 21/06/13 Massimiliano Pinto free_dcb is used
* 25/06/13 Massimiliano Pinto Added checks to session and router_session
* 28/06/13 Mark Riddoch Changed the free mechanism to
* introduce a zombie state for the
* dcb
* 02/07/2013 Massimiliano Pinto Addition of delayqlock, delayq and
* authlock for handling backend
* asynchronous protocol connection
* and a generic lock for backend
* authentication
* 16/07/2013 Massimiliano Pinto Added command type for dcb
* 23/07/2013 Mark Riddoch Tidy up logging
* 02/09/2013 Massimiliano Pinto Added session refcount
* 27/09/2013 Massimiliano Pinto dcb_read returns 0 if ioctl returns no
* error and 0 bytes to read.
* This fixes a bug with many reads from
* backend
* 07/05/2014 Mark Riddoch Addition of callback mechanism
* 20/06/2014 Mark Riddoch Addition of dcb_clone
* 29/05/2015 Markus Makela Addition of dcb_write_SSL
* 11/06/2015 Martin Brampton Persistent connnections and tidy up
* 07/07/2015 Martin Brampton Merged add to zombieslist into dcb_close,
* fixes for various error situations,
* remove dcb_set_state etc, simplifications.
* 10/07/2015 Martin Brampton Simplify, merge dcb_read and dcb_read_n
* 04/09/2015 Martin Brampton Changes to ensure DCB always has session pointer
* 28/09/2015 Martin Brampton Add counters, maxima for DCBs and zombies
* 29/05/2015 Martin Brampton Impose locking in dcb_call_foreach callbacks
* 17/10/2015 Martin Brampton Add hangup for each and bitmask display MaxAdmin
* 15/12/2015 Martin Brampton Merge most of SSL write code into non-SSL,
* enhance SSL code
* 07/02/2016 Martin Brampton Make dcb_read_SSL & dcb_create_SSL internal,
* further small SSL logic changes
* 31/05/2016 Martin Brampton Implement connection throttling
* 27/06/2016 Martin Brampton Implement list manager to manage DCB memory
*
* @endverbatim
*/
#include <maxscale/dcb.h>
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <time.h>
#include <maxscale/spinlock.h>
#include <maxscale/alloc.h>
#include <maxscale/atomic.h>
#include <maxscale/atomic.h>
#include <maxscale/hashtable.h>
#include <maxscale/hk_heartbeat.h>
#include <maxscale/limits.h>
#include <maxscale/listener.h>
#include <maxscale/log_manager.h>
#include <maxscale/platform.h>
#include <maxscale/poll.h>
#include <maxscale/router.h>
#include <maxscale/semaphore.hh>
#include <maxscale/server.h>
#include <maxscale/service.h>
#include <maxscale/router.h>
#include <maxscale/poll.h>
#include <maxscale/atomic.h>
#include <maxscale/limits.h>
#include <maxscale/log_manager.h>
#include <maxscale/hashtable.h>
#include <maxscale/listener.h>
#include <maxscale/hk_heartbeat.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <maxscale/alloc.h>
#include <maxscale/spinlock.h>
#include <maxscale/utils.h>
#include <maxscale/platform.h>
#include "maxscale/session.h"
#include "maxscale/modules.h"
#include "maxscale/queuemanager.h"
#include "maxscale/semaphore.hh"
#include "maxscale/session.h"
#include "maxscale/worker.hh"
#include "maxscale/workertask.hh"
using maxscale::Worker;
using maxscale::WorkerTask;
using maxscale::Semaphore;
/* A DCB with null values, used for initialization */
static DCB dcb_initialized;
@ -3066,28 +3029,84 @@ void dcb_process_idle_sessions(int thr)
}
}
bool dcb_foreach(bool(*func)(DCB *, void *), void *data)
/** Helper class for serial iteration over all DCBs */
class SerialDcbTask : public WorkerTask
{
public:
int nthr = config_threadcount();
bool more = true;
for (int i = 0; i < nthr && more; i++)
SerialDcbTask(bool(*func)(DCB *, void *), void *data):
m_func(func),
m_data(data),
m_more(1)
{
spinlock_acquire(&all_dcbs_lock[i]);
for (DCB *dcb = all_dcbs[i]; dcb && more; dcb = dcb->thread.next)
{
if (!func(dcb, data))
{
more = false;
}
}
spinlock_release(&all_dcbs_lock[i]);
}
return more;
void execute(Worker& worker)
{
int thread_id = worker.id();
for (DCB *dcb = all_dcbs[thread_id]; dcb && atomic_load_int32(&m_more); dcb = dcb->thread.next)
{
if (!m_func(dcb, m_data))
{
atomic_store_int32(&m_more, 0);
break;
}
}
}
bool more() const
{
return m_more;
}
private:
bool(*m_func)(DCB *dcb, void *data);
void* m_data;
int m_more;
};
bool dcb_foreach(bool(*func)(DCB *dcb, void *data), void *data)
{
SerialDcbTask task(func, data);
Worker::execute_on_all_serially(&task);
return task.more();
}
/** Helper class for parallel iteration over all DCBs */
class ParallelDcbTask : public WorkerTask
{
public:
ParallelDcbTask(bool(*func)(DCB *, void *), void **data):
m_func(func),
m_data(data)
{
}
void execute(Worker& worker)
{
int thread_id = worker.id();
for (DCB *dcb = all_dcbs[thread_id]; dcb; dcb = dcb->thread.next)
{
if (!m_func(dcb, m_data[thread_id]))
{
break;
}
}
}
private:
bool(*m_func)(DCB *dcb, void *data);
void** m_data;
};
void dcb_foreach_parallel(bool(*func)(DCB *dcb, void *data), void **data)
{
Semaphore sem;
ParallelDcbTask task(func, data);
sem.wait_n(Worker::execute_on_all(&task, &sem));
}
int dcb_get_port(const DCB *dcb)

View File

@ -320,6 +320,21 @@ public:
*/
static size_t execute_on_all(std::auto_ptr<DisposableTask> sTask);
/**
* Executes a task on all workers in serial mode.
*
* The task is executed on at most one worker thread at a time.
*
* @param pTask The task to be executed.
*
* @return How many workers the task was posted to.
*
* @warning This function is extremely inefficient and will be slow compared
* to the other functions. Only use this function when printing thread-specific
* data to stdout.
*/
static size_t execute_on_all_serially(Task* pTask);
/**
* Post a message to a worker.
*

View File

@ -55,8 +55,8 @@ protected:
private:
friend class Worker;
void inc_count();
void dec_count();
void inc_ref();
void dec_ref();
private:
int32_t m_count;

View File

@ -17,13 +17,14 @@
namespace maxscale
{
bool Semaphore::timedwait(time_t seconds,
long nseconds,
signal_approach_t signal_approach) const
//static
void Semaphore::get_current_timespec(time_t seconds,
long nseconds,
timespec* pTs)
{
ss_dassert(nseconds <= 999999999);
timespec ts;
timespec& ts = *pTs;
ss_debug(int rc=) clock_gettime(CLOCK_REALTIME, &ts);
ss_dassert(rc == 0);
@ -39,8 +40,6 @@ bool Semaphore::timedwait(time_t seconds,
}
ts.tv_nsec = nseconds_sum;
return timedwait(ts, signal_approach);
}
}

View File

@ -1,3 +1,4 @@
add_executable(test_atomic testatomic.c)
add_executable(test_adminusers testadminusers.c)
add_executable(test_buffer testbuffer.c)
add_executable(test_dcb testdcb.c)
@ -22,6 +23,7 @@ add_executable(testmaxscalepcre2 testmaxscalepcre2.c)
add_executable(testmodulecmd testmodulecmd.c)
add_executable(testconfig testconfig.c)
add_executable(trxboundaryparser_profile trxboundaryparser_profile.cc)
target_link_libraries(test_atomic maxscale-common)
target_link_libraries(test_adminusers maxscale-common)
target_link_libraries(test_buffer maxscale-common)
target_link_libraries(test_dcb maxscale-common)
@ -46,6 +48,7 @@ target_link_libraries(testmaxscalepcre2 maxscale-common)
target_link_libraries(testmodulecmd maxscale-common)
target_link_libraries(testconfig maxscale-common)
target_link_libraries(trxboundaryparser_profile maxscale-common)
add_test(TestAtomic test_atomic)
add_test(TestAdminUsers test_adminusers)
add_test(TestBuffer test_buffer)
add_test(TestDCB test_dcb)

View File

@ -0,0 +1,88 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cdefs.h>
#include <stdio.h>
#include <maxscale/atomic.h>
#include <maxscale/debug.h>
#include <maxscale/thread.h>
#define NTHR 10
static int running = 0;
static int expected = 0;
void test_add(void* data)
{
int id = (size_t)data;
while (atomic_load_int32(&running))
{
atomic_add(&expected, id);
atomic_add(&expected, -id);
ss_dassert(atomic_load_int32(&expected) >= 0);
}
}
void test_load_store(void* data)
{
int id = (size_t)data;
while (atomic_load_int32(&running))
{
if (atomic_load_int32(&expected) % NTHR == id)
{
ss_dassert(atomic_add(&expected, 1) % NTHR == id + 1);
}
}
}
int run_test(void(*func)(void*))
{
THREAD threads[NTHR];
atomic_store_int32(&expected, 0);
atomic_store_int32(&running, 1);
for (int i = 0; i < NTHR; i++)
{
if (thread_start(&threads[i], func, NULL) == NULL)
{
ss_dassert(false);
}
}
thread_millisleep(2500);
atomic_store_int32(&running, 0);
for (int i = 0; i < NTHR; i++)
{
thread_wait(threads[i]);
}
return atomic_load_int32(&expected);
}
int main(int argc, char** argv)
{
int rval = 0;
run_test(test_load_store);
run_test(test_add);
return rval;
}

View File

@ -65,6 +65,14 @@ void test_simple()
ss_dassert(rv);
cout << "Waited" << endl;
sem2.post();
sem2.post();
sem2.post();
cout << "Waiting 3 times for semaphore with a count of 3." << endl;
rv = sem2.wait_n(3);
cout << "Waited" << endl;
Semaphore sem3;
time_t started;
@ -118,10 +126,7 @@ void test_threads()
cout << "Waiting for threads." << endl;
for (int i = 0; i < n_threads; ++i)
{
sem.wait();
}
sem.wait_n(n_threads);
cout << "Joining threads." << endl;

View File

@ -214,6 +214,16 @@ bool Worker::init()
MXS_ERROR("Could not allocate an epoll instance.");
}
if (this_unit.initialized)
{
// When the initialization has successfully been performed, we set the
// current_worker_id of this thread to 0. That way any connections that
// are made during service startup (after this function returns, but
// bofore the workes have been started) will be handled by the worker
// that will be running in the main thread.
this_thread.current_worker_id = 0;
}
return this_unit.initialized;
}
@ -502,6 +512,11 @@ MXS_WORKER* mxs_worker_get(int worker_id)
return Worker::get(worker_id);
}
int mxs_worker_get_current_id()
{
return Worker::get_current_id();
}
Worker* Worker::get_current()
{
Worker* pWorker = NULL;
@ -549,7 +564,7 @@ bool Worker::execute(std::auto_ptr<DisposableTask> sTask)
// private
bool Worker::execute_disposable(DisposableTask* pTask)
{
pTask->inc_count();
pTask->inc_ref();
intptr_t arg1 = reinterpret_cast<intptr_t>(pTask);
@ -557,7 +572,7 @@ bool Worker::execute_disposable(DisposableTask* pTask)
if (!posted)
{
pTask->dec_count();
pTask->dec_ref();
}
return posted;
@ -585,7 +600,7 @@ size_t Worker::execute_on_all(Task* pTask, Semaphore* pSem)
size_t Worker::execute_on_all(std::auto_ptr<DisposableTask> sTask)
{
DisposableTask* pTask = sTask.release();
pTask->inc_count();
pTask->inc_ref();
size_t n = 0;
@ -599,7 +614,26 @@ size_t Worker::execute_on_all(std::auto_ptr<DisposableTask> sTask)
}
}
pTask->dec_count();
pTask->dec_ref();
return n;
}
//static
size_t Worker::execute_on_all_serially(Task* pTask)
{
Semaphore sem;
size_t n = 0;
for (int i = 0; i < this_unit.n_workers; ++i)
{
Worker* pWorker = this_unit.ppWorkers[i];
if (pWorker->execute(pTask, &sem))
{
sem.wait();
++n;
}
}
return n;
}
@ -838,7 +872,7 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms
{
DisposableTask *pTask = reinterpret_cast<DisposableTask*>(msg.arg1());
pTask->execute(*this);
pTask->dec_count();
pTask->dec_ref();
}
break;

View File

@ -13,6 +13,7 @@
#include "maxscale/workertask.hh"
#include <maxscale/atomic.h>
#include <maxscale/debug.h>
namespace maxscale
{
@ -32,13 +33,15 @@ WorkerDisposableTask::WorkerDisposableTask()
{
}
void WorkerDisposableTask::inc_count()
void WorkerDisposableTask::inc_ref()
{
atomic_add(&m_count, 1);
}
void WorkerDisposableTask::dec_count()
void WorkerDisposableTask::dec_ref()
{
ss_dassert(atomic_load_int32(&m_count) > 0);
if (atomic_add(&m_count, -1) == 1)
{
delete this;

View File

@ -17,7 +17,7 @@ if (ROCKSDB_BUILT)
storage_rocksdb.cc
)
add_dependencies(storage_rocksdb RocksDB)
target_link_libraries(storage_rocksdb maxscale-common ${JANSSON_LIBRARIES} ${ROCKSDB_LIB} ${ROCKSDB_LINK_LIBS})
target_link_libraries(storage_rocksdb maxscale-common ${JANSSON_LIBRARIES} ${ROCKSDB_LIB} ${ROCKSDB_LINK_LIBS} lz4)
set_target_properties(storage_rocksdb PROPERTIES VERSION "1.0.0")
set_target_properties(storage_rocksdb PROPERTIES LINK_FLAGS -Wl,-z,defs)
install_module(storage_rocksdb core)