Make NetworkStateEstimator injectable in RemoteBitrateEstimator

The NetworkStateEstimator is updated on every incoming RTP packet if available.

A rtcp::RemoteEstimate packet is sent every time a rtcp::TransportFeedback packet is sent.

BUG=webrtc:10742

Change-Id: I4cd8e9d85d35faf76aeefd2e26c2a9fe1a62ca3b
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/152161
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29143}
This commit is contained in:
Per Kjellander
2019-09-10 19:28:06 +02:00
committed by Commit Bot
parent 467073a0c1
commit 52f7ae7c89
14 changed files with 270 additions and 64 deletions

View File

@ -101,10 +101,19 @@ struct RTPHeaderExtension {
Timestamp GetAbsoluteSendTimestamp() const {
RTC_DCHECK(hasAbsoluteSendTime);
RTC_DCHECK(absoluteSendTime < (1ul << 24));
return Timestamp::us((absoluteSendTime * 1000000L) /
return Timestamp::us((absoluteSendTime * 1000000ll) /
(1 << kAbsSendTimeFraction));
}
TimeDelta GetAbsoluteSendTimeDelta(uint32_t previous_sendtime) const {
RTC_DCHECK(hasAbsoluteSendTime);
RTC_DCHECK(absoluteSendTime < (1ul << 24));
RTC_DCHECK(previous_sendtime < (1ul << 24));
int32_t delta =
static_cast<int32_t>((absoluteSendTime - previous_sendtime) << 8) >> 8;
return TimeDelta::us((delta * 1000000ll) / (1 << kAbsSendTimeFraction));
}
bool hasTransmissionTimeOffset;
int32_t transmissionTimeOffset;
bool hasAbsoluteSendTime;

View File

@ -110,3 +110,16 @@ if (rtc_include_tests) {
]
}
}
if (rtc_include_tests) {
rtc_source_set("mock_network_control") {
testonly = true
sources = [
"test/mock_network_control.h",
]
deps = [
":network_control",
"../../test:test_support",
]
}
}

View File

@ -110,7 +110,11 @@ class NetworkStateEstimator {
// Gets the current best estimate according to the estimator.
virtual absl::optional<NetworkStateEstimate> GetCurrentEstimate() = 0;
// Called with per packet feedback regarding receive time.
// Used when the NetworkStateEstimator runs in the sending endpoint.
virtual void OnTransportPacketsFeedback(const TransportPacketsFeedback&) = 0;
// Called with per packet feedback regarding receive time.
// Used when the NetworkStateEstimator runs in the receiving endpoint.
virtual void OnReceivedPacket(const PacketResult&) {}
// Called when the receiving or sending endpoint changes address.
virtual void OnRouteChange(const NetworkRouteChange&) = 0;
virtual ~NetworkStateEstimator() = default;

View File

@ -0,0 +1,30 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef API_TRANSPORT_TEST_MOCK_NETWORK_CONTROL_H_
#define API_TRANSPORT_TEST_MOCK_NETWORK_CONTROL_H_
#include "api/transport/network_control.h"
#include "test/gmock.h"
namespace webrtc {
class MockNetworkStateEstimator : public NetworkStateEstimator {
public:
MOCK_METHOD0(GetCurrentEstimate, absl::optional<NetworkStateEstimate>());
MOCK_METHOD1(OnTransportPacketsFeedback,
void(const TransportPacketsFeedback&));
MOCK_METHOD1(OnReceivedPacket, void(const PacketResult&));
MOCK_METHOD1(OnRouteChange, void(const NetworkRouteChange&));
};
} // namespace webrtc
#endif // API_TRANSPORT_TEST_MOCK_NETWORK_CONTROL_H_

View File

@ -27,6 +27,7 @@ rtc_static_library("congestion_controller") {
deps = [
"..:module_api",
"../../api/transport:field_trial_based_config",
"../../api/transport:network_control",
"../pacing",
"../remote_bitrate_estimator",
"../rtp_rtcp:rtp_rtcp_format",

View File

@ -15,6 +15,7 @@
#include <vector>
#include "api/transport/field_trial_based_config.h"
#include "api/transport/network_control.h"
#include "modules/include/module.h"
#include "modules/remote_bitrate_estimator/remote_estimator_proxy.h"
#include "rtc_base/constructor_magic.h"
@ -33,6 +34,10 @@ class ReceiveSideCongestionController : public CallStatsObserver,
public Module {
public:
ReceiveSideCongestionController(Clock* clock, PacketRouter* packet_router);
ReceiveSideCongestionController(
Clock* clock,
PacketRouter* packet_router,
NetworkStateEstimator* network_state_estimator);
~ReceiveSideCongestionController() override {}

View File

@ -121,18 +121,24 @@ void ReceiveSideCongestionController::WrappingBitrateEstimator::
ReceiveSideCongestionController::ReceiveSideCongestionController(
Clock* clock,
PacketRouter* packet_router)
: ReceiveSideCongestionController(clock, packet_router, nullptr) {}
ReceiveSideCongestionController::ReceiveSideCongestionController(
Clock* clock,
PacketRouter* packet_router,
NetworkStateEstimator* network_state_estimator)
: remote_bitrate_estimator_(packet_router, clock),
remote_estimator_proxy_(clock, packet_router, &field_trial_config_) {}
remote_estimator_proxy_(clock,
packet_router,
&field_trial_config_,
network_state_estimator) {}
void ReceiveSideCongestionController::OnReceivedPacket(
int64_t arrival_time_ms,
size_t payload_size,
const RTPHeader& header) {
// Send-side BWE.
if (header.extension.hasTransportSequenceNumber) {
remote_estimator_proxy_.IncomingPacket(arrival_time_ms, payload_size,
header);
} else {
remote_estimator_proxy_.IncomingPacket(arrival_time_ms, payload_size, header);
if (!header.extension.hasTransportSequenceNumber) {
// Receive-side BWE.
remote_bitrate_estimator_.IncomingPacket(arrival_time_ms, payload_size,
header);

View File

@ -292,6 +292,17 @@ bool PacketRouter::SendTransportFeedback(rtcp::TransportFeedback* packet) {
return false;
}
void PacketRouter::SendNetworkStateEstimatePacket(
rtcp::RemoteEstimate* packet) {
rtc::CritScope cs(&modules_crit_);
for (auto* rtcp_sender : rtcp_feedback_senders_) {
packet->SetSsrc(rtcp_sender->SSRC());
if (rtcp_sender->SendNetworkStateEstimatePacket(*packet)) {
break;
}
}
}
void PacketRouter::AddRembModuleCandidate(
RtcpFeedbackSenderInterface* candidate_module,
bool media_sender) {

View File

@ -78,6 +78,8 @@ class PacketRouter : public RemoteBitrateObserver,
// Send transport feedback packet to send-side.
bool SendTransportFeedback(rtcp::TransportFeedback* packet) override;
// Send RemoteEstimate packet to send-side.
void SendNetworkStateEstimatePacket(rtcp::RemoteEstimate* packet) override;
private:
RtpRtcp* FindRtpModule(uint32_t ssrc)

View File

@ -114,6 +114,7 @@ if (rtc_include_tests) {
"..:module_api_public",
"../..:webrtc_common",
"../../api/transport:field_trial_based_config",
"../../api/transport:mock_network_control",
"../../rtc_base",
"../../rtc_base:checks",
"../../rtc_base:rtc_base_approved",

View File

@ -43,6 +43,7 @@ class TransportFeedbackSenderInterface {
public:
virtual ~TransportFeedbackSenderInterface() = default;
virtual bool SendTransportFeedback(rtcp::TransportFeedback* packet) = 0;
virtual void SendNetworkStateEstimatePacket(rtcp::RemoteEstimate* packet) = 0;
};
// TODO(holmer): Remove when all implementations have been updated.

View File

@ -32,15 +32,19 @@ static constexpr int64_t kMaxTimeMs =
RemoteEstimatorProxy::RemoteEstimatorProxy(
Clock* clock,
TransportFeedbackSenderInterface* feedback_sender,
const WebRtcKeyValueConfig* key_value_config)
const WebRtcKeyValueConfig* key_value_config,
NetworkStateEstimator* network_state_estimator)
: clock_(clock),
feedback_sender_(feedback_sender),
send_config_(key_value_config),
last_process_time_ms_(-1),
network_state_estimator_(network_state_estimator),
media_ssrc_(0),
feedback_packet_count_(0),
send_interval_ms_(send_config_.default_interval->ms()),
send_periodic_feedback_(true) {
send_periodic_feedback_(true),
previous_abs_send_time_(0),
abs_send_timestamp_(clock->CurrentTime()) {
RTC_LOG(LS_INFO)
<< "Maximum interval between transport feedback RTCP messages (ms): "
<< send_config_.max_interval->ms();
@ -51,60 +55,74 @@ RemoteEstimatorProxy::~RemoteEstimatorProxy() {}
void RemoteEstimatorProxy::IncomingPacket(int64_t arrival_time_ms,
size_t payload_size,
const RTPHeader& header) {
if (!header.extension.hasTransportSequenceNumber) {
RTC_LOG(LS_WARNING)
<< "RemoteEstimatorProxy: Incoming packet "
"is missing the transport sequence number extension!";
return;
}
if (arrival_time_ms < 0 || arrival_time_ms > kMaxTimeMs) {
RTC_LOG(LS_WARNING) << "Arrival time out of bounds: " << arrival_time_ms;
return;
}
rtc::CritScope cs(&lock_);
media_ssrc_ = header.ssrc;
int64_t seq = 0;
int64_t seq = unwrapper_.Unwrap(header.extension.transportSequenceNumber);
if (header.extension.hasTransportSequenceNumber) {
seq = unwrapper_.Unwrap(header.extension.transportSequenceNumber);
if (send_periodic_feedback_) {
if (periodic_window_start_seq_ &&
packet_arrival_times_.lower_bound(*periodic_window_start_seq_) ==
packet_arrival_times_.end()) {
// Start new feedback packet, cull old packets.
for (auto it = packet_arrival_times_.begin();
it != packet_arrival_times_.end() && it->first < seq &&
arrival_time_ms - it->second >= send_config_.back_window->ms();) {
it = packet_arrival_times_.erase(it);
if (send_periodic_feedback_) {
if (periodic_window_start_seq_ &&
packet_arrival_times_.lower_bound(*periodic_window_start_seq_) ==
packet_arrival_times_.end()) {
// Start new feedback packet, cull old packets.
for (auto it = packet_arrival_times_.begin();
it != packet_arrival_times_.end() && it->first < seq &&
arrival_time_ms - it->second >= send_config_.back_window->ms();) {
it = packet_arrival_times_.erase(it);
}
}
if (!periodic_window_start_seq_ || seq < *periodic_window_start_seq_) {
periodic_window_start_seq_ = seq;
}
}
if (!periodic_window_start_seq_ || seq < *periodic_window_start_seq_) {
periodic_window_start_seq_ = seq;
// We are only interested in the first time a packet is received.
if (packet_arrival_times_.find(seq) != packet_arrival_times_.end())
return;
packet_arrival_times_[seq] = arrival_time_ms;
// Limit the range of sequence numbers to send feedback for.
auto first_arrival_time_to_keep = packet_arrival_times_.lower_bound(
packet_arrival_times_.rbegin()->first - kMaxNumberOfPackets);
if (first_arrival_time_to_keep != packet_arrival_times_.begin()) {
packet_arrival_times_.erase(packet_arrival_times_.begin(),
first_arrival_time_to_keep);
if (send_periodic_feedback_) {
// |packet_arrival_times_| cannot be empty since we just added one
// element and the last element is not deleted.
RTC_DCHECK(!packet_arrival_times_.empty());
periodic_window_start_seq_ = packet_arrival_times_.begin()->first;
}
}
if (header.extension.feedback_request) {
// Send feedback packet immediately.
SendFeedbackOnRequest(seq, header.extension.feedback_request.value());
}
}
// We are only interested in the first time a packet is received.
if (packet_arrival_times_.find(seq) != packet_arrival_times_.end())
return;
packet_arrival_times_[seq] = arrival_time_ms;
// Limit the range of sequence numbers to send feedback for.
auto first_arrival_time_to_keep = packet_arrival_times_.lower_bound(
packet_arrival_times_.rbegin()->first - kMaxNumberOfPackets);
if (first_arrival_time_to_keep != packet_arrival_times_.begin()) {
packet_arrival_times_.erase(packet_arrival_times_.begin(),
first_arrival_time_to_keep);
if (send_periodic_feedback_) {
// |packet_arrival_times_| cannot be empty since we just added one element
// and the last element is not deleted.
RTC_DCHECK(!packet_arrival_times_.empty());
periodic_window_start_seq_ = packet_arrival_times_.begin()->first;
}
}
if (header.extension.feedback_request) {
// Send feedback packet immediately.
SendFeedbackOnRequest(seq, header.extension.feedback_request.value());
if (network_state_estimator_ && header.extension.hasAbsoluteSendTime) {
PacketResult packet_result;
packet_result.receive_time = Timestamp::ms(arrival_time_ms);
// Ignore reordering of packets and assume they have approximately the same
// send time.
abs_send_timestamp_ += std::max(
header.extension.GetAbsoluteSendTimeDelta(previous_abs_send_time_),
TimeDelta::ms(0));
previous_abs_send_time_ = header.extension.absoluteSendTime;
packet_result.sent_packet.send_time = abs_send_timestamp_;
// TODO(webrtc:10742): Take IP header and transport overhead into account.
packet_result.sent_packet.size =
DataSize::bytes(header.headerLength + payload_size);
packet_result.sent_packet.sequence_number = seq;
network_state_estimator_->OnReceivedPacket(packet_result);
}
}
@ -169,6 +187,16 @@ void RemoteEstimatorProxy::SendPeriodicFeedbacks() {
if (!periodic_window_start_seq_)
return;
if (network_state_estimator_) {
absl::optional<NetworkStateEstimate> state_estimate =
network_state_estimator_->GetCurrentEstimate();
if (state_estimate) {
rtcp::RemoteEstimate rtcp_estimate;
rtcp_estimate.SetEstimate(state_estimate.value());
feedback_sender_->SendNetworkStateEstimatePacket(&rtcp_estimate);
}
}
for (auto begin_iterator =
packet_arrival_times_.lower_bound(*periodic_window_start_seq_);
begin_iterator != packet_arrival_times_.cend();

View File

@ -14,6 +14,7 @@
#include <map>
#include <vector>
#include "api/transport/network_control.h"
#include "api/transport/webrtc_key_value_config.h"
#include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
#include "rtc_base/critical_section.h"
@ -36,7 +37,8 @@ class RemoteEstimatorProxy : public RemoteBitrateEstimator {
public:
RemoteEstimatorProxy(Clock* clock,
TransportFeedbackSenderInterface* feedback_sender,
const WebRtcKeyValueConfig* key_value_config);
const WebRtcKeyValueConfig* key_value_config,
NetworkStateEstimator* network_state_estimator);
~RemoteEstimatorProxy() override;
void IncomingPacket(int64_t arrival_time_ms,
@ -90,7 +92,9 @@ class RemoteEstimatorProxy : public RemoteBitrateEstimator {
int64_t last_process_time_ms_;
rtc::CriticalSection lock_;
// |network_state_estimator_| may be null.
NetworkStateEstimator* const network_state_estimator_
RTC_PT_GUARDED_BY(&lock_);
uint32_t media_ssrc_ RTC_GUARDED_BY(&lock_);
uint8_t feedback_packet_count_ RTC_GUARDED_BY(&lock_);
SeqNumUnwrapper<uint16_t> unwrapper_ RTC_GUARDED_BY(&lock_);
@ -99,6 +103,10 @@ class RemoteEstimatorProxy : public RemoteBitrateEstimator {
std::map<int64_t, int64_t> packet_arrival_times_ RTC_GUARDED_BY(&lock_);
int64_t send_interval_ms_ RTC_GUARDED_BY(&lock_);
bool send_periodic_feedback_ RTC_GUARDED_BY(&lock_);
// Unwraps absolute send times.
uint32_t previous_abs_send_time_ RTC_GUARDED_BY(&lock_);
Timestamp abs_send_timestamp_ RTC_GUARDED_BY(&lock_);
};
} // namespace webrtc

View File

@ -11,6 +11,7 @@
#include "modules/remote_bitrate_estimator/remote_estimator_proxy.h"
#include "api/transport/field_trial_based_config.h"
#include "api/transport/test/mock_network_control.h"
#include "modules/pacing/packet_router.h"
#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
#include "system_wrappers/include/clock.h"
@ -61,27 +62,43 @@ class MockTransportFeedbackSender : public TransportFeedbackSenderInterface {
public:
MOCK_METHOD1(SendTransportFeedback,
bool(rtcp::TransportFeedback* feedback_packet));
MOCK_METHOD1(SendNetworkStateEstimatePacket,
void(rtcp::RemoteEstimate* packet));
};
class RemoteEstimatorProxyTest : public ::testing::Test {
public:
RemoteEstimatorProxyTest()
: clock_(0), proxy_(&clock_, &router_, &field_trial_config_) {}
: clock_(0),
proxy_(&clock_,
&router_,
&field_trial_config_,
&network_state_estimator_) {}
protected:
void IncomingPacket(uint16_t seq,
int64_t time_ms,
absl::optional<FeedbackRequest> feedback_request) {
RTPHeader header;
header.extension.hasTransportSequenceNumber = true;
header.extension.transportSequenceNumber = seq;
header.extension.feedback_request = feedback_request;
header.ssrc = kMediaSsrc;
proxy_.IncomingPacket(time_ms, kDefaultPacketSize, header);
void IncomingPacket(
uint16_t seq,
int64_t time_ms,
absl::optional<FeedbackRequest> feedback_request = absl::nullopt) {
proxy_.IncomingPacket(time_ms, kDefaultPacketSize,
CreateHeader(seq, feedback_request, absl::nullopt));
}
void IncomingPacket(uint16_t seq, int64_t time_ms) {
IncomingPacket(seq, time_ms, absl::nullopt);
RTPHeader CreateHeader(absl::optional<uint16_t> transport_sequence,
absl::optional<FeedbackRequest> feedback_request,
absl::optional<uint32_t> absolute_send_time) {
RTPHeader header;
if (transport_sequence) {
header.extension.hasTransportSequenceNumber = true;
header.extension.transportSequenceNumber = transport_sequence.value();
}
header.extension.feedback_request = feedback_request;
if (absolute_send_time) {
header.extension.hasAbsoluteSendTime = true;
header.extension.absoluteSendTime = absolute_send_time.value();
}
header.ssrc = kMediaSsrc;
return header;
}
void Process() {
@ -92,6 +109,7 @@ class RemoteEstimatorProxyTest : public ::testing::Test {
FieldTrialBasedConfig field_trial_config_;
SimulatedClock clock_;
::testing::StrictMock<MockTransportFeedbackSender> router_;
::testing::NiceMock<MockNetworkStateEstimator> network_state_estimator_;
RemoteEstimatorProxy proxy_;
};
@ -499,5 +517,74 @@ TEST_F(RemoteEstimatorProxyOnRequestTest,
kFivePacketsFeedbackRequest);
}
TEST_F(RemoteEstimatorProxyTest, ReportsIncomingPacketToNetworkStateEstimator) {
Timestamp first_send_timestamp = Timestamp::ms(0);
EXPECT_CALL(network_state_estimator_, OnReceivedPacket(_))
.WillOnce(Invoke([&first_send_timestamp](const PacketResult& packet) {
EXPECT_EQ(packet.receive_time, Timestamp::ms(kBaseTimeMs));
first_send_timestamp = packet.sent_packet.send_time;
}));
// Incoming packet with abs sendtime but without transport sequence number.
proxy_.IncomingPacket(
kBaseTimeMs, kDefaultPacketSize,
CreateHeader(absl::nullopt, absl::nullopt,
AbsoluteSendTime::MsTo24Bits(kBaseTimeMs)));
// Expect packet with older abs send time to be treated as sent at the same
// time as the previous packet due to reordering.
EXPECT_CALL(network_state_estimator_, OnReceivedPacket(_))
.WillOnce(Invoke([&first_send_timestamp](const PacketResult& packet) {
EXPECT_EQ(packet.receive_time, Timestamp::ms(kBaseTimeMs));
EXPECT_EQ(packet.sent_packet.send_time, first_send_timestamp);
}));
proxy_.IncomingPacket(
kBaseTimeMs, kDefaultPacketSize,
CreateHeader(absl::nullopt, absl::nullopt,
AbsoluteSendTime::MsTo24Bits(kBaseTimeMs - 12)));
}
TEST_F(RemoteEstimatorProxyTest, IncomingPacketHandlesWrapInAbsSendTime) {
// abs send time use 24bit precision.
const uint32_t kFirstAbsSendTime =
AbsoluteSendTime::MsTo24Bits((1 << 24) - 30);
// Second abs send time has wrapped.
const uint32_t kSecondAbsSendTime = AbsoluteSendTime::MsTo24Bits((1 << 24));
const TimeDelta kExpectedAbsSendTimeDelta = TimeDelta::ms(30);
Timestamp first_send_timestamp = Timestamp::ms(0);
EXPECT_CALL(network_state_estimator_, OnReceivedPacket(_))
.WillOnce(Invoke([&first_send_timestamp](const PacketResult& packet) {
EXPECT_EQ(packet.receive_time, Timestamp::ms(kBaseTimeMs));
first_send_timestamp = packet.sent_packet.send_time;
}));
proxy_.IncomingPacket(
kBaseTimeMs, kDefaultPacketSize,
CreateHeader(kBaseSeq, absl::nullopt, kFirstAbsSendTime));
EXPECT_CALL(network_state_estimator_, OnReceivedPacket(_))
.WillOnce(Invoke([first_send_timestamp,
kExpectedAbsSendTimeDelta](const PacketResult& packet) {
EXPECT_EQ(packet.receive_time, Timestamp::ms(kBaseTimeMs + 123));
EXPECT_EQ(packet.sent_packet.send_time.ms(),
(first_send_timestamp + kExpectedAbsSendTimeDelta).ms());
}));
proxy_.IncomingPacket(
kBaseTimeMs + 123, kDefaultPacketSize,
CreateHeader(kBaseSeq + 1, absl::nullopt, kSecondAbsSendTime));
}
TEST_F(RemoteEstimatorProxyTest, SendTransportFeedbackAndNetworkStateUpdate) {
proxy_.IncomingPacket(
kBaseTimeMs, kDefaultPacketSize,
CreateHeader(kBaseSeq, absl::nullopt,
AbsoluteSendTime::MsTo24Bits(kBaseTimeMs - 1)));
EXPECT_CALL(router_, SendTransportFeedback(_));
EXPECT_CALL(network_state_estimator_, GetCurrentEstimate())
.WillOnce(Return(NetworkStateEstimate()));
EXPECT_CALL(router_, SendNetworkStateEstimatePacket(_));
Process();
}
} // namespace
} // namespace webrtc