diff --git a/logging/BUILD.gn b/logging/BUILD.gn index 18189d51e4..d450f624e2 100644 --- a/logging/BUILD.gn +++ b/logging/BUILD.gn @@ -293,6 +293,7 @@ if (rtc_enable_protobuf) { "rtc_event_log/logged_events.h", "rtc_event_log/rtc_event_log_parser.cc", "rtc_event_log/rtc_event_log_parser.h", + "rtc_event_log/rtc_event_processor.cc", "rtc_event_log/rtc_event_processor.h", ] diff --git a/logging/rtc_event_log/rtc_event_log2rtp_dump.cc b/logging/rtc_event_log/rtc_event_log2rtp_dump.cc index c446212be4..62d6bfc6d1 100644 --- a/logging/rtc_event_log/rtc_event_log2rtp_dump.cc +++ b/logging/rtc_event_log/rtc_event_log2rtp_dump.cc @@ -236,21 +236,13 @@ int main(int argc, char* argv[]) { parsed_stream.GetMediaType(stream.ssrc, webrtc::kIncomingPacket); if (ShouldSkipStream(media_type, stream.ssrc, ssrc_filter)) continue; - auto rtp_view = absl::make_unique< - webrtc::ProcessableEventList>( - stream.incoming_packets.begin(), stream.incoming_packets.end(), - handle_rtp); - event_processor.AddEvents(std::move(rtp_view)); + event_processor.AddEvents(stream.incoming_packets, handle_rtp); } // Note that |packet_ssrc| is the sender SSRC. An RTCP message may contain // report blocks for many streams, thus several SSRCs and they don't // necessarily have to be of the same media type. We therefore don't // support filtering of RTCP based on SSRC and media type. - auto rtcp_view = absl::make_unique< - webrtc::ProcessableEventList>( - parsed_stream.incoming_rtcp_packets().begin(), - parsed_stream.incoming_rtcp_packets().end(), handle_rtcp); - event_processor.AddEvents(std::move(rtcp_view)); + event_processor.AddEvents(parsed_stream.incoming_rtcp_packets(), handle_rtcp); event_processor.ProcessEventsInOrder(); diff --git a/logging/rtc_event_log/rtc_event_log_parser.h b/logging/rtc_event_log/rtc_event_log_parser.h index f7e896dc11..dc23944057 100644 --- a/logging/rtc_event_log/rtc_event_log_parser.h +++ b/logging/rtc_event_log/rtc_event_log_parser.h @@ -105,6 +105,9 @@ class PacketIterator { T& operator*() { return *reinterpret_cast(ptr_); } const T& operator*() const { return *reinterpret_cast(ptr_); } + T* operator->() { return reinterpret_cast(ptr_); } + const T* operator->() const { return reinterpret_cast(ptr_); } + private: PacketIterator(typename std::conditional::value, const void*, @@ -155,6 +158,10 @@ class PacketView { return PacketView(ptr, num_elements, offset, sizeof(U)); } + using value_type = T; + using reference = value_type&; + using const_reference = const value_type&; + using iterator = PacketIterator; using const_iterator = PacketIterator; using reverse_iterator = std::reverse_iterator; diff --git a/logging/rtc_event_log/rtc_event_processor.cc b/logging/rtc_event_log/rtc_event_processor.cc new file mode 100644 index 0000000000..804e283851 --- /dev/null +++ b/logging/rtc_event_log/rtc_event_processor.cc @@ -0,0 +1,41 @@ +/* + * Copyright 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "logging/rtc_event_log/rtc_event_processor.h" + +namespace webrtc { + +RtcEventProcessor::RtcEventProcessor() = default; + +RtcEventProcessor::~RtcEventProcessor() = default; +void RtcEventProcessor::ProcessEventsInOrder() { + // |event_lists_| is a min-heap of lists ordered by the timestamp of the + // first element in the list. We therefore process the first element of the + // first list, then reinsert the remainder of that list into the heap + // if the list still contains unprocessed elements. + while (!event_lists_.empty()) { + event_lists_.front()->ProcessNext(); + std::pop_heap(event_lists_.begin(), event_lists_.end(), Cmp); + if (event_lists_.back()->IsEmpty()) { + event_lists_.pop_back(); + } else { + std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp); + } + } +} + +bool RtcEventProcessor::Cmp(const RtcEventProcessor::ListPtrType& a, + const RtcEventProcessor::ListPtrType& b) { + int64_t time_diff = a->GetNextTime() - b->GetNextTime(); + if (time_diff == 0) + return a->GetTieBreaker() > b->GetTieBreaker(); + return time_diff > 0; +} + +} // namespace webrtc diff --git a/logging/rtc_event_log/rtc_event_processor.h b/logging/rtc_event_log/rtc_event_processor.h index 9eee4815f2..dbbdff6730 100644 --- a/logging/rtc_event_log/rtc_event_processor.h +++ b/logging/rtc_event_log/rtc_event_processor.h @@ -17,6 +17,7 @@ #include #include +#include "absl/memory/memory.h" #include "rtc_base/checks.h" #include "rtc_base/function_view.h" @@ -27,6 +28,7 @@ namespace webrtc { // in the merge-sort algorithm but without copying the elements or modifying the // lists. +namespace event_processor_impl { // Interface to allow "merging" lists of different types. ProcessNext() // processes the next unprocesses element in the list. IsEmpty() checks if all // elements have been processed. GetNextTime returns the timestamp of the next @@ -37,22 +39,19 @@ class ProcessableEventListInterface { virtual void ProcessNext() = 0; virtual bool IsEmpty() const = 0; virtual int64_t GetNextTime() const = 0; + virtual int GetTieBreaker() const = 0; }; // ProcessableEventList encapsulates a list of events and a function that will // be applied to each element of the list. -template +template class ProcessableEventList : public ProcessableEventListInterface { public: - // N.B. |f| is not owned by ProcessableEventList. The caller must ensure that - // the function object or lambda outlives ProcessableEventList and - // RtcEventProcessor. The same thing applies to the iterators (begin, end); - // the vector must outlive ProcessableEventList and must not be modified until - // processing has finished. - ProcessableEventList(typename std::vector::const_iterator begin, - typename std::vector::const_iterator end, - rtc::FunctionView f) - : begin_(begin), end_(end), f_(f) {} + ProcessableEventList(Iterator begin, + Iterator end, + std::function f, + int tie_breaker) + : begin_(begin), end_(end), f_(f), tie_breaker_(tie_breaker) {} void ProcessNext() override { RTC_DCHECK(!IsEmpty()); @@ -66,12 +65,15 @@ class ProcessableEventList : public ProcessableEventListInterface { RTC_DCHECK(!IsEmpty()); return begin_->log_time_us(); } + int GetTieBreaker() const override { return tie_breaker_; } private: - typename std::vector::const_iterator begin_; - typename std::vector::const_iterator end_; - rtc::FunctionView f_; + Iterator begin_; + Iterator end_; + std::function f_; + int tie_breaker_; }; +} // namespace event_processor_impl // Helper class used to "merge" two or more lists of ordered RtcEventLog events // so that they can be treated as a single ordered list. Since the individual @@ -81,57 +83,47 @@ class ProcessableEventList : public ProcessableEventListInterface { // Usage example: // ParsedRtcEventLogNew log; // auto incoming_handler = [] (LoggedRtcpPacketIncoming elem) { ... }; -// auto incoming_rtcp = -// absl::make_unique>( -// log.incoming_rtcp_packets().begin(), -// log.incoming_rtcp_packets().end(), -// incoming_handler); // auto outgoing_handler = [] (LoggedRtcpPacketOutgoing elem) { ... }; -// auto outgoing_rtcp = -// absl::make_unique>( -// log.outgoing_rtcp_packets().begin(), -// log.outgoing_rtcp_packets().end(), -// outgoing_handler); // // RtcEventProcessor processor; -// processor.AddEvents(std::move(incoming_rtcp)); -// processor.AddEvents(std::move(outgoing_rtcp)); +// processor.AddEvents(log.incoming_rtcp_packets(), +// incoming_handler); +// processor.AddEvents(log.outgoing_rtcp_packets(), +// outgoing_handler); // processor.ProcessEventsInOrder(); class RtcEventProcessor { public: + RtcEventProcessor(); + ~RtcEventProcessor(); // The elements of each list is processed in the index order. To process all - // elements in all lists in timestamp order, each lists need to be sorted in - // timestamp order prior to insertion. Otherwise, - void AddEvents(std::unique_ptr events) { - if (!events->IsEmpty()) { - event_lists_.push_back(std::move(events)); - std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp); - } + // elements in all lists in timestamp order, each list needs to be sorted in + // timestamp order prior to insertion. + // N.B. |iterable| is not owned by RtcEventProcessor. The caller must ensure + // that the iterable outlives RtcEventProcessor and it must not be modified + // until processing has finished. + template + void AddEvents( + const Iterable& iterable, + std::function handler) { + if (iterable.begin() == iterable.end()) + return; + event_lists_.push_back( + absl::make_unique>( + iterable.begin(), iterable.end(), handler, + insertion_order_index_++)); + std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp); } - void ProcessEventsInOrder() { - // |event_lists_| is a min-heap of lists ordered by the timestamp of the - // first element in the list. We therefore process the first element of the - // first list, then reinsert the remainder of that list into the heap - // if the list still contains unprocessed elements. - while (!event_lists_.empty()) { - event_lists_.front()->ProcessNext(); - std::pop_heap(event_lists_.begin(), event_lists_.end(), Cmp); - if (event_lists_.back()->IsEmpty()) { - event_lists_.pop_back(); - } else { - std::push_heap(event_lists_.begin(), event_lists_.end(), Cmp); - } - } - } + void ProcessEventsInOrder(); private: - using ListPtrType = std::unique_ptr; + using ListPtrType = + std::unique_ptr; + int insertion_order_index_ = 0; std::vector event_lists_; // Comparison function to make |event_lists_| into a min heap. - static bool Cmp(const ListPtrType& a, const ListPtrType& b) { - return a->GetNextTime() > b->GetNextTime(); - } + static bool Cmp(const ListPtrType& a, const ListPtrType& b); }; } // namespace webrtc diff --git a/logging/rtc_event_log/rtc_event_processor_unittest.cc b/logging/rtc_event_log/rtc_event_processor_unittest.cc index dac3bb61a5..9f33540fc8 100644 --- a/logging/rtc_event_log/rtc_event_processor_unittest.cc +++ b/logging/rtc_event_log/rtc_event_processor_unittest.cc @@ -33,8 +33,6 @@ std::vector CreateEventList( return v; } -using OrderedEventView = ProcessableEventList; - std::vector> CreateRandomEventLists(size_t num_lists, size_t num_elements, uint64_t seed) { Random prng(seed); @@ -58,8 +56,7 @@ TEST(RtcEventProcessor, EmptyList) { std::vector events; RtcEventProcessor processor; - processor.AddEvents(absl::make_unique( - events.begin(), events.end(), not_called)); + processor.AddEvents(events, not_called); processor.ProcessEventsInOrder(); // Don't crash but do nothing. } @@ -69,8 +66,7 @@ TEST(RtcEventProcessor, OneList) { std::vector events(CreateEventList({1, 2, 3, 4})); RtcEventProcessor processor; - processor.AddEvents( - absl::make_unique(events.begin(), events.end(), f)); + processor.AddEvents(events, f); processor.ProcessEventsInOrder(); std::vector expected_results{1, 2, 3, 4}; @@ -87,10 +83,8 @@ TEST(RtcEventProcessor, MergeTwoLists) { std::vector events1(CreateEventList({1, 2, 4, 7, 8, 9})); std::vector events2(CreateEventList({3, 5, 6, 10})); RtcEventProcessor processor; - processor.AddEvents( - absl::make_unique(events1.begin(), events1.end(), f)); - processor.AddEvents( - absl::make_unique(events2.begin(), events2.end(), f)); + processor.AddEvents(events1, f); + processor.AddEvents(events2, f); processor.ProcessEventsInOrder(); std::vector expected_results{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; @@ -107,10 +101,8 @@ TEST(RtcEventProcessor, MergeTwoListsWithDuplicatedElements) { std::vector events1(CreateEventList({1, 2, 2, 3, 5, 5})); std::vector events2(CreateEventList({1, 3, 4, 4})); RtcEventProcessor processor; - processor.AddEvents( - absl::make_unique(events1.begin(), events1.end(), f)); - processor.AddEvents( - absl::make_unique(events2.begin(), events2.end(), f)); + processor.AddEvents(events1, f); + processor.AddEvents(events2, f); processor.ProcessEventsInOrder(); std::vector expected_results{1, 1, 2, 2, 3, 3, 4, 4, 5, 5}; @@ -132,8 +124,7 @@ TEST(RtcEventProcessor, MergeManyLists) { RTC_DCHECK_EQ(lists.size(), kNumLists); RtcEventProcessor processor; for (const auto& list : lists) { - processor.AddEvents( - absl::make_unique(list.begin(), list.end(), f)); + processor.AddEvents(list, f); } processor.ProcessEventsInOrder(); @@ -157,10 +148,8 @@ TEST(RtcEventProcessor, DifferentTypes) { std::vector events1{LoggedStartEvent(2000)}; std::vector events2{LoggedStopEvent(1000)}; RtcEventProcessor processor; - processor.AddEvents(absl::make_unique>( - events1.begin(), events1.end(), f1)); - processor.AddEvents(absl::make_unique>( - events2.begin(), events2.end(), f2)); + processor.AddEvents(events1, f1); + processor.AddEvents(events2, f2); processor.ProcessEventsInOrder(); std::vector expected_results{1, 2}; diff --git a/modules/audio_coding/neteq/tools/rtc_event_log_source.cc b/modules/audio_coding/neteq/tools/rtc_event_log_source.cc index 446a1a9ee2..d72f9cbcb5 100644 --- a/modules/audio_coding/neteq/tools/rtc_event_log_source.cc +++ b/modules/audio_coding/neteq/tools/rtc_event_log_source.cc @@ -109,21 +109,13 @@ bool RtcEventLogSource::OpenFile(const std::string& file_name, if (ShouldSkipStream(media_type, rtp_packets.ssrc, ssrc_filter)) { continue; } - auto rtp_view = absl::make_unique< - webrtc::ProcessableEventList>( - rtp_packets.incoming_packets.begin(), - rtp_packets.incoming_packets.end(), handle_rtp_packet); - event_processor.AddEvents(std::move(rtp_view)); + event_processor.AddEvents(rtp_packets.incoming_packets, handle_rtp_packet); } for (const auto& audio_playouts : parsed_log.audio_playout_events()) { if (ssrc_filter.has_value() && audio_playouts.first != *ssrc_filter) continue; - auto audio_view = absl::make_unique< - webrtc::ProcessableEventList>( - audio_playouts.second.begin(), audio_playouts.second.end(), - handle_audio_playout); - event_processor.AddEvents(std::move(audio_view)); + event_processor.AddEvents(audio_playouts.second, handle_audio_playout); } // Fills in rtp_packets_ and audio_outputs_.