Add utility class MaybeWorkerThread

The class will be used in experiment aiming at reducing the number of
used threads. The experiment will remove the need for the Pacer TQ and
RTP module worker TQ.
The helper ensure calls are made on either the worker thread a TQ
depending on the field trial
"WebRTC-SendPacketsOnWorkerThread"

Bug: webrtc:14502
Change-Id: I47581e3e3203712a244f1cb76952cd94734cc3f1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/277444
Reviewed-by: Evan Shrubsole <eshr@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38289}
This commit is contained in:
Per Kjellander
2022-10-03 11:48:23 +02:00
committed by WebRTC LUCI CQ
parent 8b04ef268c
commit edcae05bd4
5 changed files with 370 additions and 10 deletions

View File

@ -227,6 +227,7 @@ if (rtc_include_tests && !build_with_chromium) {
"pacing:pacing_unittests",
"remote_bitrate_estimator:remote_bitrate_estimator_unittests",
"rtp_rtcp:rtp_rtcp_unittests",
"utility:utility_unittests",
"video_coding:video_coding_unittests",
"video_coding/timing:timing_unittests",
]

View File

@ -8,18 +8,39 @@
import("../../webrtc.gni")
if (is_android) {
rtc_library("utility") {
visibility = [ "*" ]
rtc_source_set("utility") {
sources = [
"maybe_worker_thread.cc",
"maybe_worker_thread.h",
]
sources = [
deps = [
"../../api:field_trials_view",
"../../api:sequence_checker",
"../../api/task_queue",
"../../api/task_queue:pending_task_safety_flag",
"../../rtc_base:checks",
"../../rtc_base:logging",
"../../rtc_base:macromagic",
"../../rtc_base:rtc_event",
"../../rtc_base:rtc_task_queue",
]
absl_deps = [
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/strings:strings",
]
if (is_android) {
visibility = [ "*" ]
sources += [
"include/helpers_android.h",
"include/jvm_android.h",
"source/helpers_android.cc",
"source/jvm_android.cc",
]
deps = [
deps += [
"../../api:sequence_checker",
"../../rtc_base:checks",
"../../rtc_base:logging",
@ -27,10 +48,27 @@ if (is_android) {
"../../rtc_base/system:arch",
]
}
} else {
# Add an empty source set so that dependent targets may include utility
# unconditionally.
rtc_source_set("utility") {
visibility = [ "*" ]
}
if (rtc_include_tests) {
rtc_library("utility_unittests") {
testonly = true
sources = [ "maybe_worker_thread_unittests.cc" ]
deps = [
":utility",
"../../api:sequence_checker",
"../../api/task_queue",
"../../api/task_queue:default_task_queue_factory",
"../../api/task_queue:pending_task_safety_flag",
"../../api/units:time_delta",
"../../rtc_base:rtc_event",
"../../rtc_base:threading",
"../../test:explicit_key_value_config",
"../../test:field_trial",
"../../test:test_main",
"../../test:test_support",
"../../test/time_controller",
]
}
}

View File

@ -0,0 +1,94 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/utility/maybe_worker_thread.h"
#include <utility>
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_queue.h"
namespace webrtc {
MaybeWorkerThread::MaybeWorkerThread(const FieldTrialsView& field_trials,
absl::string_view task_queue_name,
TaskQueueFactory* factory)
: owned_task_queue_(
field_trials.IsEnabled("WebRTC-SendPacketsOnWorkerThread")
? nullptr
: factory->CreateTaskQueue(task_queue_name,
rtc::TaskQueue::Priority::NORMAL)),
worker_thread_(TaskQueueBase::Current()) {
RTC_DCHECK(worker_thread_);
RTC_LOG(LS_INFO) << "WebRTC-SendPacketsOnWorkerThread"
<< (owned_task_queue_ ? " Disabled" : " Enabled");
}
MaybeWorkerThread::~MaybeWorkerThread() {
RTC_DCHECK_RUN_ON(&sequence_checker_);
}
void MaybeWorkerThread::RunSynchronous(absl::AnyInvocable<void() &&> task) {
if (owned_task_queue_) {
rtc::Event thread_sync_event;
auto closure = [&thread_sync_event, task = std::move(task)]() mutable {
std::move(task)();
thread_sync_event.Set();
};
owned_task_queue_->PostTask(std::move(closure));
thread_sync_event.Wait(rtc::Event::kForever);
} else {
RTC_DCHECK_RUN_ON(&sequence_checker_);
std::move(task)();
}
}
void MaybeWorkerThread::RunOrPost(absl::AnyInvocable<void() &&> task) {
if (owned_task_queue_) {
owned_task_queue_->PostTask(std::move(task));
} else {
RTC_DCHECK_RUN_ON(&sequence_checker_);
std::move(task)();
}
}
TaskQueueBase* MaybeWorkerThread::TaskQueueForDelayedTasks() const {
RTC_DCHECK(IsCurrent());
return owned_task_queue_ ? owned_task_queue_.get() : worker_thread_;
}
TaskQueueBase* MaybeWorkerThread::TaskQueueForPost() const {
RTC_DLOG_IF(LS_WARNING, IsCurrent())
<< "TaskQueueForPost called on the current thread. Ok only in unit "
"tests.";
return owned_task_queue_ ? owned_task_queue_.get() : worker_thread_;
}
bool MaybeWorkerThread::IsCurrent() const {
if (owned_task_queue_) {
return owned_task_queue_->IsCurrent();
}
return worker_thread_->IsCurrent();
}
absl::AnyInvocable<void() &&> MaybeWorkerThread::MaybeSafeTask(
rtc::scoped_refptr<PendingTaskSafetyFlag> flag,
absl::AnyInvocable<void() &&> task) {
if (owned_task_queue_) {
return task;
} else {
return SafeTask(std::move(flag), std::move(task));
}
}
} // namespace webrtc

View File

@ -0,0 +1,86 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_UTILITY_MAYBE_WORKER_THREAD_H_
#define MODULES_UTILITY_MAYBE_WORKER_THREAD_H_
#include <memory>
#include "absl/strings/string_view.h"
#include "api/field_trials_view.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
// Helper class used by experiment to replace usage of the
// RTP worker task queue owned by RtpTransportControllerSend, and the pacer task
// queue owned by TaskQueuePacedSender with the one and only worker thread.
// Tasks will run on the target sequence which is either the worker thread or
// one of these task queues depending on the field trial
// "WebRTC-SendPacketsOnWorkerThread".
// This class is assumed to be created on the worker thread and the worker
// thread is assumed to outlive an instance of this class.
//
// Experiment can be tracked in
// https://bugs.chromium.org/p/webrtc/issues/detail?id=14502
//
// After experiment evaluation, this class should be deleted.
// Calls to RunOrPost and RunSynchronous should be removed and the task should
// be invoked immediately.
// Instead of MaybeSafeTask a SafeTask should be used when posting tasks.
class RTC_LOCKABLE MaybeWorkerThread {
public:
MaybeWorkerThread(const FieldTrialsView& field_trials,
absl::string_view task_queue_name,
TaskQueueFactory* factory);
~MaybeWorkerThread();
// Runs `task` immediately on the worker thread if in experiment, otherwise
// post the task on the task queue.
void RunOrPost(absl::AnyInvocable<void() &&> task);
// Runs `task` immediately on the worker thread if in experiment, otherwise
// post the task on the task queue and use an even to wait for completion.
void RunSynchronous(absl::AnyInvocable<void() &&> task);
// Used for posting delayed or repeated tasks on the worker thread or task
// queue depending on the field trial. DCHECKs that this method is called on
// the target sequence.
TaskQueueBase* TaskQueueForDelayedTasks() const;
// Used when a task has to be posted from one sequence to the target
// sequence. A task should only be posted if a sequence hop is needed.
TaskQueueBase* TaskQueueForPost() const;
// Workaround to use a SafeTask only if the target sequence is the worker
// thread. This is used when a SafeTask can not be used because the object
// that posted the task is not destroyed on the target sequence. Instead, the
// caller has to guarantee that this MaybeWorkerThread is destroyed first
// since that guarantee that the posted task is deleted or run before the
// owning class.
absl::AnyInvocable<void() &&> MaybeSafeTask(
rtc::scoped_refptr<PendingTaskSafetyFlag> flag,
absl::AnyInvocable<void() &&> task);
// To implement macro RTC_DCHECK_RUN_ON.
// Implementation delegate to the actual used sequence.
bool IsCurrent() const;
private:
SequenceChecker sequence_checker_;
const std::unique_ptr<TaskQueueBase, TaskQueueDeleter> owned_task_queue_;
TaskQueueBase* const worker_thread_;
};
} // namespace webrtc
#endif // MODULES_UTILITY_MAYBE_WORKER_THREAD_H_

View File

@ -0,0 +1,141 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include <memory>
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/units/time_delta.h"
#include "modules/utility/maybe_worker_thread.h"
#include "rtc_base/event.h"
#include "test/explicit_key_value_config.h"
#include "test/gtest.h"
#include "test/time_controller/real_time_controller.h"
namespace webrtc {
namespace {
constexpr char kFieldTrialString[] =
"WebRTC-SendPacketsOnWorkerThread/Enabled/";
TEST(MaybeWorkerThreadTest, RunOrPostRunOnWorkerThreadInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
SequenceChecker checker;
bool run = false;
m.RunOrPost([&] {
EXPECT_TRUE(checker.IsCurrent());
run = true;
});
EXPECT_TRUE(run);
}
TEST(MaybeWorkerThreadTest, RunOrPostPostsOnTqPerDefault) {
test::ExplicitKeyValueConfig field_trial("");
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
SequenceChecker checker;
rtc::Event event;
m.RunOrPost([&] {
EXPECT_FALSE(checker.IsCurrent());
event.Set();
});
EXPECT_TRUE(event.Wait(TimeDelta::Seconds(10)));
}
TEST(MaybeWorkerThreadTest, RunSynchronousRunOnWorkerThreadInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
SequenceChecker checker;
bool run = false;
m.RunSynchronous([&] {
EXPECT_TRUE(checker.IsCurrent());
run = true;
});
EXPECT_TRUE(run);
}
TEST(MaybeWorkerThreadTest, RunSynchronousRunOnTqPerDefault) {
test::ExplicitKeyValueConfig field_trial("");
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
SequenceChecker checker;
bool run = false;
m.RunSynchronous([&] {
EXPECT_FALSE(checker.IsCurrent());
run = true;
});
EXPECT_TRUE(run);
}
TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskPerDefault) {
// We cant really test that the return value from MaybeSafeTask is a SafeTask.
// But we can test that the safety flag does not have more references after a
// call.
test::ExplicitKeyValueConfig field_trial("");
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
rtc::scoped_refptr<PendingTaskSafetyFlag> flag =
PendingTaskSafetyFlag::Create();
auto closure = m.MaybeSafeTask(flag, [] {});
EXPECT_EQ(flag->Release(), rtc::RefCountReleaseStatus::kDroppedLastRef);
flag.release();
}
TEST(MaybeWorkerThreadTest, MaybeSafeTaskDoesNotReturnSafeTaskInExperiment) {
// We cant really test that the return value from MaybeSafeTask is a SafeTask.
// But we can test that the safety flag does have one more references after a
// call.
test::ExplicitKeyValueConfig field_trial(kFieldTrialString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
rtc::scoped_refptr<PendingTaskSafetyFlag> flag =
PendingTaskSafetyFlag::Create();
auto closure = m.MaybeSafeTask(flag, [] {});
EXPECT_EQ(flag->Release(), rtc::RefCountReleaseStatus::kOtherRefsRemained);
flag.release();
}
TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectPerDefault) {
test::ExplicitKeyValueConfig field_trial("");
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
EXPECT_FALSE(m.IsCurrent());
m.RunSynchronous([&] { EXPECT_TRUE(m.IsCurrent()); });
}
TEST(MaybeWorkerThreadTest, IsCurrentBehavesCorrectInExperiment) {
test::ExplicitKeyValueConfig field_trial(kFieldTrialString);
RealTimeController controller;
MaybeWorkerThread m(field_trial, "test_tq", controller.GetTaskQueueFactory());
EXPECT_TRUE(m.IsCurrent());
auto tq = controller.GetTaskQueueFactory()->CreateTaskQueue(
"tq", TaskQueueFactory::Priority::NORMAL);
rtc::Event event;
tq->PostTask([&] {
EXPECT_FALSE(m.IsCurrent());
event.Set();
});
ASSERT_TRUE(event.Wait(TimeDelta::Seconds(10)));
}
} // namespace
} // namespace webrtc