Adds simulated TCP message route for testing.
This TCP message route allows simulation of sending a fixed lengths message over an existing route. This can be used to simulate reliable signaling in tests as well as simulating the cross traffic impact of TCP connection. It is based on the existing Fake TCP cross traffic implementation. Bug: webrtc:9510 Change-Id: Ibfc2a9a5b95593b00db16de2c09ce929077cf5c5 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/159482 Commit-Queue: Sebastian Jansson <srte@webrtc.org> Reviewed-by: Niels Moller <nisse@webrtc.org> Cr-Commit-Position: refs/heads/master@{#29777}
This commit is contained in:

committed by
Commit Bot

parent
b4463eeedc
commit
dcc910a209
@ -120,6 +120,7 @@ rtc_library("cross_traffic_unittest") {
|
||||
"../../call:simulated_network",
|
||||
"../../rtc_base:logging",
|
||||
"../../rtc_base:rtc_event",
|
||||
"//test/time_controller:time_controller",
|
||||
"//third_party/abseil-cpp/absl/memory",
|
||||
]
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
#include "absl/memory/memory.h"
|
||||
#include "absl/types/optional.h"
|
||||
#include "cross_traffic.h"
|
||||
#include "rtc_base/logging.h"
|
||||
#include "rtc_base/numerics/safe_minmax.h"
|
||||
|
||||
@ -115,6 +116,127 @@ ColumnPrinter PulsedPeaksCrossTraffic::StatsPrinter() {
|
||||
32);
|
||||
}
|
||||
|
||||
TcpMessageRoute::TcpMessageRoute(Clock* clock,
|
||||
TaskQueueBase* task_queue,
|
||||
EmulatedRoute* send_route,
|
||||
EmulatedRoute* ret_route)
|
||||
: clock_(clock),
|
||||
task_queue_(task_queue),
|
||||
request_route_(send_route,
|
||||
[this](TcpPacket packet, Timestamp) {
|
||||
OnRequest(std::move(packet));
|
||||
}),
|
||||
response_route_(ret_route,
|
||||
[this](TcpPacket packet, Timestamp arrival_time) {
|
||||
OnResponse(std::move(packet), arrival_time);
|
||||
}) {}
|
||||
|
||||
void TcpMessageRoute::SendMessage(size_t size,
|
||||
std::function<void()> on_received) {
|
||||
task_queue_->PostTask(
|
||||
ToQueuedTask([this, size, handler = std::move(on_received)] {
|
||||
// If we are currently sending a message we won't reset the connection,
|
||||
// we'll act as if the messages are sent in the same TCP stream. This is
|
||||
// intended to simulate recreation of a TCP session for each message
|
||||
// in the typical case while avoiding the complexity overhead of
|
||||
// maintaining multiple virtual TCP sessions in parallel.
|
||||
if (pending_.empty() && in_flight_.empty()) {
|
||||
cwnd_ = 10;
|
||||
ssthresh_ = INFINITY;
|
||||
}
|
||||
size_t data_left = size;
|
||||
size_t kMaxPacketSize = 1200;
|
||||
Message message{std::move(handler)};
|
||||
while (data_left > 0) {
|
||||
size_t packet_size = std::min(data_left, kMaxPacketSize);
|
||||
int fragment_id = next_fragment_id_++;
|
||||
pending_.push_back(MessageFragment{fragment_id, packet_size});
|
||||
message.pending_fragment_ids.insert(fragment_id);
|
||||
data_left -= packet_size;
|
||||
}
|
||||
messages_.emplace_back(message);
|
||||
SendPackets(clock_->CurrentTime());
|
||||
}));
|
||||
}
|
||||
|
||||
void TcpMessageRoute::OnRequest(TcpPacket packet_info) {
|
||||
for (auto it = messages_.begin(); it != messages_.end(); ++it) {
|
||||
if (it->pending_fragment_ids.count(packet_info.fragment.fragment_id) != 0) {
|
||||
it->pending_fragment_ids.erase(packet_info.fragment.fragment_id);
|
||||
if (it->pending_fragment_ids.empty()) {
|
||||
it->handler();
|
||||
messages_.erase(it);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
const size_t kAckPacketSize = 20;
|
||||
response_route_.SendPacket(kAckPacketSize, packet_info);
|
||||
}
|
||||
|
||||
void TcpMessageRoute::OnResponse(TcpPacket packet_info, Timestamp at_time) {
|
||||
auto it = in_flight_.find(packet_info.sequence_number);
|
||||
if (it != in_flight_.end()) {
|
||||
last_rtt_ = at_time - packet_info.send_time;
|
||||
in_flight_.erase(it);
|
||||
}
|
||||
auto lost_end = in_flight_.lower_bound(packet_info.sequence_number);
|
||||
for (auto lost_it = in_flight_.begin(); lost_it != lost_end;
|
||||
lost_it = in_flight_.erase(lost_it)) {
|
||||
pending_.push_front(lost_it->second.fragment);
|
||||
}
|
||||
|
||||
if (packet_info.sequence_number - last_acked_seq_num_ > 1) {
|
||||
HandleLoss(at_time);
|
||||
} else if (cwnd_ <= ssthresh_) {
|
||||
cwnd_ += 1;
|
||||
} else {
|
||||
cwnd_ += 1.0f / cwnd_;
|
||||
}
|
||||
last_acked_seq_num_ =
|
||||
std::max(packet_info.sequence_number, last_acked_seq_num_);
|
||||
SendPackets(at_time);
|
||||
}
|
||||
|
||||
void TcpMessageRoute::HandleLoss(Timestamp at_time) {
|
||||
if (at_time - last_reduction_time_ < last_rtt_)
|
||||
return;
|
||||
last_reduction_time_ = at_time;
|
||||
ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
|
||||
cwnd_ = ssthresh_;
|
||||
}
|
||||
|
||||
void TcpMessageRoute::SendPackets(Timestamp at_time) {
|
||||
const TimeDelta kPacketTimeout = TimeDelta::seconds(1);
|
||||
int cwnd = std::ceil(cwnd_);
|
||||
int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
|
||||
while (packets_to_send-- > 0 && !pending_.empty()) {
|
||||
auto seq_num = next_sequence_number_++;
|
||||
TcpPacket send;
|
||||
send.sequence_number = seq_num;
|
||||
send.send_time = at_time;
|
||||
send.fragment = pending_.front();
|
||||
pending_.pop_front();
|
||||
request_route_.SendPacket(send.fragment.size, send);
|
||||
in_flight_.insert({seq_num, send});
|
||||
task_queue_->PostDelayedTask(ToQueuedTask([this, seq_num] {
|
||||
HandlePacketTimeout(seq_num,
|
||||
clock_->CurrentTime());
|
||||
}),
|
||||
kPacketTimeout.ms());
|
||||
}
|
||||
}
|
||||
|
||||
void TcpMessageRoute::HandlePacketTimeout(int seq_num, Timestamp at_time) {
|
||||
auto lost = in_flight_.find(seq_num);
|
||||
if (lost != in_flight_.end()) {
|
||||
pending_.push_front(lost->second.fragment);
|
||||
in_flight_.erase(lost);
|
||||
HandleLoss(at_time);
|
||||
SendPackets(at_time);
|
||||
}
|
||||
}
|
||||
|
||||
FakeTcpCrossTraffic::FakeTcpCrossTraffic(Clock* clock,
|
||||
FakeTcpConfig config,
|
||||
EmulatedRoute* send_route,
|
||||
|
@ -92,6 +92,68 @@ class PulsedPeaksCrossTraffic {
|
||||
bool sending_ RTC_GUARDED_BY(sequence_checker_) = false;
|
||||
};
|
||||
|
||||
// Simulates a TCP connection, this roughly implements the Reno algorithm. In
|
||||
// difference from TCP this only support sending messages with a fixed length,
|
||||
// no streaming. This is useful to simulate signaling and cross traffic using
|
||||
// message based protocols such as HTTP. It differs from UDP messages in that
|
||||
// they are guranteed to be delivered eventually, even on lossy networks.
|
||||
class TcpMessageRoute {
|
||||
public:
|
||||
TcpMessageRoute(Clock* clock,
|
||||
TaskQueueBase* task_queue,
|
||||
EmulatedRoute* send_route,
|
||||
EmulatedRoute* ret_route);
|
||||
|
||||
// Sends a TCP message of the given |size| over the route, |on_received| is
|
||||
// called when the message has been delivered. Note that the connection
|
||||
// parameters are reset iff there's no currently pending message on the route.
|
||||
void SendMessage(size_t size, std::function<void()> on_received);
|
||||
|
||||
private:
|
||||
// Represents a message sent over the route. When all fragments has been
|
||||
// delivered, the message is considered delivered and the handler is
|
||||
// triggered. This only happen once.
|
||||
struct Message {
|
||||
std::function<void()> handler;
|
||||
std::set<int> pending_fragment_ids;
|
||||
};
|
||||
// Represents a piece of a message that fit into a TCP packet.
|
||||
struct MessageFragment {
|
||||
int fragment_id;
|
||||
size_t size;
|
||||
};
|
||||
// Represents a packet sent on the wire.
|
||||
struct TcpPacket {
|
||||
int sequence_number;
|
||||
Timestamp send_time = Timestamp::MinusInfinity();
|
||||
MessageFragment fragment;
|
||||
};
|
||||
|
||||
void OnRequest(TcpPacket packet_info);
|
||||
void OnResponse(TcpPacket packet_info, Timestamp at_time);
|
||||
void HandleLoss(Timestamp at_time);
|
||||
void SendPackets(Timestamp at_time);
|
||||
void HandlePacketTimeout(int seq_num, Timestamp at_time);
|
||||
|
||||
Clock* const clock_;
|
||||
TaskQueueBase* const task_queue_;
|
||||
FakePacketRoute<TcpPacket> request_route_;
|
||||
FakePacketRoute<TcpPacket> response_route_;
|
||||
|
||||
std::deque<MessageFragment> pending_;
|
||||
std::map<int, TcpPacket> in_flight_;
|
||||
std::list<Message> messages_;
|
||||
|
||||
double cwnd_;
|
||||
double ssthresh_;
|
||||
|
||||
int last_acked_seq_num_ = 0;
|
||||
int next_sequence_number_ = 0;
|
||||
int next_fragment_id_ = 0;
|
||||
Timestamp last_reduction_time_ = Timestamp::MinusInfinity();
|
||||
TimeDelta last_rtt_ = TimeDelta::Zero();
|
||||
};
|
||||
|
||||
struct FakeTcpConfig {
|
||||
DataSize packet_size = DataSize::bytes(1200);
|
||||
DataSize send_limit = DataSize::PlusInfinity();
|
||||
|
@ -22,6 +22,8 @@
|
||||
#include "rtc_base/logging.h"
|
||||
#include "test/gmock.h"
|
||||
#include "test/gtest.h"
|
||||
#include "test/network/network_emulation_manager.h"
|
||||
#include "test/time_controller/simulated_time_controller.h"
|
||||
|
||||
namespace webrtc {
|
||||
namespace test {
|
||||
@ -110,5 +112,40 @@ TEST(CrossTrafficTest, RandomWalkCrossTraffic) {
|
||||
kExpectedDataSent.bytes() * 0.1);
|
||||
}
|
||||
|
||||
TEST(TcpMessageRouteTest, DeliveredOnLossyNetwork) {
|
||||
GlobalSimulatedTimeController time(Timestamp::seconds(0));
|
||||
NetworkEmulationManagerImpl net(&time);
|
||||
BuiltInNetworkBehaviorConfig send;
|
||||
// 800 kbps means that the 100 kB message would be delivered in ca 1 second
|
||||
// under ideal conditions and no overhead.
|
||||
send.link_capacity_kbps = 100 * 8;
|
||||
send.loss_percent = 50;
|
||||
send.queue_delay_ms = 100;
|
||||
send.delay_standard_deviation_ms = 20;
|
||||
send.allow_reordering = true;
|
||||
auto ret = send;
|
||||
ret.loss_percent = 10;
|
||||
|
||||
auto* tcp_route = net.CreateTcpRoute({net.CreateEmulatedNode(send)},
|
||||
{net.CreateEmulatedNode(ret)});
|
||||
int deliver_count = 0;
|
||||
// 100 kB is more than what fits into a single packet.
|
||||
constexpr size_t kMessageSize = 100000;
|
||||
|
||||
tcp_route->SendMessage(kMessageSize, [&] {
|
||||
RTC_LOG(LS_INFO) << "Received at "
|
||||
<< ToString(time.GetClock()->CurrentTime());
|
||||
deliver_count++;
|
||||
});
|
||||
|
||||
// If there was no loss, we would have delivered the message in ca 1 second,
|
||||
// with 50% it should take much longer.
|
||||
time.Sleep(TimeDelta::seconds(5));
|
||||
ASSERT_EQ(deliver_count, 0);
|
||||
// But given enough time the messsage will be delivered, but only once.
|
||||
time.Sleep(TimeDelta::seconds(60));
|
||||
EXPECT_EQ(deliver_count, 1);
|
||||
}
|
||||
|
||||
} // namespace test
|
||||
} // namespace webrtc
|
||||
|
@ -228,6 +228,18 @@ FakeTcpCrossTraffic* NetworkEmulationManagerImpl::StartFakeTcpCrossTraffic(
|
||||
return traffic_ptr;
|
||||
}
|
||||
|
||||
TcpMessageRoute* NetworkEmulationManagerImpl::CreateTcpRoute(
|
||||
std::vector<EmulatedNetworkNode*> send_link,
|
||||
std::vector<EmulatedNetworkNode*> ret_link) {
|
||||
auto tcp_route = std::make_unique<TcpMessageRoute>(
|
||||
clock_, task_queue_.Get(), CreateRoute(send_link), CreateRoute(ret_link));
|
||||
auto* route_ptr = tcp_route.get();
|
||||
task_queue_.PostTask([this, tcp_route = std::move(tcp_route)]() mutable {
|
||||
tcp_message_routes_.push_back(std::move(tcp_route));
|
||||
});
|
||||
return route_ptr;
|
||||
}
|
||||
|
||||
void NetworkEmulationManagerImpl::StopCrossTraffic(
|
||||
FakeTcpCrossTraffic* traffic) {
|
||||
task_queue_.PostTask([=]() {
|
||||
|
@ -76,6 +76,10 @@ class NetworkEmulationManagerImpl : public NetworkEmulationManager {
|
||||
std::vector<EmulatedNetworkNode*> send_link,
|
||||
std::vector<EmulatedNetworkNode*> ret_link,
|
||||
FakeTcpConfig config);
|
||||
|
||||
TcpMessageRoute* CreateTcpRoute(std::vector<EmulatedNetworkNode*> send_link,
|
||||
std::vector<EmulatedNetworkNode*> ret_link);
|
||||
|
||||
void StopCrossTraffic(FakeTcpCrossTraffic* traffic);
|
||||
|
||||
EmulatedNetworkManagerInterface* CreateEmulatedNetworkManagerInterface(
|
||||
@ -101,6 +105,7 @@ class NetworkEmulationManagerImpl : public NetworkEmulationManager {
|
||||
std::vector<std::unique_ptr<RandomWalkCrossTraffic>> random_cross_traffics_;
|
||||
std::vector<std::unique_ptr<PulsedPeaksCrossTraffic>> pulsed_cross_traffics_;
|
||||
std::list<std::unique_ptr<FakeTcpCrossTraffic>> tcp_cross_traffics_;
|
||||
std::list<std::unique_ptr<TcpMessageRoute>> tcp_message_routes_;
|
||||
std::vector<std::unique_ptr<EndpointsContainer>> endpoints_containers_;
|
||||
std::vector<std::unique_ptr<EmulatedNetworkManager>> network_managers_;
|
||||
|
||||
|
Reference in New Issue
Block a user