Add flakyness check based on the recently received packets.

BUG=
R=pthatcher@webrtc.org

Review URL: https://codereview.webrtc.org/1207563002 .

Cr-Commit-Position: refs/heads/master@{#9553}
This commit is contained in:
Peter Thatcher
2015-07-08 11:08:35 -07:00
parent aa97df4559
commit 54360510ff
24 changed files with 267 additions and 16 deletions

View File

@ -250,6 +250,14 @@ class PCOJava : public PeerConnectionObserver {
CHECK_EXCEPTION(jni()) << "error during CallVoidMethod";
}
void OnIceConnectionReceivingChange(bool receiving) override {
ScopedLocalRefFrame local_ref_frame(jni());
jmethodID m = GetMethodID(
jni(), *j_observer_class_, "onIceConnectionReceivingChange", "(Z)V");
jni()->CallVoidMethod(*j_observer_global_, m, receiving);
CHECK_EXCEPTION(jni()) << "error during CallVoidMethod";
}
void OnIceGatheringChange(
PeerConnectionInterface::IceGatheringState new_state) override {
ScopedLocalRefFrame local_ref_frame(jni());

View File

@ -66,6 +66,9 @@ public class PeerConnection {
/** Triggered when the IceConnectionState changes. */
public void onIceConnectionChange(IceConnectionState newState);
/** Triggered when the ICE connection receiving status changes. */
public void onIceConnectionReceivingChange(boolean receiving);
/** Triggered when the IceGatheringState changes. */
public void onIceGatheringChange(IceGatheringState newState);

View File

@ -175,6 +175,11 @@ public class PeerConnectionTest {
assertEquals(expectedIceConnectionChanges.removeFirst(), newState);
}
@Override
public synchronized void onIceConnectionReceivingChange(boolean receiving) {
System.out.println(name + "Got an ice connection receiving change " + receiving);
}
public synchronized void expectIceGatheringChange(
IceGatheringState newState) {
expectedIceGatheringChanges.add(newState);

View File

@ -886,6 +886,11 @@ void PeerConnection::OnIceComplete() {
observer_->OnIceComplete();
}
void PeerConnection::OnIceConnectionReceivingChange(bool receiving) {
ASSERT(signaling_thread()->IsCurrent());
observer_->OnIceConnectionReceivingChange(receiving);
}
void PeerConnection::ChangeSignalingState(
PeerConnectionInterface::SignalingState signaling_state) {
signaling_state_ = signaling_state;

View File

@ -142,13 +142,14 @@ class PeerConnection : public PeerConnectionInterface,
uint32 ssrc) override;
void OnRemoveLocalVideoTrack(MediaStreamInterface* stream,
VideoTrackInterface* video_track) override;
virtual void OnRemoveLocalStream(MediaStreamInterface* stream);
void OnRemoveLocalStream(MediaStreamInterface* stream) override;
// Implements IceObserver
virtual void OnIceConnectionChange(IceConnectionState new_state);
virtual void OnIceGatheringChange(IceGatheringState new_state);
virtual void OnIceCandidate(const IceCandidateInterface* candidate);
virtual void OnIceComplete();
void OnIceConnectionChange(IceConnectionState new_state) override;
void OnIceGatheringChange(IceGatheringState new_state) override;
void OnIceCandidate(const IceCandidateInterface* candidate) override;
void OnIceComplete() override;
void OnIceConnectionReceivingChange(bool receiving) override;
// Signals from WebRtcSession.
void OnSessionStateChange(cricket::BaseSession* session,

View File

@ -411,6 +411,9 @@ class PeerConnectionObserver {
// All Ice candidates have been found.
virtual void OnIceComplete() {}
// Called when the ICE connection receiving status changes.
virtual void OnIceConnectionReceivingChange(bool receiving) {}
protected:
// Dtor protected as objects shouldn't be deleted via this interface.
~PeerConnectionObserver() {}

View File

@ -490,6 +490,7 @@ WebRtcSession::WebRtcSession(
mediastream_signaling_(mediastream_signaling),
ice_observer_(NULL),
ice_connection_state_(PeerConnectionInterface::kIceConnectionNew),
ice_connection_receiving_(true),
older_version_remote_peer_(false),
dtls_enabled_(false),
data_channel_type_(cricket::DCT_NONE),
@ -1406,6 +1407,31 @@ void WebRtcSession::OnTransportFailed(cricket::Transport* transport) {
SetIceConnectionState(PeerConnectionInterface::kIceConnectionFailed);
}
void WebRtcSession::OnTransportReceiving(cricket::Transport* transport) {
ASSERT(signaling_thread()->IsCurrent());
// The ice connection is considered receiving if at least one transport is
// receiving on any channels.
bool receiving = false;
for (const auto& kv : transport_proxies()) {
cricket::Transport* transport = kv.second->impl();
if (transport && transport->any_channel_receiving()) {
receiving = true;
break;
}
}
SetIceConnectionReceiving(receiving);
}
void WebRtcSession::SetIceConnectionReceiving(bool receiving) {
if (ice_connection_receiving_ == receiving) {
return;
}
ice_connection_receiving_ = receiving;
if (ice_observer_) {
ice_observer_->OnIceConnectionReceivingChange(receiving);
}
}
void WebRtcSession::OnTransportProxyCandidatesReady(
cricket::TransportProxy* proxy, const cricket::Candidates& candidates) {
ASSERT(signaling_thread()->IsCurrent());

View File

@ -84,6 +84,8 @@ class IceObserver {
public:
IceObserver() {}
// Called any time the IceConnectionState changes
// TODO(honghaiz): Change the name to OnIceConnectionStateChange so as to
// conform to the w3c standard.
virtual void OnIceConnectionChange(
PeerConnectionInterface::IceConnectionState new_state) {}
// Called any time the IceGatheringState changes
@ -96,6 +98,9 @@ class IceObserver {
// (via PeerConnectionObserver)
virtual void OnIceComplete() {}
// Called whenever the state changes between receiving and not receiving.
virtual void OnIceConnectionReceivingChange(bool receiving) {}
protected:
~IceObserver() {}
@ -298,6 +303,7 @@ class WebRtcSession : public cricket::BaseSession,
cricket::TransportProxy* proxy,
const cricket::Candidates& candidates);
virtual void OnCandidatesAllocationDone();
void OnTransportReceiving(cricket::Transport* transport) override;
// Enables media channels to allow sending of media.
void EnableChannels();
@ -342,6 +348,7 @@ class WebRtcSession : public cricket::BaseSession,
std::string BadStateErrMsg(State state);
void SetIceConnectionState(PeerConnectionInterface::IceConnectionState state);
void SetIceConnectionReceiving(bool receiving);
bool ValidateBundleSettings(const cricket::SessionDescription* desc);
bool HasRtcpMuxEnabled(const cricket::ContentInfo* content);
@ -381,6 +388,7 @@ class WebRtcSession : public cricket::BaseSession,
MediaStreamSignaling* mediastream_signaling_;
IceObserver* ice_observer_;
PeerConnectionInterface::IceConnectionState ice_connection_state_;
bool ice_connection_receiving_;
rtc::scoped_ptr<SessionDescriptionInterface> local_desc_;
rtc::scoped_ptr<SessionDescriptionInterface> remote_desc_;
// Candidates that arrived before the remote description was set.

View File

@ -897,6 +897,11 @@ public class PeerConnectionClient {
Log.d(TAG, "IceGatheringState: " + newState);
}
@Override
public void onIceConnectionReceivingChange(boolean receiving) {
Log.d(TAG, "IceConnectionReceiving changed to " + receiving);
}
@Override
public void onAddStream(final MediaStream stream){
executor.execute(new Runnable() {

View File

@ -118,6 +118,8 @@ DtlsTransportChannelWrapper::DtlsTransportChannelWrapper(
&DtlsTransportChannelWrapper::OnRouteChange);
channel_->SignalConnectionRemoved.connect(this,
&DtlsTransportChannelWrapper::OnConnectionRemoved);
channel_->SignalReceivingState.connect(this,
&DtlsTransportChannelWrapper::OnReceivingState);
}
DtlsTransportChannelWrapper::~DtlsTransportChannelWrapper() {
@ -456,6 +458,18 @@ void DtlsTransportChannelWrapper::OnWritableState(TransportChannel* channel) {
}
}
void DtlsTransportChannelWrapper::OnReceivingState(TransportChannel* channel) {
ASSERT(rtc::Thread::Current() == worker_thread_);
ASSERT(channel == channel_);
LOG_J(LS_VERBOSE, this)
<< "DTLSTransportChannelWrapper: channel receiving state changed to "
<< channel_->receiving();
if (dtls_state_ == STATE_NONE || dtls_state_ == STATE_OPEN) {
// Note: SignalReceivingState fired by set_receiving.
set_receiving(channel_->receiving());
}
}
void DtlsTransportChannelWrapper::OnReadPacket(
TransportChannel* channel, const char* data, size_t size,
const rtc::PacketTime& packet_time, int flags) {

View File

@ -213,6 +213,7 @@ class DtlsTransportChannelWrapper : public TransportChannelImpl {
void OnReadPacket(TransportChannel* channel, const char* data, size_t size,
const rtc::PacketTime& packet_time, int flags);
void OnReadyToSend(TransportChannel* channel);
void OnReceivingState(TransportChannel* channel);
void OnDtlsEvent(rtc::StreamInterface* stream_, int sig, int err);
bool SetupDtls();
bool MaybeStartDtls();

View File

@ -177,6 +177,10 @@ class FakeTransportChannel : public TransportChannelImpl,
SignalConnectionRemoved(this);
}
void SetReceiving(bool receiving) {
set_receiving(receiving);
}
virtual int SendPacket(const char* data, size_t len,
const rtc::PacketOptions& options, int flags) {
if (state_ != STATE_CONNECTED) {

View File

@ -25,6 +25,7 @@ namespace {
enum {
MSG_SORT = 1,
MSG_PING,
MSG_CHECK_RECEIVING
};
// When the socket is unwritable, we will use 10 Kbps (ignoring IP+UDP headers)
@ -40,6 +41,8 @@ static const uint32 UNWRITABLE_DELAY = 1000 * PING_PACKET_SIZE / 10000; // 50ms
// make sure it is pinged at this rate.
static const uint32 MAX_CURRENT_WRITABLE_DELAY = 900; // 2*WRITABLE_DELAY - bit
static const int MIN_CHECK_RECEIVING_DELAY = 50; // ms
// The minimum improvement in RTT that justifies a switch.
static const double kMinImprovement = 10;
@ -193,7 +196,9 @@ P2PTransportChannel::P2PTransportChannel(const std::string& content_name,
remote_ice_mode_(ICEMODE_FULL),
ice_role_(ICEROLE_UNKNOWN),
tiebreaker_(0),
remote_candidate_generation_(0) {
remote_candidate_generation_(0),
check_receiving_delay_(MIN_CHECK_RECEIVING_DELAY * 5),
receiving_timeout_(MIN_CHECK_RECEIVING_DELAY * 50) {
}
P2PTransportChannel::~P2PTransportChannel() {
@ -354,6 +359,12 @@ void P2PTransportChannel::SetRemoteIceMode(IceMode mode) {
remote_ice_mode_ = mode;
}
void P2PTransportChannel::set_receiving_timeout(int receiving_timeout_ms) {
receiving_timeout_ = receiving_timeout_ms;
check_receiving_delay_ =
std::max(MIN_CHECK_RECEIVING_DELAY, receiving_timeout_ / 10);
}
// Go into the state of processing candidates, and running in general
void P2PTransportChannel::Connect() {
ASSERT(worker_thread_ == rtc::Thread::Current());
@ -369,6 +380,9 @@ void P2PTransportChannel::Connect() {
// Start pinging as the ports come in.
thread()->Post(this, MSG_PING);
thread()->PostDelayed(
check_receiving_delay_, this, MSG_CHECK_RECEIVING);
}
// A new port is available, attempt to make connections for it
@ -1067,6 +1081,8 @@ void P2PTransportChannel::SwitchBestConnectionTo(Connection* conn) {
LOG_J(LS_INFO, this) << "New best connection: "
<< best_connection_->ToString();
SignalRouteChange(this, best_connection_->remote_candidate());
// When it just switched to a best connection, set receiving to true.
set_receiving(true);
} else {
LOG_J(LS_INFO, this) << "No best connection";
}
@ -1148,6 +1164,9 @@ void P2PTransportChannel::OnMessage(rtc::Message *pmsg) {
case MSG_PING:
OnPing();
break;
case MSG_CHECK_RECEIVING:
OnCheckReceiving();
break;
default:
ASSERT(false);
break;
@ -1176,6 +1195,19 @@ void P2PTransportChannel::OnPing() {
thread()->PostDelayed(delay, this, MSG_PING);
}
void P2PTransportChannel::OnCheckReceiving() {
// Check receiving only if the best connection has received data packets
// because we want to detect not receiving any packets only after the media
// have started flowing.
if (best_connection_ && best_connection_->recv_total_bytes() > 0) {
bool receiving = rtc::Time() <=
best_connection_->last_received() + receiving_timeout_;
set_receiving(receiving);
}
thread()->PostDelayed(check_receiving_delay_, this, MSG_CHECK_RECEIVING);
}
// Is the connection in a state for us to even consider pinging the other side?
// We consider a connection pingable even if it's not connected because that's
// how a TCP connection is kicked into reconnecting on the active side.

View File

@ -85,6 +85,10 @@ class P2PTransportChannel : public TransportChannelImpl,
const Connection* best_connection() const { return best_connection_; }
void set_incoming_only(bool value) { incoming_only_ = value; }
// Sets the receiving timeout in milliseconds.
// This also sets the check_receiving_delay proportionally.
void set_receiving_timeout(int receiving_timeout_ms);
// Note: This is only for testing purpose.
// |ports_| should not be changed from outside.
const std::vector<PortInterface*>& ports() { return ports_; }
@ -150,6 +154,9 @@ class P2PTransportChannel : public TransportChannelImpl,
return false;
}
int receiving_timeout() const { return receiving_timeout_; }
int check_receiving_delay() const { return check_receiving_delay_; }
// Helper method used only in unittest.
rtc::DiffServCodePoint DefaultDscpValue() const;
@ -213,6 +220,8 @@ class P2PTransportChannel : public TransportChannelImpl,
void OnSort();
void OnPing();
void OnCheckReceiving();
P2PTransport* transport_;
PortAllocator *allocator_;
rtc::Thread *worker_thread_;
@ -241,6 +250,9 @@ class P2PTransportChannel : public TransportChannelImpl,
uint64 tiebreaker_;
uint32 remote_candidate_generation_;
int check_receiving_delay_;
int receiving_timeout_;
DISALLOW_COPY_AND_ASSIGN(P2PTransportChannel);
};

View File

@ -1861,3 +1861,26 @@ TEST_F(P2PTransportChannelPingTest, ConnectionResurrection) {
ASSERT_TRUE(conn3 != nullptr);
EXPECT_EQ(conn3->remote_candidate().priority(), prflx_priority);
}
TEST_F(P2PTransportChannelPingTest, TestReceivingStateChange) {
cricket::FakePortAllocator pa(rtc::Thread::Current(), nullptr);
cricket::P2PTransportChannel ch("receiving state change", 1, nullptr, &pa);
PrepareChannel(&ch);
// Default receiving timeout and checking receiving delay should not be too
// small.
EXPECT_LE(1000, ch.receiving_timeout());
EXPECT_LE(200, ch.check_receiving_delay());
ch.set_receiving_timeout(500);
EXPECT_EQ(500, ch.receiving_timeout());
EXPECT_EQ(50, ch.check_receiving_delay());
ch.Connect();
ch.OnCandidate(CreateCandidate("1.1.1.1", 1, 1));
cricket::Connection* conn1 = WaitForConnectionTo(&ch, "1.1.1.1", 1);
ASSERT_TRUE(conn1 != nullptr);
conn1->ReceivedPing();
conn1->OnReadPacket("ABC", 3, rtc::CreatePacketTime(0));
EXPECT_TRUE_WAIT(ch.best_connection() != nullptr, 1000)
EXPECT_TRUE_WAIT(ch.receiving(), 1000);
EXPECT_TRUE_WAIT(!ch.receiving(), 1000);
}

View File

@ -1004,7 +1004,6 @@ void Connection::OnReadPacket(
if (read_state_ == STATE_READABLE) {
// readable means data from this address is acceptable
// Send it on!
last_data_received_ = rtc::Time();
recv_rate_tracker_.Update(size);
SignalReadPacket(this, data, size, packet_time);
@ -1445,6 +1444,11 @@ void Connection::OnMessage(rtc::Message *pmsg) {
delete this;
}
uint32 Connection::last_received() {
return std::max(last_data_received_,
std::max(last_ping_received_, last_ping_response_received_));
}
size_t Connection::recv_bytes_second() {
return recv_rate_tracker_.units_second();
}

View File

@ -540,6 +540,7 @@ class Connection : public rtc::MessageHandler,
// transmission. This connection will send STUN ping with USE-CANDIDATE
// attribute.
sigslot::signal1<Connection*> SignalUseCandidate;
// Invoked when Connection receives STUN error response with 487 code.
void HandleRoleConflictFromPeer();
@ -557,6 +558,10 @@ class Connection : public rtc::MessageHandler,
// |new_candidate|.
void MaybeUpdatePeerReflexiveCandidate(const Candidate& new_candidate);
// Returns the last received time of any data, stun request, or stun
// response in milliseconds
uint32 last_received();
protected:
enum { MSG_DELETE = 0, MSG_FIRST_AVAILABLE };

View File

@ -518,6 +518,8 @@ TransportProxy* BaseSession::GetOrCreateTransportProxy(
this, &BaseSession::OnTransportConnecting);
transport->SignalWritableState.connect(
this, &BaseSession::OnTransportWritable);
transport->SignalReceivingState.connect(
this, &BaseSession::OnTransportReceiving);
transport->SignalRequestSignaling.connect(
this, &BaseSession::OnTransportRequestSignaling);
transport->SignalRouteChange.connect(

View File

@ -368,6 +368,9 @@ class BaseSession : public sigslot::has_slots<>,
virtual void OnTransportReadable(Transport* transport) {
}
virtual void OnTransportReceiving(Transport* transport) {
}
// Called when a transport has found its steady-state connections.
virtual void OnTransportCompleted(Transport* transport) {
}

View File

@ -35,6 +35,7 @@ enum {
MSG_ROLECONFLICT,
MSG_COMPLETED,
MSG_FAILED,
MSG_RECEIVINGSTATE,
};
struct ChannelParams : public rtc::MessageData {
@ -128,6 +129,7 @@ Transport::Transport(rtc::Thread* signaling_thread,
destroyed_(false),
readable_(TRANSPORT_STATE_NONE),
writable_(TRANSPORT_STATE_NONE),
receiving_(TRANSPORT_STATE_NONE),
was_writable_(false),
connect_requested_(false),
ice_role_(ICEROLE_UNKNOWN),
@ -242,6 +244,7 @@ TransportChannelImpl* Transport::CreateChannel_w(int component) {
impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState);
impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
impl->SignalReceivingState.connect(this, &Transport::OnChannelReceivingState);
impl->SignalRequestSignaling.connect(
this, &Transport::OnChannelRequestSignaling);
impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady);
@ -501,7 +504,7 @@ void Transport::OnChannelReadableState(TransportChannel* channel) {
void Transport::OnChannelReadableState_s() {
ASSERT(signaling_thread()->IsCurrent());
TransportState readable = GetTransportState_s(true);
TransportState readable = GetTransportState_s(TRANSPORT_READABLE_STATE);
if (readable_ != readable) {
readable_ = readable;
SignalReadableState(this);
@ -517,7 +520,7 @@ void Transport::OnChannelWritableState(TransportChannel* channel) {
void Transport::OnChannelWritableState_s() {
ASSERT(signaling_thread()->IsCurrent());
TransportState writable = GetTransportState_s(false);
TransportState writable = GetTransportState_s(TRANSPORT_WRITABLE_STATE);
if (writable_ != writable) {
was_writable_ = (writable_ == TRANSPORT_STATE_ALL);
writable_ = writable;
@ -525,15 +528,41 @@ void Transport::OnChannelWritableState_s() {
}
}
TransportState Transport::GetTransportState_s(bool read) {
void Transport::OnChannelReceivingState(TransportChannel* channel) {
ASSERT(worker_thread()->IsCurrent());
signaling_thread()->Post(this, MSG_RECEIVINGSTATE);
}
void Transport::OnChannelReceivingState_s() {
ASSERT(signaling_thread()->IsCurrent());
TransportState receiving = GetTransportState_s(TRANSPORT_RECEIVING_STATE);
if (receiving_ != receiving) {
receiving_ = receiving;
SignalReceivingState(this);
}
}
TransportState Transport::GetTransportState_s(TransportStateType state_type) {
ASSERT(signaling_thread()->IsCurrent());
rtc::CritScope cs(&crit_);
bool any = false;
bool all = !channels_.empty();
for (const auto iter : channels_) {
bool b = (read ? iter.second->readable() :
iter.second->writable());
bool b = false;
switch (state_type) {
case TRANSPORT_READABLE_STATE:
b = iter.second->readable();
break;
case TRANSPORT_WRITABLE_STATE:
b = iter.second->writable();
break;
case TRANSPORT_RECEIVING_STATE:
b = iter.second->receiving();
break;
default:
ASSERT(false);
}
any |= b;
all &= b;
}
@ -900,6 +929,9 @@ void Transport::OnMessage(rtc::Message* msg) {
case MSG_WRITESTATE:
OnChannelWritableState_s();
break;
case MSG_RECEIVINGSTATE:
OnChannelReceivingState_s();
break;
case MSG_REQUESTSIGNALING:
OnChannelRequestSignaling_s();
break;

View File

@ -53,7 +53,7 @@ class TransportChannelImpl;
typedef std::vector<Candidate> Candidates;
// For "writable" and "readable", we need to differentiate between
// For "writable", "readable", and "receiving", we need to differentiate between
// none, all, and some.
enum TransportState {
TRANSPORT_STATE_NONE = 0,
@ -61,6 +61,14 @@ enum TransportState {
TRANSPORT_STATE_ALL
};
// When checking transport state, we need to differentiate between
// "readable", "writable", or "receiving" check.
enum TransportStateType {
TRANSPORT_READABLE_STATE = 0,
TRANSPORT_WRITABLE_STATE,
TRANSPORT_RECEIVING_STATE
};
// Stats that we can return about the connections for a transport channel.
// TODO(hta): Rename to ConnectionStats
struct ConnectionInfo {
@ -172,8 +180,14 @@ class Transport : public rtc::MessageHandler,
bool all_channels_writable() const {
return (writable_ == TRANSPORT_STATE_ALL);
}
bool any_channel_receiving() const {
return (receiving_ == TRANSPORT_STATE_SOME ||
receiving_ == TRANSPORT_STATE_ALL);
}
sigslot::signal1<Transport*> SignalReadableState;
sigslot::signal1<Transport*> SignalWritableState;
sigslot::signal1<Transport*> SignalReceivingState;
sigslot::signal1<Transport*> SignalCompleted;
sigslot::signal1<Transport*> SignalFailed;
@ -363,6 +377,9 @@ class Transport : public rtc::MessageHandler,
void OnChannelReadableState(TransportChannel* channel);
void OnChannelWritableState(TransportChannel* channel);
// Called when the receiving state of a channel changes.
void OnChannelReceivingState(TransportChannel* channel);
// Called when a channel requests signaling.
void OnChannelRequestSignaling(TransportChannelImpl* channel);
@ -393,6 +410,7 @@ class Transport : public rtc::MessageHandler,
void OnRemoteCandidate_w(const Candidate& candidate);
void OnChannelReadableState_s();
void OnChannelWritableState_s();
void OnChannelReceivingState_s();
void OnChannelRequestSignaling_s();
void OnConnecting_s();
void OnChannelRouteChange_s(const TransportChannel* channel,
@ -403,8 +421,9 @@ class Transport : public rtc::MessageHandler,
typedef void (TransportChannelImpl::* TransportChannelFunc)();
void CallChannels_w(TransportChannelFunc func);
// Computes the OR of the channel's read or write state (argument picks).
TransportState GetTransportState_s(bool read);
// Computes the AND and OR of the channel's read/write/receiving state
// (argument picks the operation).
TransportState GetTransportState_s(TransportStateType type);
void OnChannelCandidateReady_s();
@ -430,6 +449,7 @@ class Transport : public rtc::MessageHandler,
bool destroyed_;
TransportState readable_;
TransportState writable_;
TransportState receiving_;
bool was_writable_;
bool connect_requested_;
IceRole ice_role_;

View File

@ -321,3 +321,24 @@ TEST_F(TransportTest, TestGetStats) {
ASSERT_EQ(1U, stats.channel_stats.size());
EXPECT_EQ(1, stats.channel_stats[0].component);
}
TEST_F(TransportTest, TestReceivingStateChange) {
ASSERT_TRUE(SetupChannel());
channel_->SetConnectionCount(1);
transport_->ConnectChannels();
EXPECT_FALSE(transport_->any_channel_receiving());
channel_->SetReceiving(true);
EXPECT_TRUE_WAIT(transport_->any_channel_receiving(), 100);
FakeTransportChannel* channel2 = CreateChannel(2);
channel2->SetReceiving(true);
EXPECT_TRUE_WAIT(transport_->any_channel_receiving(), 100);
channel2->SetReceiving(false);
EXPECT_TRUE_WAIT(transport_->any_channel_receiving(), 100);
// After both channels become not receiving, the transport receiving state
// becomes TRANSPORT_STATE_NONE.
channel_->SetReceiving(false);
EXPECT_TRUE_WAIT(!transport_->any_channel_receiving(), 100);
}

View File

@ -31,6 +31,14 @@ void TransportChannel::set_readable(bool readable) {
}
}
void TransportChannel::set_receiving(bool receiving) {
if (receiving_ == receiving) {
return;
}
receiving_ = receiving;
SignalReceivingState(this);
}
void TransportChannel::set_writable(bool writable) {
if (writable_ != writable) {
LOG_J(LS_VERBOSE, this) << "set_writable from:" << writable_ << " to "

View File

@ -46,7 +46,7 @@ class TransportChannel : public sigslot::has_slots<> {
explicit TransportChannel(const std::string& content_name, int component)
: content_name_(content_name),
component_(component),
readable_(false), writable_(false) {}
readable_(false), writable_(false), receiving_(false) {}
virtual ~TransportChannel() {}
// TODO(guoweis) - Make this pure virtual once all subclasses of
@ -67,10 +67,12 @@ class TransportChannel : public sigslot::has_slots<> {
// TransportManager.
bool readable() const { return readable_; }
bool writable() const { return writable_; }
bool receiving() const { return receiving_; }
sigslot::signal1<TransportChannel*> SignalReadableState;
sigslot::signal1<TransportChannel*> SignalWritableState;
// Emitted when the TransportChannel's ability to send has changed.
sigslot::signal1<TransportChannel*> SignalReadyToSend;
sigslot::signal1<TransportChannel*> SignalReceivingState;
// Attempts to send the given packet. The return value is < 0 on failure.
// TODO: Remove the default argument once channel code is updated.
@ -142,6 +144,9 @@ class TransportChannel : public sigslot::has_slots<> {
// Sets the writable state, signaling if necessary.
void set_writable(bool writable);
// Sets the receiving state, signaling if necessary.
void set_receiving(bool receiving);
private:
// Used mostly for debugging.
@ -149,6 +154,7 @@ class TransportChannel : public sigslot::has_slots<> {
int component_;
bool readable_;
bool writable_;
bool receiving_;
DISALLOW_COPY_AND_ASSIGN(TransportChannel);
};