Remove rtc::MessageHandler usage in pseudo tcp unittests

Bug: webrtc:11988
Change-Id: Iac41f18410828333b40012d4876db23673d198d8
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272283
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37834}
This commit is contained in:
Danil Chapovalov
2022-08-19 10:28:40 +02:00
committed by WebRTC LUCI CQ
parent 9f1f48bdd8
commit 4d715385e1
2 changed files with 48 additions and 75 deletions

View File

@ -271,6 +271,8 @@ if (rtc_include_tests) {
"../api:mock_async_dns_resolver", "../api:mock_async_dns_resolver",
"../api:packet_socket_factory", "../api:packet_socket_factory",
"../api:scoped_refptr", "../api:scoped_refptr",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/transport:stun_types", "../api/transport:stun_types",
"../api/units:time_delta", "../api/units:time_delta",
"../rtc_base", "../rtc_base",
@ -280,7 +282,6 @@ if (rtc_include_tests) {
"../rtc_base:copy_on_write_buffer", "../rtc_base:copy_on_write_buffer",
"../rtc_base:gunit_helpers", "../rtc_base:gunit_helpers",
"../rtc_base:ip_address", "../rtc_base:ip_address",
"../rtc_base:location",
"../rtc_base:logging", "../rtc_base:logging",
"../rtc_base:macromagic", "../rtc_base:macromagic",
"../rtc_base:net_helpers", "../rtc_base:net_helpers",

View File

@ -15,19 +15,23 @@
#include <algorithm> #include <algorithm>
#include <cstddef> #include <cstddef>
#include <string> #include <string>
#include <utility>
#include <vector> #include <vector>
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "rtc_base/gunit.h" #include "rtc_base/gunit.h"
#include "rtc_base/helpers.h" #include "rtc_base/helpers.h"
#include "rtc_base/location.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/memory_stream.h" #include "rtc_base/memory_stream.h"
#include "rtc_base/message_handler.h"
#include "rtc_base/thread.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
#include "test/gtest.h" #include "test/gtest.h"
using cricket::PseudoTcp; using ::cricket::PseudoTcp;
using ::webrtc::ScopedTaskSafety;
using ::webrtc::TaskQueueBase;
using ::webrtc::TimeDelta;
static const int kConnectTimeoutMs = 10000; // ~3 * default RTO of 3000ms static const int kConnectTimeoutMs = 10000; // ~3 * default RTO of 3000ms
static const int kTransferTimeoutMs = 15000; static const int kTransferTimeoutMs = 15000;
@ -44,7 +48,6 @@ class PseudoTcpForTest : public cricket::PseudoTcp {
}; };
class PseudoTcpTestBase : public ::testing::Test, class PseudoTcpTestBase : public ::testing::Test,
public rtc::MessageHandlerAutoCleanup,
public cricket::IPseudoTcpNotify { public cricket::IPseudoTcpNotify {
public: public:
PseudoTcpTestBase() PseudoTcpTestBase()
@ -121,14 +124,6 @@ class PseudoTcpTestBase : public ::testing::Test,
UpdateLocalClock(); UpdateLocalClock();
} }
enum {
MSG_LPACKET,
MSG_RPACKET,
MSG_LCLOCK,
MSG_RCLOCK,
MSG_IOCOMPLETE,
MSG_WRITE
};
virtual void OnTcpOpen(PseudoTcp* tcp) { virtual void OnTcpOpen(PseudoTcp* tcp) {
// Consider ourselves connected when the local side gets OnTcpOpen. // Consider ourselves connected when the local side gets OnTcpOpen.
// OnTcpWriteable isn't fired at open, so we trigger it now. // OnTcpWriteable isn't fired at open, so we trigger it now.
@ -173,54 +168,48 @@ class PseudoTcpTestBase : public ::testing::Test,
<< len; << len;
return WR_SUCCESS; return WR_SUCCESS;
} }
int id = (tcp == &local_) ? MSG_RPACKET : MSG_LPACKET; PseudoTcp* other;
ScopedTaskSafety* timer;
if (tcp == &local_) {
other = &remote_;
timer = &remote_timer_;
} else {
other = &local_;
timer = &local_timer_;
}
std::string packet(buffer, len); std::string packet(buffer, len);
rtc::Thread::Current()->PostDelayed(RTC_FROM_HERE, delay_, this, id, ++packets_in_flight_;
rtc::WrapMessageData(packet)); TaskQueueBase::Current()->PostDelayedTask(
[other, timer, packet = std::move(packet), this] {
--packets_in_flight_;
other->NotifyPacket(packet.c_str(), packet.size());
UpdateClock(*other, *timer);
},
TimeDelta::Millis(delay_));
return WR_SUCCESS; return WR_SUCCESS;
} }
void UpdateLocalClock() { UpdateClock(&local_, MSG_LCLOCK); } void UpdateLocalClock() { UpdateClock(local_, local_timer_); }
void UpdateRemoteClock() { UpdateClock(&remote_, MSG_RCLOCK); } void UpdateRemoteClock() { UpdateClock(remote_, remote_timer_); }
void UpdateClock(PseudoTcp* tcp, uint32_t message) { static void UpdateClock(PseudoTcp& tcp, ScopedTaskSafety& timer) {
long interval = 0; // NOLINT long interval = 0; // NOLINT
tcp->GetNextClock(PseudoTcp::Now(), interval); tcp.GetNextClock(PseudoTcp::Now(), interval);
interval = std::max<int>(interval, 0L); // sometimes interval is < 0 interval = std::max<int>(interval, 0L); // sometimes interval is < 0
rtc::Thread::Current()->Clear(this, message); timer.reset();
rtc::Thread::Current()->PostDelayed(RTC_FROM_HERE, interval, this, message); TaskQueueBase::Current()->PostDelayedTask(
} SafeTask(timer.flag(),
[&tcp, &timer] {
virtual void OnMessage(rtc::Message* message) { tcp.NotifyClock(PseudoTcp::Now());
switch (message->message_id) { UpdateClock(tcp, timer);
case MSG_LPACKET: { }),
const std::string& s(rtc::UseMessageData<std::string>(message->pdata)); TimeDelta::Millis(interval));
local_.NotifyPacket(s.c_str(), s.size());
UpdateLocalClock();
break;
}
case MSG_RPACKET: {
const std::string& s(rtc::UseMessageData<std::string>(message->pdata));
remote_.NotifyPacket(s.c_str(), s.size());
UpdateRemoteClock();
break;
}
case MSG_LCLOCK:
local_.NotifyClock(PseudoTcp::Now());
UpdateLocalClock();
break;
case MSG_RCLOCK:
remote_.NotifyClock(PseudoTcp::Now());
UpdateRemoteClock();
break;
default:
break;
}
delete message->pdata;
} }
rtc::AutoThread main_thread_; rtc::AutoThread main_thread_;
PseudoTcpForTest local_; PseudoTcpForTest local_;
PseudoTcpForTest remote_; PseudoTcpForTest remote_;
ScopedTaskSafety local_timer_;
ScopedTaskSafety remote_timer_;
rtc::MemoryStream send_stream_; rtc::MemoryStream send_stream_;
rtc::MemoryStream recv_stream_; rtc::MemoryStream recv_stream_;
bool have_connected_; bool have_connected_;
@ -231,6 +220,7 @@ class PseudoTcpTestBase : public ::testing::Test,
int loss_; int loss_;
bool drop_next_packet_ = false; bool drop_next_packet_ = false;
bool simultaneous_open_ = false; bool simultaneous_open_ = false;
int packets_in_flight_ = 0;
}; };
class PseudoTcpTest : public PseudoTcpTestBase { class PseudoTcpTest : public PseudoTcpTestBase {
@ -480,7 +470,7 @@ class PseudoTcpTestReceiveWindow : public PseudoTcpTestBase {
EXPECT_EQ(0, Connect()); EXPECT_EQ(0, Connect());
EXPECT_TRUE_WAIT(have_connected_, kConnectTimeoutMs); EXPECT_TRUE_WAIT(have_connected_, kConnectTimeoutMs);
rtc::Thread::Current()->Post(RTC_FROM_HERE, this, MSG_WRITE); TaskQueueBase::Current()->PostTask([this] { WriteData(); });
EXPECT_TRUE_WAIT(have_disconnected_, kTransferTimeoutMs); EXPECT_TRUE_WAIT(have_disconnected_, kTransferTimeoutMs);
ASSERT_EQ(2u, send_position_.size()); ASSERT_EQ(2u, send_position_.size());
@ -498,20 +488,6 @@ class PseudoTcpTestReceiveWindow : public PseudoTcpTestBase {
EXPECT_EQ(2 * estimated_recv_window, recv_position_[1]); EXPECT_EQ(2 * estimated_recv_window, recv_position_[1]);
} }
virtual void OnMessage(rtc::Message* message) {
int message_id = message->message_id;
PseudoTcpTestBase::OnMessage(message);
switch (message_id) {
case MSG_WRITE: {
WriteData();
break;
}
default:
break;
}
}
uint32_t EstimateReceiveWindowSize() const { uint32_t EstimateReceiveWindowSize() const {
return static_cast<uint32_t>(recv_position_[0]); return static_cast<uint32_t>(recv_position_[0]);
} }
@ -575,15 +551,11 @@ class PseudoTcpTestReceiveWindow : public PseudoTcpTestBase {
} while (sent > 0); } while (sent > 0);
// At this point, we've filled up the available space in the send queue. // At this point, we've filled up the available space in the send queue.
int message_queue_size = static_cast<int>(rtc::Thread::Current()->size()); if (packets_in_flight_ > 0) {
// The message queue will always have at least 2 messages, an RCLOCK and // If there are packet tasks, attempt to continue sending after giving
// an LCLOCK, since they are added back on the delay queue at the same time // those packets time to process, which should free up the send buffer.
// they are pulled off and therefore are never really removed. rtc::Thread::Current()->PostDelayedTask([this] { WriteData(); },
if (message_queue_size > 2) { TimeDelta::Millis(10));
// If there are non-clock messages remaining, attempt to continue sending
// after giving those messages time to process, which should free up the
// send buffer.
rtc::Thread::Current()->PostDelayed(RTC_FROM_HERE, 10, this, MSG_WRITE);
} else { } else {
if (!remote_.isReceiveBufferFull()) { if (!remote_.isReceiveBufferFull()) {
RTC_LOG(LS_ERROR) << "This shouldn't happen - the send buffer is full, " RTC_LOG(LS_ERROR) << "This shouldn't happen - the send buffer is full, "