Remove TaskQueue::PostAndReply as unused

Bug: webrtc:10191, webrtc:9728
Change-Id: Iaaa7c88bbbbfdd6e3e9bf5ab683bbdb2962a5cab
Reviewed-on: https://webrtc-review.googlesource.com/c/107202
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26202}
This commit is contained in:
Danil Chapovalov
2018-12-05 15:46:58 +01:00
committed by Commit Bot
parent a8f58f001e
commit 43f3982d6f
5 changed files with 9 additions and 356 deletions

View File

@ -44,16 +44,14 @@ using internal::AutoSetCurrentQueuePtr;
namespace {
static const char kQuit = 1;
static const char kRunTask = 2;
static const char kRunReplyTask = 3;
using Priority = TaskQueue::Priority;
// This ignores the SIGPIPE signal on the calling thread.
// This signal can be fired when trying to write() to a pipe that's being
// closed or while closing a pipe that's being written to.
// We can run into that situation (e.g. reply tasks that don't get a chance to
// run because the task queue is being deleted) so we ignore this signal and
// continue as normal.
// We can run into that situation so we ignore this signal and continue as
// normal.
// As a side note for this implementation, it would be great if we could safely
// restore the sigmask, but unfortunately the operation of restoring it, can
// itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
@ -133,10 +131,6 @@ class TaskQueue::Impl : public RefCountInterface {
bool IsCurrent() const;
void PostTask(std::unique_ptr<QueuedTask> task);
void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue::Impl* reply_queue);
void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
private:
@ -145,14 +139,8 @@ class TaskQueue::Impl : public RefCountInterface {
static void RunTask(int fd, short flags, void* context); // NOLINT
static void RunTimer(int fd, short flags, void* context); // NOLINT
class ReplyTaskOwner;
class PostAndReplyTask;
class SetTimerTask;
typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
struct QueueContext;
TaskQueue* const queue_;
int wakeup_pipe_in_ = -1;
@ -162,8 +150,6 @@ class TaskQueue::Impl : public RefCountInterface {
PlatformThread thread_;
rtc::CriticalSection pending_lock_;
std::list<std::unique_ptr<QueuedTask>> pending_ RTC_GUARDED_BY(pending_lock_);
std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
RTC_GUARDED_BY(pending_lock_);
};
struct TaskQueue::Impl::QueueContext {
@ -174,90 +160,6 @@ struct TaskQueue::Impl::QueueContext {
std::list<TimerEvent*> pending_timers_;
};
// Posting a reply task is tricky business. This class owns the reply task
// and a reference to it is held by both the reply queue and the first task.
// Here's an outline of what happens when dealing with a reply task.
// * The ReplyTaskOwner owns the |reply_| task.
// * One ref owned by PostAndReplyTask
// * One ref owned by the reply TaskQueue
// * ReplyTaskOwner has a flag |run_task_| initially set to false.
// * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
// * After successfully running the original |task_|, PostAndReplyTask() calls
// set_should_run_task(). This sets |run_task_| to true.
// * In PostAndReplyTask's dtor:
// * It releases its reference to ReplyTaskOwner (important to do this first).
// * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
// * PostAndReplyTask doesn't care if write() fails, but when it does:
// * The reply queue is gone.
// * ReplyTaskOwner has already been deleted and the reply task too.
// * If write() succeeds:
// * ReplyQueue receives the kRunReplyTask message
// * Goes through all pending tasks, finding the first that HasOneRef()
// * Calls ReplyTaskOwner::Run()
// * if set_should_run_task() was called, the reply task will be run
// * Release the reference to ReplyTaskOwner
// * ReplyTaskOwner and associated |reply_| are deleted.
class TaskQueue::Impl::ReplyTaskOwner {
public:
ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
: reply_(std::move(reply)) {}
void Run() {
RTC_DCHECK(reply_);
if (run_task_) {
if (!reply_->Run())
reply_.release();
}
reply_.reset();
}
void set_should_run_task() {
RTC_DCHECK(!run_task_);
run_task_ = true;
}
private:
std::unique_ptr<QueuedTask> reply_;
bool run_task_ = false;
};
class TaskQueue::Impl::PostAndReplyTask : public QueuedTask {
public:
PostAndReplyTask(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue::Impl* reply_queue,
int reply_pipe)
: task_(std::move(task)),
reply_pipe_(reply_pipe),
reply_task_owner_(
new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
reply_queue->PrepareReplyTask(reply_task_owner_);
}
~PostAndReplyTask() override {
reply_task_owner_ = nullptr;
IgnoreSigPipeSignalOnCurrentThread();
// Send a signal to the reply queue that the reply task can run now.
// Depending on whether |set_should_run_task()| was called by the
// PostAndReplyTask(), the reply task may or may not actually run.
// In either case, it will be deleted.
char message = kRunReplyTask;
RTC_UNUSED(write(reply_pipe_, &message, sizeof(message)));
}
private:
bool Run() override {
if (!task_->Run())
task_.release();
reply_task_owner_->set_should_run_task();
return true;
}
std::unique_ptr<QueuedTask> task_;
int reply_pipe_;
scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
};
class TaskQueue::Impl::SetTimerTask : public QueuedTask {
public:
SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
@ -396,15 +298,6 @@ void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
}
}
void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue::Impl* reply_queue) {
std::unique_ptr<QueuedTask> wrapper_task(
new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
reply_queue->wakeup_pipe_in_));
PostTask(std::move(wrapper_task));
}
// static
void TaskQueue::Impl::ThreadMain(void* context) {
TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context);
@ -448,22 +341,6 @@ void TaskQueue::Impl::OnWakeup(int socket,
task.release();
break;
}
case kRunReplyTask: {
scoped_refptr<ReplyTaskOwnerRef> reply_task;
{
CritScope lock(&ctx->queue->pending_lock_);
for (auto it = ctx->queue->pending_replies_.begin();
it != ctx->queue->pending_replies_.end(); ++it) {
if ((*it)->HasOneRef()) {
reply_task = std::move(*it);
ctx->queue->pending_replies_.erase(it);
break;
}
}
}
reply_task->Run();
break;
}
default:
RTC_NOTREACHED();
break;
@ -488,13 +365,6 @@ void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) { // NOLINT
delete timer;
}
void TaskQueue::Impl::PrepareReplyTask(
scoped_refptr<ReplyTaskOwnerRef> reply_task) {
RTC_DCHECK(reply_task);
CritScope lock(&pending_lock_);
pending_replies_.push_back(std::move(reply_task));
}
TaskQueue::TaskQueue(const char* queue_name, Priority priority)
: impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
}
@ -515,19 +385,6 @@ void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
return TaskQueue::impl_->PostTask(std::move(task));
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue) {
return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
reply_queue->impl_.get());
}
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply) {
return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
impl_.get());
}
void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);