From 122337569ccbc89273ce997db5dcd579a7db3b3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 14 Apr 2017 05:10:49 +0300 Subject: [PATCH 01/12] Add atomic store and load operations Added abstractions for storing and loading 32-bit and 64-bit values atomically. The functions currently use the GCC __atomic builtin atomics. --- include/maxscale/atomic.h | 26 ++++++++++++++++++++++++++ server/core/atomic.cc | 30 ++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/include/maxscale/atomic.h b/include/maxscale/atomic.h index 2813ede48..2a28750e3 100644 --- a/include/maxscale/atomic.h +++ b/include/maxscale/atomic.h @@ -36,6 +36,32 @@ int atomic_add(int *variable, int value); int64_t atomic_add_int64(int64_t *variable, int64_t value); uint64_t atomic_add_uint64(uint64_t *variable, int64_t value); +/** + * Implementation of an atomic load operation for the GCC environment. + * + * Loads a value from the contents of a location pointed to by the first parameter. + * The load operation is atomic and it uses the strongest memory ordering. + * + * @param variable Pointer the the variable to load from + * @return The stored value + */ +int atomic_read(int *variable); +int64_t atomic_read_int64(int64_t *variable); +uint64_t atomic_read_uint64(uint64_t *variable); + +/** + * Implementation of an atomic store operation for the GCC environment. + * + * Stores a value to the contents of a location pointed to by the first parameter. + * The store operation is atomic and it uses the strongest memory ordering. + * + * @param variable Pointer the the variable to store to + * @param value Value to be stored + */ +void atomic_write(int *variable, int value); +void atomic_write_int64(int64_t *variable, int64_t value); +void atomic_write_uint64(uint64_t *variable, uint64_t value); + /** * @brief Impose a full memory barrier * diff --git a/server/core/atomic.cc b/server/core/atomic.cc index cca7ea4d7..6eebd4170 100644 --- a/server/core/atomic.cc +++ b/server/core/atomic.cc @@ -31,3 +31,33 @@ uint64_t atomic_add_uint64(uint64_t *variable, int64_t value) { return __sync_fetch_and_add(variable, value); } + +int atomic_read(int *variable) +{ + return __atomic_load_n(variable, __ATOMIC_SEQ_CST); +} + +int64_t atomic_read_int64(int64_t *variable) +{ + return __atomic_load_n(variable, __ATOMIC_SEQ_CST); +} + +uint64_t atomic_read_uint64(uint64_t *variable) +{ + return __atomic_load_n(variable, __ATOMIC_SEQ_CST); +} + +void atomic_write(int *variable, int value) +{ + return __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); +} + +void atomic_write_int64(int64_t *variable, int64_t value) +{ + return __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); +} + +void atomic_write_uint64(uint64_t *variable, uint64_t value) +{ + return __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); +} From f91d415be134846925ef9ad1ff7523174984a652 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 14 Apr 2017 05:48:48 +0300 Subject: [PATCH 02/12] Add simple test for atomic operations The test runs some simple tests with the atomic operations in MaxScale. --- server/core/test/CMakeLists.txt | 3 ++ server/core/test/testatomic.c | 88 +++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 server/core/test/testatomic.c diff --git a/server/core/test/CMakeLists.txt b/server/core/test/CMakeLists.txt index d5dcd2e75..8e9629b39 100644 --- a/server/core/test/CMakeLists.txt +++ b/server/core/test/CMakeLists.txt @@ -1,3 +1,4 @@ +add_executable(test_atomic testatomic.c) add_executable(test_adminusers testadminusers.c) add_executable(test_buffer testbuffer.c) add_executable(test_dcb testdcb.c) @@ -22,6 +23,7 @@ add_executable(testmaxscalepcre2 testmaxscalepcre2.c) add_executable(testmodulecmd testmodulecmd.c) add_executable(testconfig testconfig.c) add_executable(trxboundaryparser_profile trxboundaryparser_profile.cc) +target_link_libraries(test_atomic maxscale-common) target_link_libraries(test_adminusers maxscale-common) target_link_libraries(test_buffer maxscale-common) target_link_libraries(test_dcb maxscale-common) @@ -46,6 +48,7 @@ target_link_libraries(testmaxscalepcre2 maxscale-common) target_link_libraries(testmodulecmd maxscale-common) target_link_libraries(testconfig maxscale-common) target_link_libraries(trxboundaryparser_profile maxscale-common) +add_test(TestAtomic test_atomic) add_test(TestAdminUsers test_adminusers) add_test(TestBuffer test_buffer) add_test(TestDCB test_dcb) diff --git a/server/core/test/testatomic.c b/server/core/test/testatomic.c new file mode 100644 index 000000000..331f48bf3 --- /dev/null +++ b/server/core/test/testatomic.c @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include + +#include + +#include +#include +#include + + +#define NTHR 10 + +static int running = 0; +static int expected = 0; + +void test_add(void* data) +{ + int id = (size_t)data; + + while (atomic_read(&running)) + { + atomic_add(&expected, id); + atomic_add(&expected, -id); + ss_dassert(atomic_read(&expected) >= 0); + } +} + + +void test_load_store(void* data) +{ + int id = (size_t)data; + + while (atomic_read(&running)) + { + if (atomic_read(&expected) % NTHR == id) + { + ss_dassert(atomic_add(&expected, 1) % NTHR == id + 1); + } + } +} + +int run_test(void(*func)(void*)) +{ + THREAD threads[NTHR]; + + atomic_write(&expected, 0); + atomic_write(&running, 1); + + for (int i = 0; i < NTHR; i++) + { + if (thread_start(&threads[i], func, NULL) == NULL) + { + ss_dassert(false); + } + } + + thread_millisleep(2500); + atomic_write(&running, 0); + + for (int i = 0; i < NTHR; i++) + { + thread_wait(threads[i]); + } + + return atomic_read(&expected); +} + +int main(int argc, char** argv) +{ + int rval = 0; + + run_test(test_load_store); + run_test(test_add); + + return rval; +} From 19cf8c489ee8ee3ea187ba8bb9e47841304f97ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Wed, 19 Apr 2017 16:55:14 +0300 Subject: [PATCH 03/12] Rename atomic store and load functions The atomic store and load functions are now called atomic_store_X and atomic_load_X where X is one of int32, int64 or uint64. --- include/maxscale/atomic.h | 12 ++++++------ server/core/atomic.cc | 12 ++++++------ server/core/test/testatomic.c | 16 ++++++++-------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/include/maxscale/atomic.h b/include/maxscale/atomic.h index 2a28750e3..c5d8c8cc4 100644 --- a/include/maxscale/atomic.h +++ b/include/maxscale/atomic.h @@ -45,9 +45,9 @@ uint64_t atomic_add_uint64(uint64_t *variable, int64_t value); * @param variable Pointer the the variable to load from * @return The stored value */ -int atomic_read(int *variable); -int64_t atomic_read_int64(int64_t *variable); -uint64_t atomic_read_uint64(uint64_t *variable); +int atomic_load_int32(int *variable); +int64_t atomic_load_int64(int64_t *variable); +uint64_t atomic_load_uint64(uint64_t *variable); /** * Implementation of an atomic store operation for the GCC environment. @@ -58,9 +58,9 @@ uint64_t atomic_read_uint64(uint64_t *variable); * @param variable Pointer the the variable to store to * @param value Value to be stored */ -void atomic_write(int *variable, int value); -void atomic_write_int64(int64_t *variable, int64_t value); -void atomic_write_uint64(uint64_t *variable, uint64_t value); +void atomic_store_int32(int *variable, int value); +void atomic_store_int64(int64_t *variable, int64_t value); +void atomic_store_uint64(uint64_t *variable, uint64_t value); /** * @brief Impose a full memory barrier diff --git a/server/core/atomic.cc b/server/core/atomic.cc index 6eebd4170..549431724 100644 --- a/server/core/atomic.cc +++ b/server/core/atomic.cc @@ -32,32 +32,32 @@ uint64_t atomic_add_uint64(uint64_t *variable, int64_t value) return __sync_fetch_and_add(variable, value); } -int atomic_read(int *variable) +int atomic_load_int32(int *variable) { return __atomic_load_n(variable, __ATOMIC_SEQ_CST); } -int64_t atomic_read_int64(int64_t *variable) +int64_t atomic_load_int64(int64_t *variable) { return __atomic_load_n(variable, __ATOMIC_SEQ_CST); } -uint64_t atomic_read_uint64(uint64_t *variable) +uint64_t atomic_load_uint64(uint64_t *variable) { return __atomic_load_n(variable, __ATOMIC_SEQ_CST); } -void atomic_write(int *variable, int value) +void atomic_store_int32(int *variable, int value) { return __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); } -void atomic_write_int64(int64_t *variable, int64_t value) +void atomic_store_int64(int64_t *variable, int64_t value) { return __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); } -void atomic_write_uint64(uint64_t *variable, uint64_t value) +void atomic_store_uint64(uint64_t *variable, uint64_t value) { return __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); } diff --git a/server/core/test/testatomic.c b/server/core/test/testatomic.c index 331f48bf3..da405567a 100644 --- a/server/core/test/testatomic.c +++ b/server/core/test/testatomic.c @@ -29,11 +29,11 @@ void test_add(void* data) { int id = (size_t)data; - while (atomic_read(&running)) + while (atomic_load_int32(&running)) { atomic_add(&expected, id); atomic_add(&expected, -id); - ss_dassert(atomic_read(&expected) >= 0); + ss_dassert(atomic_load_int32(&expected) >= 0); } } @@ -42,9 +42,9 @@ void test_load_store(void* data) { int id = (size_t)data; - while (atomic_read(&running)) + while (atomic_load_int32(&running)) { - if (atomic_read(&expected) % NTHR == id) + if (atomic_load_int32(&expected) % NTHR == id) { ss_dassert(atomic_add(&expected, 1) % NTHR == id + 1); } @@ -55,8 +55,8 @@ int run_test(void(*func)(void*)) { THREAD threads[NTHR]; - atomic_write(&expected, 0); - atomic_write(&running, 1); + atomic_store_int32(&expected, 0); + atomic_store_int32(&running, 1); for (int i = 0; i < NTHR; i++) { @@ -67,14 +67,14 @@ int run_test(void(*func)(void*)) } thread_millisleep(2500); - atomic_write(&running, 0); + atomic_store_int32(&running, 0); for (int i = 0; i < NTHR; i++) { thread_wait(threads[i]); } - return atomic_read(&expected); + return atomic_load_int32(&expected); } int main(int argc, char** argv) From 1eb409a6662047faba9d292b939905e2eee80211 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Mon, 24 Apr 2017 15:38:08 +0300 Subject: [PATCH 04/12] Add possibility to wait several times on a semaphore --- include/maxscale/semaphore.hh | 143 +++++++++++++++++++++++++++++- server/core/semaphore.cc | 11 ++- server/core/test/testsemaphore.cc | 13 ++- 3 files changed, 155 insertions(+), 12 deletions(-) diff --git a/include/maxscale/semaphore.hh b/include/maxscale/semaphore.hh index 8c6b55ff4..c24a7125a 100644 --- a/include/maxscale/semaphore.hh +++ b/include/maxscale/semaphore.hh @@ -95,7 +95,7 @@ public: /** * @brief Waits on the semaphore. * - * If the semaphore count is greater that zero, decrements the count and + * If the semaphore count is greater than zero, decrements the count and * returns immediately. Otherwise blocks the caller until someone posts * the semaphore. * @@ -120,6 +120,39 @@ public: return rc == 0; } + /** + * @brief Waits multiple times on the semaphore. + * + * If the semaphore count is greater than or equal to the specified amount, + * decrements the count and returns immediately. Otherwise blocks the caller + * until the semaphore has been posted the required number of times. + * + * @param n_wait How many times should be waited. + * @param signal_approach Whether signals should be ignored or honoured. + * + * @return How many times the semaphore has been waited on. + * + * @attention The function can return a different number than `n_wait` only + * if `signal_approach` is `HONOUR_SIGNALS`. + */ + size_t wait_n(size_t n_wait, + signal_approach_t signal_approach = IGNORE_SIGNALS) const + { + bool waited = true; + size_t n_waited = 0; + + while (waited && n_wait--) + { + waited = wait(signal_approach); + if (waited) + { + ++n_waited; + } + } + + return n_waited; + } + /** * @brief Waits on the semaphore. * @@ -190,6 +223,46 @@ public: return rc == 0; } + /** + * @brief Waits on the semaphore. + * + * Waits on the sempahore the specified number of times, at most until the + * specified time. + * + * @param n_wait How many times should be waited. + * @param ts The *absolute* time until which the waiting at + * most is performed. + * @param signal_approach Whether signals should be ignored or honoured. + * + * @return How many times the semaphore has been waited on. If the + * function times out or is interrupted, then the returned + * value will be less than `n_wait`. + * + * @attention If the function returns a value less than `n_count` and + * `signal_approch` is `HONOUR_SIGNALS` then the caller must check + * the value of `errno` to find out whether the call was timed out or + * interrupted. In the former case the value will be `ETIMEDOUT` + * and in the latter `EINTR. + */ + size_t timedwait_n(size_t n_wait, + struct timespec& ts, + signal_approach_t signal_approach = IGNORE_SIGNALS) const + { + bool waited = true; + size_t n_waited = 0; + + while (waited && n_wait--) + { + waited = timedwait(ts, signal_approach); + if (waited) + { + ++n_waited; + } + } + + return n_waited; + } + /** * @brief Waits on the semaphore. * @@ -212,7 +285,43 @@ public: */ bool timedwait(time_t seconds, long nseconds, - signal_approach_t signal_approach = IGNORE_SIGNALS) const; + signal_approach_t signal_approach = IGNORE_SIGNALS) const + { + timespec ts; + get_current_timespec(seconds, nseconds, &ts); + return timedwait(ts, signal_approach); + } + + /** + * @brief Waits on the semaphore. + * + * Waits on the sempahore the specified number of times at most until the + * specified time. + * + * @param n_wait How many times should be waited. + * @param seconds How many seconds to wait at most. + * @param nseconds How many nanonseconds to wait at most. + * @param signal_approach Whether signals should be ignored or honoured. + * + * @return How many times the semaphore has been waited on. If the + * function times out or is interrupted, then the returned + * value will be less than `n_wait`. + * + * @attention If the function returns a value less than `n_count` and + * `signal_approch` is `HONOUR_SIGNALS` then the caller must check + * the value of `errno` to find out whether the call was timed out or + * interrupted. In the former case the value will be `ETIMEDOUT` + * and in the latter `EINTR. + */ + size_t timedwait_n(size_t n_wait, + time_t seconds, + long nseconds, + signal_approach_t signal_approach = IGNORE_SIGNALS) const + { + timespec ts; + get_current_timespec(seconds, nseconds, &ts); + return timedwait_n(n_wait, ts, signal_approach); + } /** * @brief Waits on the semaphore. @@ -237,6 +346,36 @@ public: return timedwait(seconds, 0, signal_approach); } + /** + * @brief Waits on the semaphore. + * + * Waits on the sempahore the specified number of times at most until the + * specified time. + * + * @param n_wait How many times should be waited. + * @param seconds How many seconds to wait at most. + * @param signal_approach Whether signals should be ignored or honoured. + * + * @return How many times the semaphore has been waited on. If the + * function times out or is interrupted, then the returned + * value will be less than `n_wait`. + * + * @attention If the function returns a value less than `n_count` and + * `signal_approch` is `HONOUR_SIGNALS` then the caller must check + * the value of `errno` to find out whether the call was timed out or + * interrupted. In the former case the value will be `ETIMEDOUT` + * and in the latter `EINTR. + */ + bool timedwait_n(size_t n_wait, + time_t seconds, + signal_approach_t signal_approach = IGNORE_SIGNALS) const + { + return timedwait_n(n_wait, seconds, 0, signal_approach); + } + +private: + static void get_current_timespec(time_t seconds, long nseconds, timespec* pTs); + private: mutable sem_t m_sem; }; diff --git a/server/core/semaphore.cc b/server/core/semaphore.cc index 2d8471062..82c40e1e2 100644 --- a/server/core/semaphore.cc +++ b/server/core/semaphore.cc @@ -17,13 +17,14 @@ namespace maxscale { -bool Semaphore::timedwait(time_t seconds, - long nseconds, - signal_approach_t signal_approach) const +//static +void Semaphore::get_current_timespec(time_t seconds, + long nseconds, + timespec* pTs) { ss_dassert(nseconds <= 999999999); - timespec ts; + timespec& ts = *pTs; ss_debug(int rc=) clock_gettime(CLOCK_REALTIME, &ts); ss_dassert(rc == 0); @@ -39,8 +40,6 @@ bool Semaphore::timedwait(time_t seconds, } ts.tv_nsec = nseconds_sum; - - return timedwait(ts, signal_approach); } } diff --git a/server/core/test/testsemaphore.cc b/server/core/test/testsemaphore.cc index 8154e5ef5..f5e2b6c59 100644 --- a/server/core/test/testsemaphore.cc +++ b/server/core/test/testsemaphore.cc @@ -65,6 +65,14 @@ void test_simple() ss_dassert(rv); cout << "Waited" << endl; + sem2.post(); + sem2.post(); + sem2.post(); + + cout << "Waiting 3 times for semaphore with a count of 3." << endl; + rv = sem2.wait_n(3); + cout << "Waited" << endl; + Semaphore sem3; time_t started; @@ -118,10 +126,7 @@ void test_threads() cout << "Waiting for threads." << endl; - for (int i = 0; i < n_threads; ++i) - { - sem.wait(); - } + sem.wait_n(n_threads); cout << "Joining threads." << endl; From 55011c29512fbb80e8e9f04784d2b4699912dbbc Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Tue, 25 Apr 2017 11:12:23 +0300 Subject: [PATCH 05/12] Add safety check and rename ref mgmt functions --- server/core/maxscale/workertask.hh | 4 ++-- server/core/worker.cc | 10 +++++----- server/core/workertask.cc | 7 +++++-- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/server/core/maxscale/workertask.hh b/server/core/maxscale/workertask.hh index e8186013d..483903470 100644 --- a/server/core/maxscale/workertask.hh +++ b/server/core/maxscale/workertask.hh @@ -55,8 +55,8 @@ protected: private: friend class Worker; - void inc_count(); - void dec_count(); + void inc_ref(); + void dec_ref(); private: int32_t m_count; diff --git a/server/core/worker.cc b/server/core/worker.cc index 0f95fffe2..a369067b8 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -549,7 +549,7 @@ bool Worker::execute(std::auto_ptr sTask) // private bool Worker::execute_disposable(DisposableTask* pTask) { - pTask->inc_count(); + pTask->inc_ref(); intptr_t arg1 = reinterpret_cast(pTask); @@ -557,7 +557,7 @@ bool Worker::execute_disposable(DisposableTask* pTask) if (!posted) { - pTask->dec_count(); + pTask->dec_ref(); } return posted; @@ -585,7 +585,7 @@ size_t Worker::execute_on_all(Task* pTask, Semaphore* pSem) size_t Worker::execute_on_all(std::auto_ptr sTask) { DisposableTask* pTask = sTask.release(); - pTask->inc_count(); + pTask->inc_ref(); size_t n = 0; @@ -599,7 +599,7 @@ size_t Worker::execute_on_all(std::auto_ptr sTask) } } - pTask->dec_count(); + pTask->dec_ref(); return n; } @@ -838,7 +838,7 @@ void Worker::handle_message(MessageQueue& queue, const MessageQueue::Message& ms { DisposableTask *pTask = reinterpret_cast(msg.arg1()); pTask->execute(*this); - pTask->dec_count(); + pTask->dec_ref(); } break; diff --git a/server/core/workertask.cc b/server/core/workertask.cc index c8d661a22..e4bd1fabe 100644 --- a/server/core/workertask.cc +++ b/server/core/workertask.cc @@ -13,6 +13,7 @@ #include "maxscale/workertask.hh" #include +#include namespace maxscale { @@ -32,13 +33,15 @@ WorkerDisposableTask::WorkerDisposableTask() { } -void WorkerDisposableTask::inc_count() +void WorkerDisposableTask::inc_ref() { atomic_add(&m_count, 1); } -void WorkerDisposableTask::dec_count() +void WorkerDisposableTask::dec_ref() { + ss_dassert(atomic_read(&m_count) > 0); + if (atomic_add(&m_count, -1) == 1) { delete this; From ea39b15bbbe7d555b77359f1ed94fe58c26ed29c Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Tue, 25 Apr 2017 11:12:23 +0300 Subject: [PATCH 06/12] Add safety check and rename ref mgmt functions --- server/core/workertask.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/core/workertask.cc b/server/core/workertask.cc index e4bd1fabe..f4ab02329 100644 --- a/server/core/workertask.cc +++ b/server/core/workertask.cc @@ -40,7 +40,7 @@ void WorkerDisposableTask::inc_ref() void WorkerDisposableTask::dec_ref() { - ss_dassert(atomic_read(&m_count) > 0); + ss_dassert(atomic_load_int32(&m_count) > 0); if (atomic_add(&m_count, -1) == 1) { From 963ff0216de46b053d299875ca2b99cd7f77162b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Mon, 24 Apr 2017 16:07:42 +0300 Subject: [PATCH 07/12] Allow serial execution of worker tasks The Worker::execute_on_all_wait is intended to be used with dcb_foreach which expects a single-threaded context for its function. --- server/core/maxscale/worker.hh | 15 +++++++++++++++ server/core/worker.cc | 19 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/server/core/maxscale/worker.hh b/server/core/maxscale/worker.hh index 69f3f49bd..a9e0c13c1 100644 --- a/server/core/maxscale/worker.hh +++ b/server/core/maxscale/worker.hh @@ -320,6 +320,21 @@ public: */ static size_t execute_on_all(std::auto_ptr sTask); + /** + * Executes a task on all workers in serial mode. + * + * The task is executed on at most one worker thread at a time. + * + * @param pTask The task to be executed. + * + * @return How many workers the task was posted to. + * + * @warning This function is extremely inefficient and will be slow compared + * to the other functions. Only use this function when printing thread-specific + * data to stdout. + */ + static size_t execute_on_all_serially(Task* pTask); + /** * Post a message to a worker. * diff --git a/server/core/worker.cc b/server/core/worker.cc index a369067b8..ff7cd61df 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -603,6 +603,25 @@ size_t Worker::execute_on_all(std::auto_ptr sTask) return n; } +//static +size_t Worker::execute_on_all_serially(Task* pTask) +{ + Semaphore sem; + size_t n = 0; + + for (int i = 0; i < this_unit.n_workers; ++i) + { + Worker* pWorker = this_unit.ppWorkers[i]; + + if (pWorker->execute(pTask, &sem)) + { + sem.wait(); + ++n; + } + } + + return n; +} bool Worker::post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2) { From c3df805b222ded8c94a0c79c1f7f97486c3120ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Mon, 24 Apr 2017 16:22:19 +0300 Subject: [PATCH 08/12] Make dcb_foreach thread-safe The function was no longer thread-safe as it used the obsolete per-thread spinlocks to iterate over the DCBs. Now the function uses the newly added WorkerTask class to iterate over them. Since the new WorkerTask mechanism is far superion to dcb_foreach, the latter is now deprecated. --- include/maxscale/dcb.h | 21 ++++- server/core/dcb.cc | 185 +++++++++++++++++++++++------------------ 2 files changed, 125 insertions(+), 81 deletions(-) diff --git a/include/maxscale/dcb.h b/include/maxscale/dcb.h index dee34ef13..c39265d26 100644 --- a/include/maxscale/dcb.h +++ b/include/maxscale/dcb.h @@ -325,13 +325,32 @@ void dcb_process_idle_sessions(int thr); /** * @brief Call a function for each connected DCB * + * @deprecated You should not use this function, use dcb_foreach_parallel instead + * * @param func Function to call. The function should return @c true to continue iteration * and @c false to stop iteration earlier. The first parameter is a DCB and the second * is the value of @c data that the user provided. * @param data User provided data passed as the second parameter to @c func * @return True if all DCBs were iterated, false if the callback returned false */ -bool dcb_foreach(bool (*func)(DCB *, void *), void *data); +bool dcb_foreach(bool (*func)(DCB *dcb, void *data), void *data); + +/** + * @brief Call a function for each connected DCB + * + * @note This function can call @c func from multiple thread at one time. + * + * @param func Function to call. The function should return @c true to continue iteration + * and @c false to stop iteration earlier. The first is a DCB and + * the second is this thread's value in the @c data array that + * the user provided. + * + * @param data Array of user provided data passed as the second parameter to @c func. + * The array must have more space for pointers thann the return + * value of `config_threadcount()`. The value passed to @c func will + * be the value of the array at the index of the current thread's ID. + */ +void dcb_foreach_parallel(bool (*func)(DCB *dcb, void *data), void **data); /** * @brief Return the port number this DCB is connected to diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 437d991e8..4f5aa5fc3 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -19,88 +19,51 @@ * block is the user data that is handled by the epoll system and contains * the state data and pointers to other components that relate to the * use of a file descriptor. - * - * @verbatim - * Revision History - * - * Date Who Description - * 12/06/13 Mark Riddoch Initial implementation - * 21/06/13 Massimiliano Pinto free_dcb is used - * 25/06/13 Massimiliano Pinto Added checks to session and router_session - * 28/06/13 Mark Riddoch Changed the free mechanism to - * introduce a zombie state for the - * dcb - * 02/07/2013 Massimiliano Pinto Addition of delayqlock, delayq and - * authlock for handling backend - * asynchronous protocol connection - * and a generic lock for backend - * authentication - * 16/07/2013 Massimiliano Pinto Added command type for dcb - * 23/07/2013 Mark Riddoch Tidy up logging - * 02/09/2013 Massimiliano Pinto Added session refcount - * 27/09/2013 Massimiliano Pinto dcb_read returns 0 if ioctl returns no - * error and 0 bytes to read. - * This fixes a bug with many reads from - * backend - * 07/05/2014 Mark Riddoch Addition of callback mechanism - * 20/06/2014 Mark Riddoch Addition of dcb_clone - * 29/05/2015 Markus Makela Addition of dcb_write_SSL - * 11/06/2015 Martin Brampton Persistent connnections and tidy up - * 07/07/2015 Martin Brampton Merged add to zombieslist into dcb_close, - * fixes for various error situations, - * remove dcb_set_state etc, simplifications. - * 10/07/2015 Martin Brampton Simplify, merge dcb_read and dcb_read_n - * 04/09/2015 Martin Brampton Changes to ensure DCB always has session pointer - * 28/09/2015 Martin Brampton Add counters, maxima for DCBs and zombies - * 29/05/2015 Martin Brampton Impose locking in dcb_call_foreach callbacks - * 17/10/2015 Martin Brampton Add hangup for each and bitmask display MaxAdmin - * 15/12/2015 Martin Brampton Merge most of SSL write code into non-SSL, - * enhance SSL code - * 07/02/2016 Martin Brampton Make dcb_read_SSL & dcb_create_SSL internal, - * further small SSL logic changes - * 31/05/2016 Martin Brampton Implement connection throttling - * 27/06/2016 Martin Brampton Implement list manager to manage DCB memory - * - * @endverbatim */ #include +#include #include +#include +#include #include #include #include #include -#include -#include #include +#include +#include +#include +#include +#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include #include -#include -#include "maxscale/session.h" #include "maxscale/modules.h" #include "maxscale/queuemanager.h" +#include "maxscale/semaphore.hh" +#include "maxscale/session.h" #include "maxscale/worker.hh" +#include "maxscale/workertask.hh" using maxscale::Worker; +using maxscale::WorkerTask; +using maxscale::Semaphore; /* A DCB with null values, used for initialization */ static DCB dcb_initialized; @@ -3066,28 +3029,90 @@ void dcb_process_idle_sessions(int thr) } } -bool dcb_foreach(bool(*func)(DCB *, void *), void *data) +/** Helper class for serial iteration over all DCBs */ +class SerialDcbTask : public WorkerTask { +public: - int nthr = config_threadcount(); - bool more = true; - - for (int i = 0; i < nthr && more; i++) + SerialDcbTask(bool(*func)(DCB *, void *), void *data): + m_func(func), + m_data(data), + m_more(1) { - spinlock_acquire(&all_dcbs_lock[i]); - - for (DCB *dcb = all_dcbs[i]; dcb && more; dcb = dcb->thread.next) - { - if (!func(dcb, data)) - { - more = false; - } - } - - spinlock_release(&all_dcbs_lock[i]); } - return more; + void execute(Worker& worker) + { + int thread_id = worker.id(); + + for (DCB *dcb = all_dcbs[thread_id]; dcb && atomic_load_int32(&m_more); dcb = dcb->thread.next) + { + if (!m_func(dcb, m_data)) + { + atomic_store_int32(&m_more, 0); + break; + } + } + } + + bool more() const + { + return m_more; + } + +private: + bool(*m_func)(DCB *dcb, void *data); + void* m_data; + int m_more; +}; + +bool dcb_foreach(bool(*func)(DCB *dcb, void *data), void *data) +{ + SerialDcbTask task(func, data); + Worker::execute_on_all_serially(&task); + return task.more(); +} + +/** Helper class for parallel iteration over all DCBs */ +class ParallelDcbTask : public WorkerTask +{ +public: + + ParallelDcbTask(bool(*func)(DCB *, void *), void **data): + m_func(func), + m_data(data) + { + } + + void execute(Worker& worker) + { + int thread_id = worker.id(); + + for (DCB *dcb = all_dcbs[thread_id]; dcb; dcb = dcb->thread.next) + { + if (!m_func(dcb, m_data[thread_id])) + { + break; + } + } + } + +private: + bool(*m_func)(DCB *dcb, void *data); + void** m_data; +}; + +void dcb_foreach_parallel(bool(*func)(DCB *dcb, void *data), void **data) +{ + Semaphore sem; + ParallelDcbTask task(func, data); + size_t n = Worker::execute_on_all(&task, &sem); + + // TODO: Use the multi-wait version of this function + for (size_t i = 0; i < n; i++) + { + sem.wait(); + } } int dcb_get_port(const DCB *dcb) From 4cdb7bc49be197c52a75ddb42c9943ea67a2003d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 25 Apr 2017 15:05:24 +0300 Subject: [PATCH 09/12] Use Semaphore multi-wait The dcb_foreach_parallel now uses the correct method of the Semaphore class. --- server/core/dcb.cc | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/server/core/dcb.cc b/server/core/dcb.cc index 4f5aa5fc3..db968f372 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -3106,13 +3106,7 @@ void dcb_foreach_parallel(bool(*func)(DCB *dcb, void *data), void **data) { Semaphore sem; ParallelDcbTask task(func, data); - size_t n = Worker::execute_on_all(&task, &sem); - - // TODO: Use the multi-wait version of this function - for (size_t i = 0; i < n; i++) - { - sem.wait(); - } + sem.wait_n(Worker::execute_on_all(&task, &sem)); } int dcb_get_port(const DCB *dcb) From 48ed7792a5e656ba707fbfad336bf1c730dc5a2d Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Wed, 26 Apr 2017 10:57:50 +0300 Subject: [PATCH 10/12] Arrange so that startup connections are handled by main Worker When the Worker mechanism has been initialized the current_worker_id of the calling thread is set to 0. That way, connections can be created after Worker::init() has been called, but before the workers have been started. Such connections will be handled by the worker that is running in the main thread. --- server/core/worker.cc | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/server/core/worker.cc b/server/core/worker.cc index ff7cd61df..8b0275cd1 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -214,6 +214,16 @@ bool Worker::init() MXS_ERROR("Could not allocate an epoll instance."); } + if (this_unit.initialized) + { + // When the initialization has successfully been performed, we set the + // current_worker_id of this thread to 0. That way any connections that + // are made during service startup (after this function returns, but + // bofore the workes have been started) will be handled by the worker + // that will be running in the main thread. + this_thread.current_worker_id = 0; + } + return this_unit.initialized; } From c8c2822c7bf4d6704bd5bd8f96bbaf4b91a7e9cb Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Wed, 26 Apr 2017 13:15:23 +0300 Subject: [PATCH 11/12] Link storage_rocksdb with lz4 --- .../modules/filter/cache/storage/storage_rocksdb/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/modules/filter/cache/storage/storage_rocksdb/CMakeLists.txt b/server/modules/filter/cache/storage/storage_rocksdb/CMakeLists.txt index 3d76a018d..451c7a7c2 100644 --- a/server/modules/filter/cache/storage/storage_rocksdb/CMakeLists.txt +++ b/server/modules/filter/cache/storage/storage_rocksdb/CMakeLists.txt @@ -17,7 +17,7 @@ if (ROCKSDB_BUILT) storage_rocksdb.cc ) add_dependencies(storage_rocksdb RocksDB) - target_link_libraries(storage_rocksdb maxscale-common ${JANSSON_LIBRARIES} ${ROCKSDB_LIB} ${ROCKSDB_LINK_LIBS}) + target_link_libraries(storage_rocksdb maxscale-common ${JANSSON_LIBRARIES} ${ROCKSDB_LIB} ${ROCKSDB_LINK_LIBS} lz4) set_target_properties(storage_rocksdb PROPERTIES VERSION "1.0.0") set_target_properties(storage_rocksdb PROPERTIES LINK_FLAGS -Wl,-z,defs) install_module(storage_rocksdb core) From 728c780187128cb6192963589f3adee0b3df8e11 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Wed, 26 Apr 2017 13:17:23 +0300 Subject: [PATCH 12/12] Expose current worker id to c-files --- include/maxscale/worker.h | 9 +++++++++ server/core/worker.cc | 5 +++++ 2 files changed, 14 insertions(+) diff --git a/include/maxscale/worker.h b/include/maxscale/worker.h index 06a753fb3..ef66c57fc 100644 --- a/include/maxscale/worker.h +++ b/include/maxscale/worker.h @@ -69,6 +69,15 @@ MXS_WORKER* mxs_worker_get(int worker_id); */ int mxs_worker_id(MXS_WORKER* pWorker); +/** + * Return the id of the worker. + * + * @return The id of the worker. + * + * @attention If there is no current worker, then -1 will be returned. + */ +int mxs_worker_get_current_id(); + /** * Post a message to a worker. * diff --git a/server/core/worker.cc b/server/core/worker.cc index 8b0275cd1..4b3e5ac1f 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -512,6 +512,11 @@ MXS_WORKER* mxs_worker_get(int worker_id) return Worker::get(worker_id); } +int mxs_worker_get_current_id() +{ + return Worker::get_current_id(); +} + Worker* Worker::get_current() { Worker* pWorker = NULL;