diff --git a/include/maxscale/semaphore.hh b/include/maxscale/semaphore.hh new file mode 100644 index 000000000..8c6b55ff4 --- /dev/null +++ b/include/maxscale/semaphore.hh @@ -0,0 +1,244 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include +#include +#include + +namespace maxscale +{ + +class Semaphore +{ + Semaphore(const Semaphore&); + Semaphore& operator = (const Semaphore&); + +public: + enum signal_approach_t + { + HONOUR_SIGNALS, /* Honour signals and return when interrupted. */ + IGNORE_SIGNALS /* Ignore signals and re-issue the comment when signals occur. */ + }; + + /** + * @brief Constructor + * + * @param initial_count The initial count of the semaphore. + * + * @attention If the value `initial_count` is larger than `SEM_VALUE_MAX`, + * the the value will be adjusted down to `SEM_VALUE_MAX`. + */ + Semaphore(uint32_t initial_count = 0) + { + if (initial_count > SEM_VALUE_MAX) + { + initial_count = SEM_VALUE_MAX; + } + + ss_debug(int rc =) sem_init(&m_sem, 0, initial_count); + ss_dassert(rc == 0); + } + + /** + * @brief Destructor + * + * When the semaphore is destructed, its count should be 0 and nobody + * should be waiting on it. + */ + ~Semaphore() + { +#ifdef SS_DEBUG + int count; + int rc = sem_getvalue(&m_sem, &count); + ss_dassert(rc == 0); + ss_dassert(count == 0); +#endif + ss_debug(rc =) sem_destroy(&m_sem); + ss_dassert(rc == 0); + } + + /** + * @brief Post the semaphore. + * + * Increments the semaphore. If others threads were blocked in `wait` + * one of them will subsequently return. + * + * @return `True` if the semaphore could be posed, otherwise `false`. + * If `false` is returned, then the maximum count of the sempahore + * has been reached. + */ + bool post() const + { + int rc = sem_post(&m_sem); + ss_dassert((rc == 0) || (errno == EOVERFLOW)); +#ifdef SS_DEBUG + if ((rc != 0) && (errno == EOVERFLOW)) + { + ss_info_dassert(!true, "Semaphore overflow; indicates endless loop."); + } +#endif + return rc == 0; + } + + /** + * @brief Waits on the semaphore. + * + * If the semaphore count is greater that zero, decrements the count and + * returns immediately. Otherwise blocks the caller until someone posts + * the semaphore. + * + * @param signal_approach Whether signals should be ignored or honoured. + * + * @return `True` if the semaphore was waited for, `false` otherwise. + * + * @attention The function can return `false` only if `signal_approach` + * is `HONOUR_SIGNALS`. + */ + bool wait(signal_approach_t signal_approach = IGNORE_SIGNALS) const + { + int rc; + do + { + rc = sem_wait(&m_sem); + } + while ((rc != 0) && ((errno == EINTR) && (signal_approach == IGNORE_SIGNALS))); + + ss_dassert((rc == 0) || ((errno == EINTR) && (signal_approach == HONOUR_SIGNALS))); + + return rc == 0; + } + + /** + * @brief Waits on the semaphore. + * + * If the semaphore count is greater that zero, decrements the count and + * returns immediately. + * + * @param signal_approach Whether signals should be ignored or honoured. + * + * @return `True` if the semaphore was waited for, `false` otherwise. + * + * @attention If the function returns `false` and `signal_approch` is + * `HONOUR_SIGNALS` then the caller must check the value of + * `errno` to find out whether the call was timed out or + * interrupted. In the former case the value will be `EAGAIN` + * and in the latter `EINTR`. + */ + bool trywait(signal_approach_t signal_approach = IGNORE_SIGNALS) const + { + errno = 0; + + int rc; + do + { + rc = sem_trywait(&m_sem); + } + while ((rc != 0) && ((errno == EINTR) && (signal_approach == IGNORE_SIGNALS))); + + ss_dassert((rc == 0) || + (errno == EAGAIN) || + ((errno == EINTR) && (signal_approach == HONOUR_SIGNALS))); + + return rc == 0; + } + + /** + * @brief Waits on the semaphore. + * + * Waits on the sempahore at most until the specified time. + * + * @param ts The *absolute* time until which the waiting at + * most is performed. + * @param signal_approach Whether signals should be ignored or honoured. + * + * @return True if the waiting could be performed, false otherwise. + * + * @attention If the function returns `false` and `signal_approch` is + * `HONOUR_SIGNALS` then the caller must check the value of + * `errno` to find out whether the call was timed out or + * interrupted. In the former case the value will be `ETIMEDOUT` + * and in the latter `EINTR. + */ + bool timedwait(struct timespec& ts, + signal_approach_t signal_approach = IGNORE_SIGNALS) const + { + errno = 0; + + int rc; + do + { + rc = sem_timedwait(&m_sem, &ts); + } + while ((rc != 0) && ((errno == EINTR) && (signal_approach == IGNORE_SIGNALS))); + + ss_dassert((rc == 0) || + (errno == ETIMEDOUT) || + ((errno == EINTR) && (signal_approach == HONOUR_SIGNALS))); + + return rc == 0; + } + + /** + * @brief Waits on the semaphore. + * + * Waits on the sempahore at most until the specified amount of time + * has passed. + * + * @param seconds How many seconds to wait at most. + * @param nseconds How many nanonseconds to wait at most. + * @param signal_approach Whether signals should be ignored or honoured. + * + * @return True if the waiting could be performed, false otherwise. + * + * @attention `nseconds` must be less than 1000000000. + * + * @attention If the function returns `false` and `signal_approch` is + * `HONOUR_SIGNALS` then the caller must check the value of + * `errno` to find out whether the call was timed out or + * interrupted. In the former case the value will be `ETIMEDOUT` + * and in the latter `EINTR. + */ + bool timedwait(time_t seconds, + long nseconds, + signal_approach_t signal_approach = IGNORE_SIGNALS) const; + + /** + * @brief Waits on the semaphore. + * + * Waits on the sempahore at most until the specified amount of time + * has passed. + * + * @param seconds How many seconds to wait at most. + * @param signal_approach Whether signals should be ignored or honoured. + * + * @return True if the waiting could be performed, false otherwise. + * + * @attention If the function returns `false` and `signal_approch` is + * `HONOUR_SIGNALS` then the caller must check the value of + * `errno` to find out whether the call was timed out or + * interrupted. In the former case the value will be `ETIMEDOUT` + * and in the latter `EINTR. + */ + bool timedwait(time_t seconds, + signal_approach_t signal_approach = IGNORE_SIGNALS) const + { + return timedwait(seconds, 0, signal_approach); + } + +private: + mutable sem_t m_sem; +}; + +} diff --git a/server/core/CMakeLists.txt b/server/core/CMakeLists.txt index ee6f12201..9f2e14b94 100644 --- a/server/core/CMakeLists.txt +++ b/server/core/CMakeLists.txt @@ -32,6 +32,7 @@ add_library(maxscale-common SHARED resultset.cc router.cc secrets.cc + semaphore.cc server.cc service.cc session.cc diff --git a/server/core/semaphore.cc b/server/core/semaphore.cc new file mode 100644 index 000000000..2d8471062 --- /dev/null +++ b/server/core/semaphore.cc @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include +#include + +namespace maxscale +{ + +bool Semaphore::timedwait(time_t seconds, + long nseconds, + signal_approach_t signal_approach) const +{ + ss_dassert(nseconds <= 999999999); + + timespec ts; + + ss_debug(int rc=) clock_gettime(CLOCK_REALTIME, &ts); + ss_dassert(rc == 0); + + ts.tv_sec += seconds; + + uint64_t nseconds_sum = ts.tv_nsec + nseconds; + + if (nseconds_sum > 1000000000) + { + ts.tv_sec += 1; + nseconds_sum -= 1000000000; + } + + ts.tv_nsec = nseconds_sum; + + return timedwait(ts, signal_approach); +} + +} diff --git a/server/core/test/CMakeLists.txt b/server/core/test/CMakeLists.txt index 077731156..d5dcd2e75 100644 --- a/server/core/test/CMakeLists.txt +++ b/server/core/test/CMakeLists.txt @@ -10,6 +10,7 @@ add_executable(test_logthrottling testlogthrottling.cc) add_executable(test_modutil testmodutil.c) add_executable(test_poll testpoll.c) add_executable(test_queuemanager testqueuemanager.c) +add_executable(test_semaphore testsemaphore.cc) add_executable(test_server testserver.c) add_executable(test_service testservice.c) add_executable(test_spinlock testspinlock.c) @@ -33,6 +34,7 @@ target_link_libraries(test_logthrottling maxscale-common) target_link_libraries(test_modutil maxscale-common) target_link_libraries(test_poll maxscale-common) target_link_libraries(test_queuemanager maxscale-common) +target_link_libraries(test_semaphore maxscale-common) target_link_libraries(test_server maxscale-common) target_link_libraries(test_service maxscale-common) target_link_libraries(test_spinlock maxscale-common) @@ -58,6 +60,7 @@ add_test(TestModutil test_modutil) add_test(NAME TestMaxPasswd COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/testmaxpasswd.sh) add_test(TestPoll test_poll) add_test(TestQueueManager test_queuemanager) +add_test(TestSemaphore test_semaphore) add_test(TestServer test_server) add_test(TestService test_service) add_test(TestSpinlock test_spinlock) diff --git a/server/core/test/testsemaphore.cc b/server/core/test/testsemaphore.cc new file mode 100644 index 000000000..8154e5ef5 --- /dev/null +++ b/server/core/test/testsemaphore.cc @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#if !defined(SS_DEBUG) +#define SS_DEBUG +#endif +#if defined(NDEBUG) +#undef NDEBUG +#endif + +#include +#include +#include +#include +#include +#include + +using namespace maxscale; +using namespace std; + +namespace +{ + +void test_simple() +{ + bool rv; + Semaphore sem1(1); + + cout << "Waiting for semaphore with a count of 1." << endl; + rv = sem1.wait(); + ss_dassert(rv); + cout << "Waited" << endl; + + Semaphore sem2(3); + + cout << "Waiting 3 times for semaphore with a count of 3." << endl; + rv = sem2.wait(); + ss_dassert(rv); + rv = sem2.wait(); + ss_dassert(rv); + rv = sem2.wait(); + ss_dassert(rv); + cout << "Waited" << endl; + + sem2.post(); + sem2.post(); + sem2.post(); + + cout << "Waiting 3 times for semaphore with a count of 3." << endl; + rv = sem2.wait(); + ss_dassert(rv); + rv = sem2.wait(); + ss_dassert(rv); + rv = sem2.wait(); + ss_dassert(rv); + cout << "Waited" << endl; + + Semaphore sem3; + + time_t started; + time_t finished; + time_t diff; + + cout << "Waiting 3 seconds for semaphore with a count of 0..." << endl; + started = time(NULL); + rv = sem3.timedwait(4); + finished = time(NULL); + diff = finished - started; + ss_dassert(!rv); + ss_dassert((diff >= 2) && (diff <= 4)); + cout << "Waited." << endl; + + cout << "Waiting 1 second for semaphore with a count of 0..." << endl; + started = time(NULL); + rv = sem3.timedwait(0, 999999999); + finished = time(NULL); + diff = finished - started; + ss_dassert(!rv); + ss_dassert((diff >= 0) && (diff <= 2)); + cout << "Waited." << endl; +} + +void* thread_main(void* pArg) +{ + Semaphore* pSem = static_cast(pArg); + + cout << "Hello from thread" << endl; + sleep(1); + + pSem->post(); + return 0; +} + +void test_threads() +{ + const int n_threads = 10; + pthread_t threads[n_threads]; + + Semaphore sem; + + cout << "Starting threads." << endl; + + for (int i = 0; i < n_threads; ++i) + { + int rc = pthread_create(&threads[i], NULL, thread_main, &sem); + ss_dassert(rc == 0); + } + + cout << "Waiting for threads." << endl; + + for (int i = 0; i < n_threads; ++i) + { + sem.wait(); + } + + cout << "Joining threads." << endl; + + for (int i = 0; i < n_threads; ++i) + { + pthread_join(threads[i], NULL); + } + + cout << "Joined." << endl; +} + +void* send_signal(void*) +{ + cout << "Sleeping 2 seconds." << endl; + sleep(2); + + cout << "Sending signal" << endl; + kill(getpid(), SIGTERM); + cout << "Sent signal" << endl; + + return NULL; +} + +void sighandler(int s) +{ +} + +void test_signal() +{ + Semaphore sem; + + signal(SIGTERM, sighandler); + + pthread_t thread; + int rc; + + rc = pthread_create(&thread, NULL, send_signal, NULL); + ss_dassert(rc == 0); + + bool waited; + + cout << "Waiting" << endl; + waited = sem.timedwait(4, Semaphore::HONOUR_SIGNALS); + cout << "Waited" << endl; + + // Should return false and errno should be EINTR. + ss_dassert(!waited && (errno == EINTR)); + + pthread_join(thread, NULL); + + rc = pthread_create(&thread, NULL, send_signal, NULL); + ss_dassert(rc == 0); + + cout << "Waiting" << endl; + waited = sem.timedwait(4, Semaphore::IGNORE_SIGNALS); + cout << "Waited" << endl; + + // Should return false and errno should be ETIMEDOUT. + ss_dassert(!waited && (errno == ETIMEDOUT)); + + pthread_join(thread, NULL); +} + +} + +int main(int argc, char* argv[]) +{ + test_simple(); + test_threads(); + test_signal(); + + return EXIT_SUCCESS; +}