Make rtc::Thread a TaskQueue

in support of converging on single way to run asynchronous tasks in webrtc

Bug: b/144982320
Change-Id: I200ad298136d11764a3f5c0547ebcba51aceafa0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/158782
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29896}
This commit is contained in:
Danil Chapovalov
2019-11-22 15:52:40 +01:00
committed by Commit Bot
parent 2aaf4afb09
commit 912b3b83b3
5 changed files with 79 additions and 8 deletions

View File

@ -114,15 +114,11 @@ bool RtcEventLogImpl::StartLogging(std::unique_ptr<RtcEventLogOutput> output,
void RtcEventLogImpl::StopLogging() { void RtcEventLogImpl::StopLogging() {
RTC_LOG(LS_INFO) << "Stopping WebRTC event log."; RTC_LOG(LS_INFO) << "Stopping WebRTC event log.";
// TODO(danilchap): Do not block current thread waiting on the task queue.
// It might work for now, for current callers, but disallows caller to share
// threads with the |task_queue_|.
rtc::Event output_stopped; rtc::Event output_stopped;
StopLogging([&output_stopped]() { output_stopped.Set(); }); StopLogging([&output_stopped]() { output_stopped.Set(); });
// By making sure StopLogging() is not executed on a task queue,
// we ensure it's not running on a thread that is shared with |task_queue_|,
// meaning the following Wait() will not block forever.
RTC_DCHECK(TaskQueueBase::Current() == nullptr);
output_stopped.Wait(rtc::Event::kForever); output_stopped.Wait(rtc::Event::kForever);
RTC_LOG(LS_INFO) << "WebRTC event log successfully stopped."; RTC_LOG(LS_INFO) << "WebRTC event log successfully stopped.";

View File

@ -786,6 +786,7 @@ rtc_library("rtc_base") {
":stringutils", ":stringutils",
"../api:array_view", "../api:array_view",
"../api:scoped_refptr", "../api:scoped_refptr",
"../api/task_queue",
"network:sent_packet", "network:sent_packet",
"system:file_wrapper", "system:file_wrapper",
"system:rtc_export", "system:rtc_export",
@ -1368,6 +1369,8 @@ if (rtc_include_tests) {
":stringutils", ":stringutils",
":testclient", ":testclient",
"../api:array_view", "../api:array_view",
"../api/task_queue",
"../api/task_queue:task_queue_test",
"../test:fileutils", "../test:fileutils",
"../test:test_main", "../test:test_main",
"../test:test_support", "../test:test_support",

View File

@ -335,6 +335,7 @@ void* Thread::PreRun(void* pv) {
Thread* thread = static_cast<Thread*>(pv); Thread* thread = static_cast<Thread*>(pv);
ThreadManager::Instance()->SetCurrentThread(thread); ThreadManager::Instance()->SetCurrentThread(thread);
rtc::SetCurrentThreadName(thread->name_.c_str()); rtc::SetCurrentThreadName(thread->name_.c_str());
CurrentTaskQueueSetter set_current_task_queue(thread);
#if defined(WEBRTC_MAC) #if defined(WEBRTC_MAC)
ScopedAutoReleasePool pool; ScopedAutoReleasePool pool;
#endif #endif
@ -475,6 +476,41 @@ void Thread::InvokeInternal(const Location& posted_from,
Send(posted_from, handler); Send(posted_from, handler);
} }
void Thread::QueuedTaskHandler::OnMessage(Message* msg) {
RTC_DCHECK(msg);
auto* data = static_cast<ScopedMessageData<webrtc::QueuedTask>*>(msg->pdata);
std::unique_ptr<webrtc::QueuedTask> task = std::move(data->data());
// MessageQueue expects handler to own Message::pdata when OnMessage is called
// Since MessageData is no longer needed, delete it.
delete data;
// QueuedTask interface uses Run return value to communicate who owns the
// task. false means QueuedTask took the ownership.
if (!task->Run())
task.release();
}
void Thread::PostTask(std::unique_ptr<webrtc::QueuedTask> task) {
// Though Post takes MessageData by raw pointer (last parameter), it still
// takes it with ownership.
Post(RTC_FROM_HERE, &queued_task_handler_,
/*id=*/0, new ScopedMessageData<webrtc::QueuedTask>(std::move(task)));
}
void Thread::PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
uint32_t milliseconds) {
// Though PostDelayed takes MessageData by raw pointer (last parameter),
// it still takes it with ownership.
PostDelayed(RTC_FROM_HERE, milliseconds, &queued_task_handler_,
/*id=*/0,
new ScopedMessageData<webrtc::QueuedTask>(std::move(task)));
}
void Thread::Delete() {
Stop();
delete this;
}
bool Thread::IsProcessingMessagesForTesting() { bool Thread::IsProcessingMessagesForTesting() {
return (owned_ || IsCurrent()) && return (owned_ || IsCurrent()) &&
MessageQueue::IsProcessingMessagesForTesting(); MessageQueue::IsProcessingMessagesForTesting();

View File

@ -21,6 +21,8 @@
#if defined(WEBRTC_POSIX) #if defined(WEBRTC_POSIX)
#include <pthread.h> #include <pthread.h>
#endif #endif
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/constructor_magic.h" #include "rtc_base/constructor_magic.h"
#include "rtc_base/location.h" #include "rtc_base/location.h"
#include "rtc_base/message_handler.h" #include "rtc_base/message_handler.h"
@ -133,7 +135,8 @@ struct _SendMessage {
// WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread(). // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread().
class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue { class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue,
public webrtc::TaskQueueBase {
public: public:
explicit Thread(SocketServer* ss); explicit Thread(SocketServer* ss);
explicit Thread(std::unique_ptr<SocketServer> ss); explicit Thread(std::unique_ptr<SocketServer> ss);
@ -263,6 +266,12 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue {
std::forward<FunctorT>(functor))); std::forward<FunctorT>(functor)));
} }
// From TaskQueueBase
void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override;
void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
uint32_t milliseconds) override;
void Delete() override;
// From MessageQueue // From MessageQueue
bool IsProcessingMessagesForTesting() override; bool IsProcessingMessagesForTesting() override;
void Clear(MessageHandler* phandler, void Clear(MessageHandler* phandler,
@ -325,6 +334,10 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue {
friend class ScopedDisallowBlockingCalls; friend class ScopedDisallowBlockingCalls;
private: private:
class QueuedTaskHandler final : public MessageHandler {
public:
void OnMessage(Message* msg) override;
};
// Sets the per-thread allow-blocking-calls flag and returns the previous // Sets the per-thread allow-blocking-calls flag and returns the previous
// value. Must be called on this thread. // value. Must be called on this thread.
bool SetAllowBlockingCalls(bool allow); bool SetAllowBlockingCalls(bool allow);
@ -381,6 +394,9 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue {
// Only touched from the worker thread itself. // Only touched from the worker thread itself.
bool blocking_calls_allowed_ = true; bool blocking_calls_allowed_ = true;
// Runs webrtc::QueuedTask posted to the Thread.
QueuedTaskHandler queued_task_handler_;
friend class ThreadManager; friend class ThreadManager;
RTC_DISALLOW_COPY_AND_ASSIGN(Thread); RTC_DISALLOW_COPY_AND_ASSIGN(Thread);

View File

@ -12,6 +12,8 @@
#include <memory> #include <memory>
#include "api/task_queue/task_queue_factory.h"
#include "api/task_queue/task_queue_test.h"
#include "rtc_base/async_invoker.h" #include "rtc_base/async_invoker.h"
#include "rtc_base/async_udp_socket.h" #include "rtc_base/async_udp_socket.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
@ -900,5 +902,23 @@ TEST(ThreadPostTaskTest, InvokesInPostedOrder) {
fourth.Wait(Event::kForever); fourth.Wait(Event::kForever);
} }
class ThreadFactory : public webrtc::TaskQueueFactory {
public:
std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>
CreateTaskQueue(absl::string_view /* name */,
Priority /*priority*/) const override {
std::unique_ptr<Thread> thread = Thread::Create();
thread->Start();
return std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>(
thread.release());
}
};
using ::webrtc::TaskQueueTest;
INSTANTIATE_TEST_SUITE_P(RtcThread,
TaskQueueTest,
::testing::Values(std::make_unique<ThreadFactory>));
} // namespace } // namespace
} // namespace rtc } // namespace rtc