Files
doris/be/test/util/threadpool_test.cpp
lichaoyong f20eb12457 [util] Import ThreadPool and Thread from KUDU (#2915)
Thread pool design point:
  All tasks submitted directly to the thread pool enter a FIFO queue and are
dispatched to a worker thread when one becomes free. Tasks may also be
submitted via ThreadPoolTokens. The token wait() and shutdown() functions
can then be used to block on logical groups of tasks.
  A token operates in one of two ExecutionModes, determined at token
construction time:
  1. SERIAL: submitted tasks are run one at a time.
  2. CONCURRENT: submitted tasks may be run in parallel.
     This isn't unlike submitted without a token, but the logical grouping that tokens
     impart can be useful when a pool is shared by many contexts (e.g. to
     safely shut down one context, to derive context-specific metrics, etc.).
Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are
processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are
processed in a round-robin fashion, one task at a time. This prevents them
from starving one another. However, tokenless (and CONCURRENT token-based)
tasks can starve SERIAL token-based tasks.

Thread design point:
  1. It is a thin wrapper around pthread that can register itself with the singleton ThreadMgr
(a private class implemented in thread.cpp entirely, which tracks all live threads so
that they may be monitored via the debug webpages). This class has a limited subset of
boost::thread's API. Construction is almost the same, but clients must supply a
category and a name for each thread so that they can be identified in the debug web
UI. Otherwise, join() is the only supported method from boost::thread.
  2. Each Thread object knows its operating system thread ID (TID), which can be used to
attach debuggers to specific threads, to retrieve resource-usage statistics from the
operating system, and to assign threads to resource control groups.
  3. Threads are shared objects, but in a degenerate way. They may only have
up to two referents: the caller that created the thread (parent), and
the thread itself (child). Moreover, the only two methods to mutate state
(join() and the destructor) are constrained: the child may not join() on
itself, and the destructor is only run when there's one referent left.
These constraints allow us to access thread internals without any locks.
2020-02-17 11:22:09 +08:00

797 lines
27 KiB
C++

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <unistd.h>
#include <atomic>
#include <cstdint>
#include <functional>
#include <iterator>
#include <limits>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <gflags/gflags_declare.h>
#include <gtest/gtest.h>
#include <boost/bind.hpp>
#include "common/logging.h"
#include "common/status.h"
#include "gutil/atomicops.h"
#include "gutil/port.h"
#include "gutil/ref_counted.h"
#include "gutil/strings/substitute.h"
#include "gutil/sysinfo.h"
#include "gutil/walltime.h"
#include "util/barrier.h"
#include "util/countdown_latch.h"
#include "util/spinlock.h"
#include "util/metrics.h"
#include "util/monotime.h"
#include "util/random.h"
#include "util/scoped_cleanup.h"
#include "util/threadpool.h"
using std::atomic;
using std::shared_ptr;
using std::string;
using std::thread;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
DECLARE_int32(thread_inject_start_latency_ms);
namespace doris {
static const char* kDefaultPoolName = "test";
class ThreadPoolTest : public ::testing::Test {
public:
virtual void SetUp() override {
ASSERT_TRUE(ThreadPoolBuilder(kDefaultPoolName).build(&_pool).ok());
}
Status rebuild_pool_with_builder(const ThreadPoolBuilder& builder) {
return builder.build(&_pool);
}
Status rebuild_pool_with_min_max(int min_threads, int max_threads) {
return ThreadPoolBuilder(kDefaultPoolName)
.set_min_threads(min_threads)
.set_max_threads(max_threads)
.build(&_pool);
}
protected:
unique_ptr<ThreadPool> _pool;
};
TEST_F(ThreadPoolTest, TestNoTaskOpenClose) {
ASSERT_TRUE(rebuild_pool_with_min_max(4, 4).ok());
_pool->shutdown();
}
static void simple_task_method(int n, std::atomic<int32_t>* counter) {
while (n--) {
(*counter)++;
boost::detail::yield(n);
}
}
class SimpleTask : public Runnable {
public:
SimpleTask(int n, std::atomic<int32_t>* counter)
: _n(n), _counter(counter) {}
void run() override {
simple_task_method(_n, _counter);
}
private:
int _n;
std::atomic<int32_t>* _counter;
};
TEST_F(ThreadPoolTest, TestSimpleTasks) {
ASSERT_TRUE(rebuild_pool_with_min_max(4, 4).ok());
std::atomic<int32_t> counter(0);
std::shared_ptr<Runnable> task(new SimpleTask(15, &counter));
ASSERT_TRUE(_pool->submit_func(boost::bind(&simple_task_method, 10, &counter)).ok());
ASSERT_TRUE(_pool->submit(task).ok());
ASSERT_TRUE(_pool->submit_func(boost::bind(&simple_task_method, 20, &counter)).ok());
ASSERT_TRUE(_pool->submit(task).ok());
_pool->wait();
ASSERT_EQ(10 + 15 + 20 + 15, counter.load());
_pool->shutdown();
}
class SlowTask : public Runnable {
public:
explicit SlowTask(CountDownLatch* latch)
: _latch(latch) {}
void run() override {
_latch->wait();
}
static shared_ptr<Runnable> new_slow_task(CountDownLatch* latch) {
return std::make_shared<SlowTask>(latch);
}
private:
CountDownLatch* _latch;
};
TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_min_threads(0)
.set_max_threads(3)
.set_idle_timeout(MonoDelta::FromMilliseconds(1))).ok());
// There are no threads to start with.
ASSERT_TRUE(_pool->num_threads() == 0);
// We get up to 3 threads when submitting work.
CountDownLatch latch(1);
SCOPED_CLEANUP({
latch.count_down();
});
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
ASSERT_EQ(2, _pool->num_threads());
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
ASSERT_EQ(3, _pool->num_threads());
// The 4th piece of work gets queued.
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
ASSERT_EQ(3, _pool->num_threads());
// Finish all work
latch.count_down();
_pool->wait();
ASSERT_EQ(0, _pool->_active_threads);
_pool->shutdown();
ASSERT_EQ(0, _pool->num_threads());
}
TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
// By default a threadpool's max_threads is set to the number of CPUs, so
// this test submits more tasks than that to ensure that the number of CPUs
// isn't some kind of upper bound.
const int kNumCPUs = base::NumCPUs();
// Build a threadpool with no limit on the maximum number of threads.
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_max_threads(std::numeric_limits<int>::max())).ok());
CountDownLatch latch(1);
auto cleanup_latch = MakeScopedCleanup([&]() {
latch.count_down();
});
// submit tokenless tasks. Each should create a new thread.
for (int i = 0; i < kNumCPUs * 2; i++) {
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
}
ASSERT_EQ((kNumCPUs * 2), _pool->num_threads());
// submit tasks on two tokens. Only two threads should be created.
unique_ptr<ThreadPoolToken> t1 = _pool->new_token(ThreadPool::ExecutionMode::SERIAL);
unique_ptr<ThreadPoolToken> t2 = _pool->new_token(ThreadPool::ExecutionMode::SERIAL);
for (int i = 0; i < kNumCPUs * 2; i++) {
ThreadPoolToken* t = (i % 2 == 0) ? t1.get() : t2.get();
ASSERT_TRUE(t->submit(SlowTask::new_slow_task(&latch)).ok());
}
ASSERT_EQ((kNumCPUs * 2) + 2, _pool->num_threads());
// submit more tokenless tasks. Each should create a new thread.
for (int i = 0; i < kNumCPUs; i++) {
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
}
ASSERT_EQ((kNumCPUs * 3) + 2, _pool->num_threads());
latch.count_down();
_pool->wait();
_pool->shutdown();
}
// Regression test for a bug where a task is submitted exactly
// as a thread is about to exit. Previously this could hang forever.
TEST_F(ThreadPoolTest, TestRace) {
alarm(60);
auto cleanup = MakeScopedCleanup([]() {
alarm(0); // Disable alarm on test exit.
});
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_min_threads(0)
.set_max_threads(1)
.set_idle_timeout(MonoDelta::FromMicroseconds(1))).ok());
for (int i = 0; i < 500; i++) {
CountDownLatch l(1);
ASSERT_TRUE(_pool->submit_func(boost::bind(&CountDownLatch::count_down, &l)).ok());
l.wait();
// Sleeping a different amount in each iteration makes it more likely to hit
// the bug.
SleepFor(MonoDelta::FromMicroseconds(i));
}
}
TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_min_threads(1)
.set_max_threads(4)
.set_idle_timeout(MonoDelta::FromMilliseconds(1))).ok());
// There is 1 thread to start with.
ASSERT_EQ(1, _pool->num_threads());
// We get up to 4 threads when submitting work.
CountDownLatch latch(1);
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
ASSERT_EQ(1, _pool->num_threads());
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
ASSERT_EQ(2, _pool->num_threads());
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
ASSERT_EQ(3, _pool->num_threads());
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
ASSERT_EQ(4, _pool->num_threads());
// The 5th piece of work gets queued.
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
ASSERT_EQ(4, _pool->num_threads());
// Finish all work
latch.count_down();
_pool->wait();
ASSERT_EQ(0, _pool->_active_threads);
_pool->shutdown();
ASSERT_EQ(0, _pool->num_threads());
}
TEST_F(ThreadPoolTest, TestMaxQueueSize) {
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_min_threads(1)
.set_max_threads(1)
.set_max_queue_size(1)).ok());
CountDownLatch latch(1);
// We will be able to submit two tasks: one for max_threads == 1 and one for
// max_queue_size == 1.
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
Status s = _pool->submit(SlowTask::new_slow_task(&latch));
CHECK(s.is_service_unavailable()) << "Expected failure due to queue blowout:"
<< s.to_string();
latch.count_down();
_pool->wait();
_pool->shutdown();
}
// Test that when we specify a zero-sized queue, the maximum number of threads
// running is used for enforcement.
TEST_F(ThreadPoolTest, TestZeroQueueSize) {
const int kMaxThreads = 4;
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_max_queue_size(0)
.set_max_threads(kMaxThreads)).ok());
CountDownLatch latch(1);
for (int i = 0; i < kMaxThreads; i++) {
ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok());
}
Status s = _pool->submit(SlowTask::new_slow_task(&latch));
ASSERT_TRUE(s.is_service_unavailable()) << s.to_string();
latch.count_down();
_pool->wait();
_pool->shutdown();
}
/*
// Test that a thread pool will crash if asked to run its own blocking
// functions in a pool thread.
//
// In a multi-threaded application, TSAN is unsafe to use following a fork().
// After a fork(), TSAN will:
// 1. Disable verification, expecting an exec() soon anyway, and
// 2. Die on future thread creation.
// For some reason, this test triggers behavior #2. We could disable it with
// the TSAN option die_after_fork=0, but this can (supposedly) lead to
// deadlocks, so we'll disable the entire test instead.
#ifndef THREAD_SANITIZER
TEST_F(ThreadPoolTest, TestDeadlocks) {
const char* death_msg = "called pool function that would result in deadlock";
ASSERT_DEATH({
ASSERT_TRUE(rebuild_pool_with_min_max(1, 1).ok());
ASSERT_TRUE(_pool->submit_func(
Bind(&ThreadPool::shutdown, Unretained(_pool.get()))).ok());
_pool->wait();
}, death_msg);
ASSERT_DEATH({
ASSERT_TRUE(rebuild_pool_with_min_max(1, 1).ok());
ASSERT_TRUE(_pool->submit_func(
Bind(&ThreadPool::ok(), Unretained(_pool.get()))).ok());
_pool->wait();
}, death_msg);
}
#endif
*/
class SlowDestructorRunnable : public Runnable {
public:
void run() override {}
virtual ~SlowDestructorRunnable() {
SleepFor(MonoDelta::FromMilliseconds(100));
}
};
// Test that if a tasks's destructor is slow, it doesn't cause serialization of the tasks
// in the queue.
TEST_F(ThreadPoolTest, TestSlowDestructor) {
ASSERT_TRUE(rebuild_pool_with_min_max(1, 20).ok());
MonoTime start = MonoTime::Now();
for (int i = 0; i < 100; i++) {
shared_ptr<Runnable> task(new SlowDestructorRunnable());
ASSERT_TRUE(_pool->submit(std::move(task)).ok());
}
_pool->wait();
ASSERT_LT((MonoTime::Now() - start).ToSeconds(), 5);
}
// For test cases that should run with both kinds of tokens.
class ThreadPoolTestTokenTypes : public ThreadPoolTest,
public testing::WithParamInterface<ThreadPool::ExecutionMode> {};
INSTANTIATE_TEST_CASE_P(Tokens, ThreadPoolTestTokenTypes,
::testing::Values(ThreadPool::ExecutionMode::SERIAL,
ThreadPool::ExecutionMode::CONCURRENT));
TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitAndWait) {
unique_ptr<ThreadPoolToken> t = _pool->new_token(GetParam());
int i = 0;
Status status = t->submit_func([&]() {
SleepFor(MonoDelta::FromMilliseconds(1));
i++;
});
ASSERT_TRUE(status.ok());
t->wait();
ASSERT_EQ(1, i);
}
TEST_F(ThreadPoolTest, TestTokenSubmitsProcessedSerially) {
unique_ptr<ThreadPoolToken> t = _pool->new_token(ThreadPool::ExecutionMode::SERIAL);
int32_t seed = static_cast<int32_t>(GetCurrentTimeMicros());
srand(seed);
Random r(seed);
string result;
for (char c = 'a'; c < 'f'; c++) {
// Sleep a little first so that there's a higher chance of out-of-order
// appends if the submissions did execute in parallel.
int sleep_ms = r.Next() % 5;
Status status= t->submit_func([&result, c, sleep_ms]() {
SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
result += c;
});
ASSERT_TRUE(status.ok());
}
t->wait();
ASSERT_EQ("abcde", result);
}
TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitsProcessedConcurrently) {
const int kNumTokens = 5;
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_max_threads(kNumTokens)).ok());
vector<unique_ptr<ThreadPoolToken>> tokens;
// A violation to the tested invariant would yield a deadlock, so let's set
// up an alarm to bail us out.
alarm(60);
SCOPED_CLEANUP({
alarm(0); // Disable alarm on test exit.
});
std::shared_ptr<Barrier> b = std::make_shared<Barrier>(kNumTokens + 1);
for (int i = 0; i < kNumTokens; i++) {
tokens.emplace_back(_pool->new_token(GetParam()));
ASSERT_TRUE(tokens.back()->submit_func([b]() {
b->wait();
}).ok());
}
// This will deadlock if the above tasks weren't all running concurrently.
b->wait();
}
TEST_F(ThreadPoolTest, TestTokenSubmitsNonSequential) {
const int kNumSubmissions = 5;
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_max_threads(kNumSubmissions)).ok());
// A violation to the tested invariant would yield a deadlock, so let's set
// up an alarm to bail us out.
alarm(60);
SCOPED_CLEANUP({
alarm(0); // Disable alarm on test exit.
});
shared_ptr<Barrier> b = std::make_shared<Barrier>(kNumSubmissions + 1);
unique_ptr<ThreadPoolToken> t = _pool->new_token(ThreadPool::ExecutionMode::CONCURRENT);
for (int i = 0; i < kNumSubmissions; i++) {
ASSERT_TRUE(t->submit_func([b]() {
b->wait();
}).ok());
}
// This will deadlock if the above tasks weren't all running concurrently.
b->wait();
}
TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) {
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_max_threads(4)).ok());
unique_ptr<ThreadPoolToken> t1(_pool->new_token(GetParam()));
unique_ptr<ThreadPoolToken> t2(_pool->new_token(GetParam()));
CountDownLatch l1(1);
CountDownLatch l2(1);
// A violation to the tested invariant would yield a deadlock, so let's set
// up an alarm to bail us out.
alarm(60);
SCOPED_CLEANUP({
alarm(0); // Disable alarm on test exit.
});
for (int i = 0; i < 3; i++) {
ASSERT_TRUE(t1->submit_func([&]() {
l1.wait();
}).ok());
}
for (int i = 0; i < 3; i++) {
ASSERT_TRUE(t2->submit_func([&]() {
l2.wait();
}).ok());
}
// Unblock all of t1's tasks, but not t2's tasks.
l1.count_down();
// If this also waited for t2's tasks, it would deadlock.
t1->shutdown();
// We can no longer submit to t1 but we can still submit to t2.
ASSERT_TRUE(t1->submit_func([](){}).is_service_unavailable());
ASSERT_TRUE(t2->submit_func([](){}).ok());
// Unblock t2's tasks.
l2.count_down();
t2->shutdown();
}
TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
const int kNumTokens = 3;
const int kNumSubmissions = 20;
int32_t seed = static_cast<int32_t>(GetCurrentTimeMicros());
srand(seed);
Random r(seed);
vector<unique_ptr<ThreadPoolToken>> tokens;
for (int i = 0; i < kNumTokens; i++) {
tokens.emplace_back(_pool->new_token(GetParam()));
}
atomic<int32_t> v(0);
for (int i = 0; i < kNumSubmissions; i++) {
// Sleep a little first to raise the likelihood of the test thread
// reaching wait() before the submissions finish.
int sleep_ms = r.Next() % 5;
auto task = [&v, sleep_ms]() {
SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
v++;
};
// Half of the submissions will be token-less, and half will use a token.
if (i % 2 == 0) {
ASSERT_TRUE(_pool->submit_func(task).ok());
} else {
int token_idx = r.Next() % tokens.size();
ASSERT_TRUE(tokens[token_idx]->submit_func(task).ok());
}
}
_pool->wait();
ASSERT_EQ(kNumSubmissions, v);
}
TEST_F(ThreadPoolTest, TestFuzz) {
const int kNumOperations = 1000;
int32_t seed = static_cast<int32_t>(GetCurrentTimeMicros());
srand(seed);
Random r(seed);
vector<unique_ptr<ThreadPoolToken>> tokens;
for (int i = 0; i < kNumOperations; i++) {
// Operation distribution:
//
// - submit without a token: 40%
// - submit with a randomly selected token: 35%
// - Allocate a new token: 10%
// - Wait on a randomly selected token: 7%
// - shutdown a randomly selected token: 4%
// - Deallocate a randomly selected token: 2%
// - Wait for all submissions: 2%
int op = r.Next() % 100;
if (op < 40) {
// submit without a token.
int sleep_ms = r.Next() % 5;
ASSERT_TRUE(_pool->submit_func([sleep_ms]() {
// Sleep a little first to increase task overlap.
SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
}).ok());
} else if (op < 75) {
// submit with a randomly selected token.
if (tokens.empty()) {
continue;
}
int sleep_ms = r.Next() % 5;
int token_idx = r.Next() % tokens.size();
Status s = tokens[token_idx]->submit_func([sleep_ms]() {
// Sleep a little first to increase task overlap.
SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
});
ASSERT_TRUE(s.ok() || s.is_service_unavailable());
} else if (op < 85) {
// Allocate a token with a randomly selected policy.
ThreadPool::ExecutionMode mode = r.Next() % 2 ?
ThreadPool::ExecutionMode::SERIAL :
ThreadPool::ExecutionMode::CONCURRENT;
tokens.emplace_back(_pool->new_token(mode));
} else if (op < 92) {
// Wait on a randomly selected token.
if (tokens.empty()) {
continue;
}
int token_idx = r.Next() % tokens.size();
tokens[token_idx]->wait();
} else if (op < 96) {
// shutdown a randomly selected token.
if (tokens.empty()) {
continue;
}
int token_idx = r.Next() % tokens.size();
tokens[token_idx]->shutdown();
} else if (op < 98) {
// Deallocate a randomly selected token.
if (tokens.empty()) {
continue;
}
auto it = tokens.begin();
int token_idx = r.Next() % tokens.size();
std::advance(it, token_idx);
tokens.erase(it);
} else {
// Wait on everything.
ASSERT_LT(op, 100);
ASSERT_GE(op, 98);
_pool->wait();
}
}
// Some test runs will shut down the pool before the tokens, and some won't.
// Either way should be safe.
if (r.Next() % 2 == 0) {
_pool->shutdown();
}
}
TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) {
ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_min_threads(1)
.set_max_threads(1)
.set_max_queue_size(1)).ok());
CountDownLatch latch(1);
unique_ptr<ThreadPoolToken> t = _pool->new_token(GetParam());
SCOPED_CLEANUP({
latch.count_down();
});
// We will be able to submit two tasks: one for max_threads == 1 and one for
// max_queue_size == 1.
ASSERT_TRUE(t->submit(SlowTask::new_slow_task(&latch)).ok());
ASSERT_TRUE(t->submit(SlowTask::new_slow_task(&latch)).ok());
Status s = t->submit(SlowTask::new_slow_task(&latch));
ASSERT_TRUE(s.is_service_unavailable());
}
TEST_F(ThreadPoolTest, TestTokenConcurrency) {
const int kNumTokens = 20;
const int kTestRuntimeSecs = 1;
const int kCycleThreads = 2;
const int kShutdownThreads = 2;
const int kWaitThreads = 2;
const int kSubmitThreads = 8;
vector<shared_ptr<ThreadPoolToken>> tokens;
int32_t seed = static_cast<int32_t>(GetCurrentTimeMicros());
srand(seed);
Random rng(seed);
// Protects 'tokens' and 'rng'.
SpinLock lock;
// Fetch a token from 'tokens' at random.
auto GetRandomToken = [&]() -> shared_ptr<ThreadPoolToken> {
std::lock_guard<SpinLock> l(lock);
int idx = rng.Uniform(kNumTokens);
return tokens[idx];
};
// Preallocate all of the tokens.
for (int i = 0; i < kNumTokens; i++) {
ThreadPool::ExecutionMode mode;
{
std::lock_guard<SpinLock> l(lock);
mode = rng.Next() % 2 ?
ThreadPool::ExecutionMode::SERIAL :
ThreadPool::ExecutionMode::CONCURRENT;
}
tokens.emplace_back(_pool->new_token(mode).release());
}
atomic<int64_t> total_num_tokens_cycled(0);
atomic<int64_t> total_num_tokens_shutdown(0);
atomic<int64_t> total_num_tokens_waited(0);
atomic<int64_t> total_num_tokens_submitted(0);
CountDownLatch latch(1);
vector<thread> threads;
for (int i = 0; i < kCycleThreads; i++) {
// Pick a token at random and replace it.
//
// The replaced token is only destroyed when the last ref is dropped,
// possibly by another thread.
threads.emplace_back([&]() {
int num_tokens_cycled = 0;
while (latch.count()) {
{
std::lock_guard<SpinLock> l(lock);
int idx = rng.Uniform(kNumTokens);
ThreadPool::ExecutionMode mode = rng.Next() % 2 ?
ThreadPool::ExecutionMode::SERIAL :
ThreadPool::ExecutionMode::CONCURRENT;
tokens[idx] = shared_ptr<ThreadPoolToken>(_pool->new_token(mode).release());
}
num_tokens_cycled++;
// Sleep a bit, otherwise this thread outpaces the other threads and
// nothing interesting happens to most tokens.
SleepFor(MonoDelta::FromMicroseconds(10));
}
total_num_tokens_cycled += num_tokens_cycled;
});
}
for (int i = 0; i < kShutdownThreads; i++) {
// Pick a token at random and shut it down. Submitting a task to a shut
// down token will return a ServiceUnavailable error.
threads.emplace_back([&]() {
int num_tokens_shutdown = 0;
while (latch.count()) {
GetRandomToken()->shutdown();
num_tokens_shutdown++;
}
total_num_tokens_shutdown += num_tokens_shutdown;
});
}
for (int i = 0; i < kWaitThreads; i++) {
// Pick a token at random and wait for any outstanding tasks.
threads.emplace_back([&]() {
int num_tokens_waited = 0;
while (latch.count()) {
GetRandomToken()->wait();
num_tokens_waited++;
}
total_num_tokens_waited += num_tokens_waited;
});
}
for (int i = 0; i < kSubmitThreads; i++) {
// Pick a token at random and submit a task to it.
threads.emplace_back([&]() {
int num_tokens_submitted = 0;
int32_t seed = static_cast<int32_t>(GetCurrentTimeMicros());
srand(seed);
Random rng(seed);
while (latch.count()) {
int sleep_ms = rng.Next() % 5;
Status s = GetRandomToken()->submit_func([sleep_ms]() {
// Sleep a little first so that tasks are running during other events.
SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
});
CHECK(s.ok() || s.is_service_unavailable());
num_tokens_submitted++;
}
total_num_tokens_submitted += num_tokens_submitted;
});
}
SleepFor(MonoDelta::FromSeconds(kTestRuntimeSecs));
latch.count_down();
for (auto& t : threads) {
t.join();
}
LOG(INFO) << Substitute("Tokens cycled ($0 threads): $1",
kCycleThreads, total_num_tokens_cycled.load());
LOG(INFO) << Substitute("Tokens shutdown ($0 threads): $1",
kShutdownThreads, total_num_tokens_shutdown.load());
LOG(INFO) << Substitute("Tokens waited ($0 threads): $1",
kWaitThreads, total_num_tokens_waited.load());
LOG(INFO) << Substitute("Tokens submitted ($0 threads): $1",
kSubmitThreads, total_num_tokens_submitted.load());
}
/*
TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
const int kNumThreads = 10;
// Test with a pool that allows for kNumThreads concurrent threads.
ASSERT_OK(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
.set_max_threads(kNumThreads)).ok());
// Submit kNumThreads slow tasks and unblock them, in order to produce
// kNumThreads worker threads.
CountDownLatch latch(1);
SCOPED_CLEANUP({
latch.CountDown();
});
for (int i = 0; i < kNumThreads; i++) {
ASSERT_OK(pool_->submit(SlowTask::new_slow_task(&latch)).ok());
}
ASSERT_EQ(kNumThreads, _pool->num_threads());
latch.count_down();
pool_->wait();
// The kNumThreads threads are idle and waiting for the idle timeout.
// Submit a slow trickle of lightning fast tasks.
//
// If the threads are woken up in FIFO order, this trickle is enough to
// prevent all of them from idling and the AssertEventually will time out.
//
// If LIFO order is used, the same thread will be reused for each task and
// the other threads will eventually time out.
AssertEventually([&]() {
ASSERT_OK(_pool->submit_func([](){}).ok());
SleepFor(MonoDelta::FromMilliseconds(10));
ASSERT_EQ(1, _pool->num_threads());
}, MonoDelta::FromSeconds(10), AssertBackoff::NONE);
NO_PENDING_FATALS();
}
*/
} // namespace doris
int main(int argc, char* argv[]) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}