diff --git a/call/call.cc b/call/call.cc index 6f407fc0f0..dd2a1261fd 100644 --- a/call/call.cc +++ b/call/call.cc @@ -268,6 +268,10 @@ class Call final : public webrtc::Call, DeliveryStatus DeliverPacket(MediaType media_type, rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) override; + void DeliverPacketAsync(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us, + PacketCallback callback) override; // Implements RecoveredPacketReceiver. void OnRecoveredPacket(const uint8_t* packet, size_t length) override; @@ -321,6 +325,7 @@ class Call final : public webrtc::Call, Clock* const clock_; TaskQueueFactory* const task_queue_factory_; TaskQueueBase* const worker_thread_; + RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_; const int num_cpu_cores_; const rtc::scoped_refptr module_process_thread_; @@ -625,6 +630,8 @@ Call::Call(Clock* clock, RTC_DCHECK(config.trials != nullptr); RTC_DCHECK(worker_thread_->IsCurrent()); + network_thread_.Detach(); + // Do not remove this call; it is here to convince the compiler that the // WebRTC source timestamp string needs to be in the final binary. LoadWebRTCVersionInRegister(); @@ -1418,6 +1425,30 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket( return DeliverRtp(media_type, std::move(packet), packet_time_us); } +void Call::DeliverPacketAsync(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us, + PacketCallback callback) { + RTC_DCHECK_RUN_ON(&network_thread_); + + TaskQueueBase* network_thread = rtc::Thread::Current(); + RTC_DCHECK(network_thread); + + worker_thread_->PostTask(ToQueuedTask( + task_safety_, [this, network_thread, media_type, p = std::move(packet), + packet_time_us, cb = std::move(callback)] { + RTC_DCHECK_RUN_ON(worker_thread_); + DeliveryStatus status = DeliverPacket(media_type, p, packet_time_us); + if (cb) { + network_thread->PostTask( + ToQueuedTask([cb = std::move(cb), status, media_type, + p = std::move(p), packet_time_us]() { + cb(status, media_type, std::move(p), packet_time_us); + })); + } + })); +} + void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { RTC_DCHECK_RUN_ON(worker_thread_); RtpPacketReceived parsed_packet; diff --git a/call/packet_receiver.h b/call/packet_receiver.h index df57d8f4f4..f18ee65c70 100644 --- a/call/packet_receiver.h +++ b/call/packet_receiver.h @@ -11,8 +11,10 @@ #define CALL_PACKET_RECEIVER_H_ #include +#include #include #include +#include #include #include "api/media_types.h" @@ -28,6 +30,32 @@ class PacketReceiver { DELIVERY_PACKET_ERROR, }; + // Definition of the callback to execute when packet delivery is complete. + // The callback will be issued on the same thread as called DeliverPacket. + typedef std::function< + void(DeliveryStatus, MediaType, rtc::CopyOnWriteBuffer, int64_t)> + PacketCallback; + + // Asynchronously handle packet delivery and report back to the caller when + // delivery of the packet has completed. + // Note that if the packet is invalid or can be processed without the need of + // asynchronous operations that the |callback| may have been called before + // the function returns. + // TODO(bugs.webrtc.org/11993): This function is meant to be called on the + // network thread exclusively but while the code is being updated to align + // with those goals, it may be called either on the worker or network threads. + // Update docs etc when the work has been completed. Once we're done with the + // updates, we might be able to go back to returning the status from this + // function instead of having to report it via a callback. + virtual void DeliverPacketAsync(MediaType media_type, + rtc::CopyOnWriteBuffer packet, + int64_t packet_time_us, + PacketCallback callback) { + DeliveryStatus status = DeliverPacket(media_type, packet, packet_time_us); + if (callback) + callback(status, media_type, std::move(packet), packet_time_us); + } + virtual DeliveryStatus DeliverPacket(MediaType media_type, rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) = 0;