Implement TaskQueueBase interface by SingleThreadedTaskQueueForTesting

that allows to use SingleThreadedTaskQueueForTesting as regular TaskQueue.
which allows components that currently depend on SingleThreadedTaskQueueForTesting
to depend on TaskQueueBase interface instead.
Those updates can be done one-by-one and in the end would allow to stop
using SingleThreadedTaskQueueForTesting in favor of other TaskQueue implementations.

Bug: webrtc:10933
Change-Id: I3e642c88c968012588b9d9c09918340f37bbedbd
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/154352
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Elad Alon <eladalon@webrtc.org>
Reviewed-by: Yves Gerey <yvesg@google.com>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29307}
This commit is contained in:
Danil Chapovalov
2019-09-25 17:21:52 +02:00
committed by Commit Bot
parent ad10222289
commit 71037a8e99
4 changed files with 78 additions and 38 deletions

View File

@ -381,6 +381,7 @@ if (rtc_include_tests) {
"../api:create_simulcast_test_fixture_api", "../api:create_simulcast_test_fixture_api",
"../api:scoped_refptr", "../api:scoped_refptr",
"../api:simulcast_test_fixture_api", "../api:simulcast_test_fixture_api",
"../api/task_queue:task_queue_test",
"../api/test/video:function_video_factory", "../api/test/video:function_video_factory",
"../api/video:builtin_video_bitrate_allocator_factory", "../api/video:builtin_video_bitrate_allocator_factory",
"../api/video:video_frame", "../api/video:video_frame",
@ -622,9 +623,11 @@ rtc_source_set("single_threaded_task_queue") {
"single_threaded_task_queue.h", "single_threaded_task_queue.h",
] ]
deps = [ deps = [
"../api/task_queue",
"../rtc_base:checks", "../rtc_base:checks",
"../rtc_base:deprecation", "../rtc_base:deprecation",
"../rtc_base:rtc_base_approved", "../rtc_base:rtc_base_approved",
"../rtc_base/task_utils:to_queued_task",
] ]
} }

View File

@ -20,15 +20,12 @@
namespace webrtc { namespace webrtc {
namespace test { namespace test {
DEPRECATED_SingleThreadedTaskQueueForTesting::QueuedTask::QueuedTask( DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::StoredTask(
DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId task_id, DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId task_id,
int64_t earliest_execution_time, std::unique_ptr<QueuedTask> task)
DEPRECATED_SingleThreadedTaskQueueForTesting::Task task) : task_id(task_id), task(std::move(task)) {}
: task_id(task_id),
earliest_execution_time(earliest_execution_time),
task(task) {}
DEPRECATED_SingleThreadedTaskQueueForTesting::QueuedTask::~QueuedTask() = DEPRECATED_SingleThreadedTaskQueueForTesting::StoredTask::~StoredTask() =
default; default;
DEPRECATED_SingleThreadedTaskQueueForTesting:: DEPRECATED_SingleThreadedTaskQueueForTesting::
@ -43,13 +40,8 @@ DEPRECATED_SingleThreadedTaskQueueForTesting::
} }
DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId
DEPRECATED_SingleThreadedTaskQueueForTesting::PostTask(Task task) { DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayed(
return PostDelayedTask(task, 0); std::unique_ptr<QueuedTask> task,
}
DEPRECATED_SingleThreadedTaskQueueForTesting::TaskId
DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayedTask(
Task task,
int64_t delay_ms) { int64_t delay_ms) {
int64_t earliest_exec_time = rtc::TimeAfter(delay_ms); int64_t earliest_exec_time = rtc::TimeAfter(delay_ms);
@ -60,13 +52,11 @@ DEPRECATED_SingleThreadedTaskQueueForTesting::PostDelayedTask(
TaskId id = next_task_id_++; TaskId id = next_task_id_++;
// Insert after any other tasks with an earlier-or-equal target time. // Insert after any other tasks with an earlier-or-equal target time.
auto it = tasks_.begin(); // Note: multimap has promise "The order of the key-value pairs whose keys
for (; it != tasks_.end(); it++) { // compare equivalent is the order of insertion and does not change."
if (earliest_exec_time < (*it)->earliest_execution_time) { tasks_.emplace(std::piecewise_construct,
break; std::forward_as_tuple(earliest_exec_time),
} std::forward_as_tuple(id, std::move(task)));
}
tasks_.insert(it, std::make_unique<QueuedTask>(id, earliest_exec_time, task));
// This class is optimized for simplicty, not for performance. This will wake // This class is optimized for simplicty, not for performance. This will wake
// the thread up even if the next task in the queue is only scheduled for // the thread up even if the next task in the queue is only scheduled for
@ -93,7 +83,7 @@ void DEPRECATED_SingleThreadedTaskQueueForTesting::SendTask(Task task) {
bool DEPRECATED_SingleThreadedTaskQueueForTesting::CancelTask(TaskId task_id) { bool DEPRECATED_SingleThreadedTaskQueueForTesting::CancelTask(TaskId task_id) {
rtc::CritScope lock(&cs_); rtc::CritScope lock(&cs_);
for (auto it = tasks_.begin(); it != tasks_.end(); it++) { for (auto it = tasks_.begin(); it != tasks_.end(); it++) {
if ((*it)->task_id == task_id) { if (it->second.task_id == task_id) {
tasks_.erase(it); tasks_.erase(it);
return true; return true;
} }
@ -136,6 +126,7 @@ void DEPRECATED_SingleThreadedTaskQueueForTesting::Run(void* obj) {
} }
void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() { void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() {
CurrentTaskQueueSetter set_current(this);
while (true) { while (true) {
std::unique_ptr<QueuedTask> queued_task; std::unique_ptr<QueuedTask> queued_task;
@ -151,11 +142,13 @@ void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() {
return; return;
} }
if (!tasks_.empty()) { if (!tasks_.empty()) {
int64_t remaining_delay_ms = rtc::TimeDiff( auto next_delayed_task = tasks_.begin();
tasks_.front()->earliest_execution_time, rtc::TimeMillis()); int64_t earliest_exec_time = next_delayed_task->first;
int64_t remaining_delay_ms =
rtc::TimeDiff(earliest_exec_time, rtc::TimeMillis());
if (remaining_delay_ms <= 0) { if (remaining_delay_ms <= 0) {
queued_task = std::move(tasks_.front()); queued_task = std::move(next_delayed_task->second.task);
tasks_.pop_front(); tasks_.erase(next_delayed_task);
} else { } else {
wait_time = rtc::saturated_cast<int>(remaining_delay_ms); wait_time = rtc::saturated_cast<int>(remaining_delay_ms);
} }
@ -163,12 +156,19 @@ void DEPRECATED_SingleThreadedTaskQueueForTesting::RunLoop() {
} }
if (queued_task) { if (queued_task) {
queued_task->task(); if (!queued_task->Run()) {
queued_task.release();
}
} else { } else {
wake_up_.Wait(wait_time); wake_up_.Wait(wait_time);
} }
} }
} }
void DEPRECATED_SingleThreadedTaskQueueForTesting::Delete() {
Stop();
delete this;
}
} // namespace test } // namespace test
} // namespace webrtc } // namespace webrtc

View File

@ -11,13 +11,15 @@
#define TEST_SINGLE_THREADED_TASK_QUEUE_H_ #define TEST_SINGLE_THREADED_TASK_QUEUE_H_
#include <functional> #include <functional>
#include <list> #include <map>
#include <memory> #include <memory>
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/critical_section.h" #include "rtc_base/critical_section.h"
#include "rtc_base/deprecation.h" #include "rtc_base/deprecation.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
#include "rtc_base/platform_thread.h" #include "rtc_base/platform_thread.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/thread_checker.h" #include "rtc_base/thread_checker.h"
namespace webrtc { namespace webrtc {
@ -33,25 +35,29 @@ namespace test {
// resemble that of real WebRTC, thereby allowing us to replace some critical // resemble that of real WebRTC, thereby allowing us to replace some critical
// sections by thread-checkers. // sections by thread-checkers.
// This task is NOT tuned for performance, but rather for simplicity. // This task is NOT tuned for performance, but rather for simplicity.
class DEPRECATED_SingleThreadedTaskQueueForTesting { class DEPRECATED_SingleThreadedTaskQueueForTesting : public TaskQueueBase {
public: public:
using Task = std::function<void()>; using Task = std::function<void()>;
using TaskId = size_t; using TaskId = size_t;
constexpr static TaskId kInvalidTaskId = static_cast<TaskId>(-1); constexpr static TaskId kInvalidTaskId = static_cast<TaskId>(-1);
explicit DEPRECATED_SingleThreadedTaskQueueForTesting(const char* name); explicit DEPRECATED_SingleThreadedTaskQueueForTesting(const char* name);
~DEPRECATED_SingleThreadedTaskQueueForTesting(); ~DEPRECATED_SingleThreadedTaskQueueForTesting() override;
// Sends one task to the task-queue, and returns a handle by which the // Sends one task to the task-queue, and returns a handle by which the
// task can be cancelled. // task can be cancelled.
// This mimics the behavior of TaskQueue, but only for lambdas, rather than // This mimics the behavior of TaskQueue, but only for lambdas, rather than
// for both lambdas and QueuedTask objects. // for both lambdas and QueuedTask objects.
TaskId PostTask(Task task); TaskId PostTask(Task task) {
return PostDelayed(ToQueuedTask(std::move(task)), /*delay_ms=*/0);
}
// Same as PostTask(), but ensures that the task will not begin execution // Same as PostTask(), but ensures that the task will not begin execution
// less than |delay_ms| milliseconds after being posted; an upper bound // less than |delay_ms| milliseconds after being posted; an upper bound
// is not provided. // is not provided.
TaskId PostDelayedTask(Task task, int64_t delay_ms); TaskId PostDelayedTask(Task task, int64_t delay_ms) {
return PostDelayed(ToQueuedTask(std::move(task)), delay_ms);
}
// Send one task to the queue. The function does not return until the task // Send one task to the queue. The function does not return until the task
// has finished executing. No support for canceling the task. // has finished executing. No support for canceling the task.
@ -72,22 +78,36 @@ class DEPRECATED_SingleThreadedTaskQueueForTesting {
void Stop(); void Stop();
// Implements TaskQueueBase.
void Delete() override;
void PostTask(std::unique_ptr<QueuedTask> task) override {
PostDelayed(std::move(task), /*delay_ms=*/0);
}
void PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t delay_ms) override {
PostDelayed(std::move(task), delay_ms);
}
private: private:
struct QueuedTask { struct StoredTask {
QueuedTask(TaskId task_id, int64_t earliest_execution_time, Task task); StoredTask(TaskId task_id, std::unique_ptr<QueuedTask> task);
~QueuedTask(); ~StoredTask();
TaskId task_id; TaskId task_id;
int64_t earliest_execution_time; std::unique_ptr<QueuedTask> task;
Task task;
}; };
TaskId PostDelayed(std::unique_ptr<QueuedTask> task, int64_t delay_ms);
static void Run(void* obj); static void Run(void* obj);
void RunLoop(); void RunLoop();
rtc::CriticalSection cs_; rtc::CriticalSection cs_;
std::list<std::unique_ptr<QueuedTask>> tasks_ RTC_GUARDED_BY(cs_); // Tasks are ordered by earliest execution time.
std::multimap<int64_t, StoredTask> tasks_ RTC_GUARDED_BY(cs_);
rtc::ThreadChecker owner_thread_checker_; rtc::ThreadChecker owner_thread_checker_;
rtc::PlatformThread thread_; rtc::PlatformThread thread_;
bool running_ RTC_GUARDED_BY(cs_); bool running_ RTC_GUARDED_BY(cs_);

View File

@ -14,6 +14,7 @@
#include <memory> #include <memory>
#include <vector> #include <vector>
#include "api/task_queue/task_queue_test.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
#include "test/gtest.h" #include "test/gtest.h"
@ -352,6 +353,22 @@ TEST(DEPRECATED_SingleThreadedTaskQueueForTestingTest,
EXPECT_LT(counter, tasks); EXPECT_LT(counter, tasks);
} }
class SingleThreadedTaskQueueForTestingFactory : public TaskQueueFactory {
public:
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
absl::string_view /* name */,
Priority /*priority*/) const override {
return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
new DEPRECATED_SingleThreadedTaskQueueForTesting("noname"));
}
};
INSTANTIATE_TEST_SUITE_P(
DeprecatedSingleThreadedTaskQueueForTesting,
TaskQueueTest,
::testing::Values(
std::make_unique<SingleThreadedTaskQueueForTestingFactory>));
} // namespace } // namespace
} // namespace test } // namespace test
} // namespace webrtc } // namespace webrtc