Fixes to posting delayed process tasks in SSCC.

The task queue based SendSideCongestionController (SSCC) was accessing
a unique pointer to the task queue from the task queue itself. This
triggered a tsan check failure when resetting the same unique pointer.

Also move declaration of SSCC member in RtpTransportControllerSend last,
to ensure that it, and its TaskQueue, are destroyed before other members.

Bug: webrtc:8415
Change-Id: I75c93f41deab637f7e4766ac4b61713c86f866e9
Reviewed-on: https://webrtc-review.googlesource.com/62143
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Björn Terelius <terelius@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#22478}
This commit is contained in:
Sebastian Jansson
2018-03-16 15:36:37 +01:00
committed by Commit Bot
parent 4ccc1c4bb6
commit 317a522876
5 changed files with 70 additions and 31 deletions

View File

@ -54,14 +54,14 @@ RtpTransportControllerSend::RtpTransportControllerSend(
const BitrateConstraints& bitrate_config)
: clock_(clock),
pacer_(clock, &packet_router_, event_log),
bitrate_configurator_(bitrate_config),
process_thread_(ProcessThread::Create("SendControllerThread")),
observer_(nullptr),
send_side_cc_(CreateController(clock,
event_log,
&pacer_,
bitrate_config,
TaskQueueExperimentEnabled())),
bitrate_configurator_(bitrate_config),
process_thread_(ProcessThread::Create("SendControllerThread")),
observer_(nullptr) {
TaskQueueExperimentEnabled())) {
process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE);
process_thread_->RegisterModule(send_side_cc_.get(), RTC_FROM_HERE);
process_thread_->Start();

View File

@ -31,8 +31,9 @@ class RtcEventLog;
// TODO(nisse): When we get the underlying transports here, we should
// have one object implementing RtpTransportControllerSendInterface
// per transport, sharing the same congestion controller.
class RtpTransportControllerSend : public RtpTransportControllerSendInterface,
public NetworkChangedObserver {
class RtpTransportControllerSend final
: public RtpTransportControllerSendInterface,
public NetworkChangedObserver {
public:
RtpTransportControllerSend(Clock* clock,
RtcEventLog* event_log,
@ -83,14 +84,15 @@ class RtpTransportControllerSend : public RtpTransportControllerSendInterface,
const Clock* const clock_;
PacketRouter packet_router_;
PacedSender pacer_;
const std::unique_ptr<SendSideCongestionControllerInterface> send_side_cc_;
RtpKeepAliveConfig keepalive_;
RtpBitrateConfigurator bitrate_configurator_;
std::map<std::string, rtc::NetworkRoute> network_routes_;
const std::unique_ptr<ProcessThread> process_thread_;
rtc::CriticalSection observer_crit_;
TargetTransferRateObserver* observer_ RTC_GUARDED_BY(observer_crit_);
// Declared last since it will issue callbacks from a task queue. Declaring it
// last ensures that it is destroyed first.
const std::unique_ptr<SendSideCongestionControllerInterface> send_side_cc_;
RTC_DISALLOW_COPY_AND_ASSIGN(RtpTransportControllerSend);
};

View File

@ -130,19 +130,18 @@ class SendSideCongestionController
protected:
// TODO(srte): The tests should be rewritten to not depend on internals and
// these functions should be removed.
// Post tasks that are normally delayed. This allows unit tests to trigger
// process updates immediately.
void PostDelayedTasksForTest();
// Post periodic tasks just once. This allows unit tests to trigger process
// updates immediately.
void PostPeriodicTasksForTest();
// Waits for outstanding tasks to be finished. This allos unit tests to ensure
// that expected callbacks has be called.
// that expected callbacks has been called.
void WaitOnTasksForTest();
private:
void MaybeCreateControllers();
void StartProcess() RTC_RUN_ON(task_queue_);
void ProcessTask();
void StartPacerQueueUpdate();
void PacerQueueUpdateTask();
void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_);
void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_);
void UpdatePacerQueue() RTC_RUN_ON(task_queue_);
void UpdateStreamsConfig() RTC_RUN_ON(task_queue_);
void MaybeUpdateOutstandingData();

View File

@ -18,6 +18,7 @@
#include "modules/congestion_controller/network_control/include/network_types.h"
#include "modules/congestion_controller/network_control/include/network_units.h"
#include "modules/remote_bitrate_estimator/include/bwe_defines.h"
#include "rtc_base/bind.h"
#include "rtc_base/checks.h"
#include "rtc_base/format_macros.h"
#include "rtc_base/logging.h"
@ -38,6 +39,7 @@ namespace webrtc_cc {
namespace {
const char kPacerPushbackExperiment[] = "WebRTC-PacerPushbackExperiment";
const int64_t PacerQueueUpdateIntervalMs = 25;
bool IsPacerPushbackExperimentEnabled() {
return webrtc::field_trial::IsEnabled(kPacerPushbackExperiment) ||
@ -96,6 +98,37 @@ TargetRateConstraints ConvertConstraints(int min_bitrate_bps,
: DataRate::Infinity();
return msg;
}
// TODO(srte): Make sure this is reusable and move it to task_queue.h
// The template closure pattern is based on rtc::ClosureTask.
template <class Closure>
class PeriodicTask : public rtc::QueuedTask {
public:
PeriodicTask(Closure&& closure, int64_t period_ms)
: closure_(std::forward<Closure>(closure)), period_ms_(period_ms) {}
bool Run() override {
closure_();
// WrapUnique lets us repost this task on the TaskQueue.
rtc::TaskQueue::Current()->PostDelayedTask(rtc::WrapUnique(this),
period_ms_);
// Return false to tell TaskQueue to not destruct this object, since we have
// taken ownership with WrapUnique.
return false;
}
private:
typename std::remove_const<
typename std::remove_reference<Closure>::type>::type closure_;
const int64_t period_ms_;
};
template <class Closure>
static std::unique_ptr<rtc::QueuedTask> NewPeriodicTask(Closure&& closure,
int64_t period_ms) {
return rtc::MakeUnique<PeriodicTask<Closure>>(std::forward<Closure>(closure),
period_ms);
}
} // namespace
namespace send_side_cc_internal {
@ -314,7 +347,7 @@ void SendSideCongestionController::MaybeCreateControllers() {
controller_ =
controller_factory_->Create(control_handler_.get(), initial_config_);
UpdateStreamsConfig();
StartProcess();
StartProcessPeriodicTasks();
}
SendSideCongestionController::~SendSideCongestionController() {
@ -506,18 +539,23 @@ void SendSideCongestionController::Process() {
// Ignored, using task queue to process.
}
void SendSideCongestionController::StartProcess() {
void SendSideCongestionController::StartProcessPeriodicTasks() {
task_queue_->PostDelayedTask(
[this]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
ProcessTask();
StartProcess();
},
NewPeriodicTask(
rtc::Bind(
&SendSideCongestionController::UpdateControllerWithTimeInterval,
this),
process_interval_.ms()),
process_interval_.ms());
task_queue_->PostDelayedTask(
NewPeriodicTask(
rtc::Bind(&SendSideCongestionController::UpdatePacerQueue, this),
PacerQueueUpdateIntervalMs),
PacerQueueUpdateIntervalMs);
}
void SendSideCongestionController::ProcessTask() {
RTC_DCHECK_RUN_ON(task_queue_.get());
void SendSideCongestionController::UpdateControllerWithTimeInterval() {
if (controller_) {
ProcessInterval msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
@ -525,8 +563,7 @@ void SendSideCongestionController::ProcessTask() {
}
}
void SendSideCongestionController::PacerQueueUpdateTask() {
RTC_DCHECK_RUN_ON(task_queue_.get());
void SendSideCongestionController::UpdatePacerQueue() {
if (control_handler_) {
PacerQueueUpdate msg;
msg.expected_queue_time = TimeDelta::ms(pacer_->ExpectedQueueTimeMs());
@ -591,10 +628,11 @@ SendSideCongestionController::GetTransportFeedbackVector() const {
return transport_feedback_adapter_.GetTransportFeedbackVector();
}
void SendSideCongestionController::PostDelayedTasksForTest() {
void SendSideCongestionController::PostPeriodicTasksForTest() {
task_queue_->PostTask([this]() {
ProcessTask();
PacerQueueUpdateTask();
RTC_DCHECK_RUN_ON(task_queue_.get());
UpdateControllerWithTimeInterval();
UpdatePacerQueue();
});
}

View File

@ -50,7 +50,7 @@ class SendSideCongestionControllerForTest
~SendSideCongestionControllerForTest() {}
void WaitOnTasks() { SendSideCongestionController::WaitOnTasksForTest(); }
void Process() override {
SendSideCongestionController::PostDelayedTasksForTest();
SendSideCongestionController::PostPeriodicTasksForTest();
SendSideCongestionController::WaitOnTasksForTest();
}
};