Implement periodic cancelable task for task queue

using shared pointer to boolean flag.

Bug: None
Change-Id: I9d7ad7d7b187fefa7daa0247a1379e1ddd7e2b24
Reviewed-on: https://webrtc-review.googlesource.com/96300
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Reviewed-by: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#24511}
This commit is contained in:
Danil Chapovalov
2018-08-31 13:47:59 +02:00
committed by Commit Bot
parent d05916dccf
commit a10d164b4a
5 changed files with 483 additions and 0 deletions

View File

@ -526,6 +526,26 @@ rtc_source_set("rtc_task_queue_api") {
]
}
rtc_source_set("rtc_cancelable_task") {
sources = [
"cancelable_periodic_task.h",
"cancelable_task_handle.cc",
"cancelable_task_handle.h",
]
deps = [
":checks",
":logging",
":macromagic",
":ptr_util",
":refcount",
":rtc_task_queue",
":safe_conversions",
":sequenced_task_checker",
":thread_checker",
"//third_party/abseil-cpp/absl/memory",
]
}
if (rtc_enable_libevent) {
rtc_source_set("rtc_task_queue_libevent") {
visibility = [ ":rtc_task_queue_impl" ]
@ -1178,12 +1198,14 @@ if (rtc_include_tests) {
testonly = true
sources = [
"cancelable_periodic_task_unittest.cc",
"task_queue_unittest.cc",
]
deps = [
":rtc_base_approved",
":rtc_base_tests_main",
":rtc_base_tests_utils",
":rtc_cancelable_task",
":rtc_task_queue",
":rtc_task_queue_for_test",
"../test:test_support",

View File

@ -0,0 +1,83 @@
/*
* Copyright 2018 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef RTC_BASE_CANCELABLE_PERIODIC_TASK_H_
#define RTC_BASE_CANCELABLE_PERIODIC_TASK_H_
#include <memory>
#include <type_traits>
#include <utility>
#include "absl/memory/memory.h"
#include "rtc_base/cancelable_task_handle.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/numerics/safe_conversions.h"
namespace rtc {
namespace cancelable_periodic_task_internal {
// CancelablePeriodicTask runs a closure multiple times with delay decided
// by the return value of the closure itself.
// The task can be canceled with the handle returned by GetCancelationHandle().
// Note that the task can only be canceled on the task queue where it runs.
template <typename Closure>
class CancelablePeriodicTask final : public BaseCancelableTask {
public:
// |closure| should return time in ms until next run.
explicit CancelablePeriodicTask(Closure&& closure)
: closure_(std::forward<Closure>(closure)) {}
CancelablePeriodicTask(const CancelablePeriodicTask&) = delete;
CancelablePeriodicTask& operator=(const CancelablePeriodicTask&) = delete;
~CancelablePeriodicTask() override = default;
private:
bool Run() override {
// See QueuedTask::Run documentaion for return values meaning.
if (BaseCancelableTask::Canceled())
return true; // Caller retains ownership of `this`, and will destroy it.
// Run the actual task.
auto delay = closure_();
// Convert closure_() return type into uint32_t.
uint32_t delay_ms = 0;
if (rtc::IsValueInRangeForNumericType<uint32_t>(delay)) {
delay_ms = static_cast<uint32_t>(delay);
} else {
// Log and recover in production.
RTC_LOG(LS_ERROR) << "Invalid delay until next run: " << delay;
delay_ms = rtc::saturated_cast<uint32_t>(delay);
// But crash in debug.
RTC_DCHECK(false);
}
// Reschedule.
auto owned_task = absl::WrapUnique(this);
if (delay_ms == 0)
TaskQueue::Current()->PostTask(std::move(owned_task));
else
TaskQueue::Current()->PostDelayedTask(std::move(owned_task), delay_ms);
return false; // Caller will release ownership of `this`.
}
Closure closure_;
};
} // namespace cancelable_periodic_task_internal
template <typename Closure>
std::unique_ptr<BaseCancelableTask> CreateCancelablePeriodicTask(
Closure&& closure) {
using CleanedClosure = typename std::remove_cv<
typename std::remove_reference<Closure>::type>::type;
return absl::make_unique<cancelable_periodic_task_internal::
CancelablePeriodicTask<CleanedClosure>>(
std::forward<CleanedClosure>(closure));
}
} // namespace rtc
#endif // RTC_BASE_CANCELABLE_PERIODIC_TASK_H_

View File

@ -0,0 +1,228 @@
/*
* Copyright 2018 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/cancelable_periodic_task.h"
#include "rtc_base/event.h"
#include "test/gmock.h"
namespace {
using ::testing::AtLeast;
using ::testing::Invoke;
using ::testing::NiceMock;
using ::testing::MockFunction;
using ::testing::Return;
constexpr int kTimeoutMs = 1000;
class MockClosure {
public:
MOCK_METHOD0(Call, int());
MOCK_METHOD0(Delete, void());
};
class MoveOnlyClosure {
public:
explicit MoveOnlyClosure(MockClosure* mock) : mock_(mock) {}
MoveOnlyClosure(const MoveOnlyClosure&) = delete;
MoveOnlyClosure(MoveOnlyClosure&& other) : mock_(other.mock_) {
other.mock_ = nullptr;
}
~MoveOnlyClosure() {
if (mock_)
mock_->Delete();
}
int operator()() { return mock_->Call(); }
private:
MockClosure* mock_;
};
class CopyableClosure {
public:
explicit CopyableClosure(MockClosure* mock) : mock_(mock) {}
CopyableClosure(const CopyableClosure& other) : mock_(other.mock_) {}
~CopyableClosure() {
if (mock_) {
mock_->Delete();
mock_ = nullptr;
}
}
int operator()() { return mock_->Call(); }
private:
MockClosure* mock_;
};
TEST(CancelablePeriodicTaskTest, CanCallCancelOnEmptyHandle) {
rtc::CancelableTaskHandle handle;
handle.Cancel();
}
TEST(CancelablePeriodicTaskTest, CancelTaskBeforeItRuns) {
rtc::Event done(false, false);
MockClosure mock;
EXPECT_CALL(mock, Call).Times(0);
EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); }));
rtc::TaskQueue task_queue("queue");
auto task = rtc::CreateCancelablePeriodicTask(MoveOnlyClosure(&mock));
rtc::CancelableTaskHandle handle = task->GetCancellationHandle();
task_queue.PostTask([handle] { handle.Cancel(); });
task_queue.PostTask(std::move(task));
EXPECT_TRUE(done.Wait(kTimeoutMs));
}
TEST(CancelablePeriodicTaskTest, CancelDelayedTaskBeforeItRuns) {
rtc::Event done(false, false);
MockClosure mock;
EXPECT_CALL(mock, Call).Times(0);
EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); }));
rtc::TaskQueue task_queue("queue");
auto task = rtc::CreateCancelablePeriodicTask(MoveOnlyClosure(&mock));
rtc::CancelableTaskHandle handle = task->GetCancellationHandle();
task_queue.PostDelayedTask(std::move(task), 100);
task_queue.PostTask([handle] { handle.Cancel(); });
EXPECT_TRUE(done.Wait(kTimeoutMs));
}
TEST(CancelablePeriodicTaskTest, CancelTaskAfterItRuns) {
rtc::Event done(false, false);
MockClosure mock;
EXPECT_CALL(mock, Call).WillOnce(Return(100));
EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); }));
rtc::TaskQueue task_queue("queue");
auto task = rtc::CreateCancelablePeriodicTask(MoveOnlyClosure(&mock));
rtc::CancelableTaskHandle handle = task->GetCancellationHandle();
task_queue.PostTask(std::move(task));
task_queue.PostTask([handle] { handle.Cancel(); });
EXPECT_TRUE(done.Wait(kTimeoutMs));
}
TEST(CancelablePeriodicTaskTest, ZeroReturnValueRepostsTheTask) {
NiceMock<MockClosure> closure;
rtc::Event done(false, false);
EXPECT_CALL(closure, Call()).WillOnce(Return(0)).WillOnce(Invoke([&done] {
done.Set();
return kTimeoutMs;
}));
rtc::TaskQueue task_queue("queue");
task_queue.PostTask(
rtc::CreateCancelablePeriodicTask(MoveOnlyClosure(&closure)));
EXPECT_TRUE(done.Wait(kTimeoutMs));
}
TEST(CancelablePeriodicTaskTest, StartPeriodicTask) {
MockFunction<int()> closure;
rtc::Event done(false, false);
EXPECT_CALL(closure, Call())
.WillOnce(Return(20))
.WillOnce(Return(20))
.WillOnce(Invoke([&done] {
done.Set();
return kTimeoutMs;
}));
rtc::TaskQueue task_queue("queue");
task_queue.PostTask(
rtc::CreateCancelablePeriodicTask(closure.AsStdFunction()));
EXPECT_TRUE(done.Wait(kTimeoutMs));
}
// Validates perfect forwarding doesn't keep reference to deleted copy.
TEST(CancelablePeriodicTaskTest, CreateWithCopyOfAClosure) {
rtc::Event done(false, false);
MockClosure mock;
EXPECT_CALL(mock, Call).WillOnce(Invoke([&done] {
done.Set();
return kTimeoutMs;
}));
EXPECT_CALL(mock, Delete).Times(AtLeast(2));
CopyableClosure closure(&mock);
std::unique_ptr<rtc::BaseCancelableTask> task;
{
CopyableClosure copy = closure;
task = rtc::CreateCancelablePeriodicTask(copy);
}
rtc::TaskQueue task_queue("queue");
task_queue.PostTask(std::move(task));
EXPECT_TRUE(done.Wait(kTimeoutMs));
}
TEST(CancelablePeriodicTaskTest, DeletingHandleDoesntStopTheTask) {
rtc::Event run(false, false);
rtc::TaskQueue task_queue("queue");
auto task = rtc::CreateCancelablePeriodicTask(([&] {
run.Set();
return kTimeoutMs;
}));
rtc::CancelableTaskHandle handle = task->GetCancellationHandle();
handle = {}; // delete the handle.
task_queue.PostTask(std::move(task));
EXPECT_TRUE(run.Wait(kTimeoutMs));
}
// Example to test there are no thread races and use after free for suggested
// typical usage of the CancelablePeriodicTask
TEST(CancelablePeriodicTaskTest, Example) {
class ObjectOnTaskQueue {
public:
void DoPeriodicTask() {}
int TimeUntilNextRunMs() { return 100; }
rtc::CancelableTaskHandle StartPeriodicTask(rtc::TaskQueue* task_queue) {
auto periodic_task = rtc::CreateCancelablePeriodicTask([this] {
DoPeriodicTask();
return TimeUntilNextRunMs();
});
rtc::CancelableTaskHandle handle = periodic_task->GetCancellationHandle();
task_queue->PostTask(std::move(periodic_task));
return handle;
}
};
rtc::TaskQueue task_queue("queue");
auto object = absl::make_unique<ObjectOnTaskQueue>();
// Create and start the periodic task.
rtc::CancelableTaskHandle handle = object->StartPeriodicTask(&task_queue);
// Restart the task
task_queue.PostTask([handle] { handle.Cancel(); });
handle = object->StartPeriodicTask(&task_queue);
// Stop the task and destroy the object.
struct Destructor {
void operator()() {
// Cancel must be run on the task_queue, but if task failed to start
// because of task queue destruction, there is no need to run Cancel.
handle.Cancel();
}
// Destruction will happen either on the task queue or because task
// queue is destroyed.
std::unique_ptr<ObjectOnTaskQueue> object;
rtc::CancelableTaskHandle handle;
};
task_queue.PostTask(Destructor{std::move(object), std::move(handle)});
// Do not wait for the Destructor closure in order to create a race between
// task queue destruction and running the Desctructor closure.
}
} // namespace

View File

@ -0,0 +1,85 @@
/*
* Copyright 2018 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/cancelable_task_handle.h"
#include <utility>
#include "rtc_base/refcounter.h"
#include "rtc_base/sequenced_task_checker.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/thread_checker.h"
namespace rtc {
class CancelableTaskHandle::CancellationToken {
public:
CancellationToken() : canceled_(false), ref_count_(0) { checker_.Detach(); }
CancellationToken(const CancellationToken&) = delete;
CancellationToken& operator=(const CancellationToken&) = delete;
void Cancel() {
RTC_DCHECK_RUN_ON(&checker_);
canceled_ = true;
}
bool Canceled() {
RTC_DCHECK_RUN_ON(&checker_);
return canceled_;
}
void AddRef() { ref_count_.IncRef(); }
void Release() {
if (ref_count_.DecRef() == rtc::RefCountReleaseStatus::kDroppedLastRef)
delete this;
}
private:
~CancellationToken() = default;
rtc::SequencedTaskChecker checker_;
bool canceled_ RTC_GUARDED_BY(checker_);
webrtc::webrtc_impl::RefCounter ref_count_;
};
CancelableTaskHandle::CancelableTaskHandle() = default;
CancelableTaskHandle::CancelableTaskHandle(const CancelableTaskHandle&) =
default;
CancelableTaskHandle::CancelableTaskHandle(CancelableTaskHandle&&) = default;
CancelableTaskHandle& CancelableTaskHandle::operator=(
const CancelableTaskHandle&) = default;
CancelableTaskHandle& CancelableTaskHandle::operator=(CancelableTaskHandle&&) =
default;
CancelableTaskHandle::~CancelableTaskHandle() = default;
void CancelableTaskHandle::Cancel() const {
if (cancellation_token_.get() != nullptr)
cancellation_token_->Cancel();
}
CancelableTaskHandle::CancelableTaskHandle(
rtc::scoped_refptr<CancellationToken> cancellation_token)
: cancellation_token_(std::move(cancellation_token)) {}
BaseCancelableTask::~BaseCancelableTask() = default;
CancelableTaskHandle BaseCancelableTask::GetCancellationHandle() const {
return CancelableTaskHandle(cancellation_token_);
}
BaseCancelableTask::BaseCancelableTask()
: cancellation_token_(new CancelableTaskHandle::CancellationToken) {}
bool BaseCancelableTask::Canceled() const {
return cancellation_token_->Canceled();
}
} // namespace rtc

View File

@ -0,0 +1,65 @@
/*
* Copyright 2018 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef RTC_BASE_CANCELABLE_TASK_HANDLE_H_
#define RTC_BASE_CANCELABLE_TASK_HANDLE_H_
#include "rtc_base/scoped_ref_ptr.h"
#include "rtc_base/task_queue.h"
namespace rtc {
class BaseCancelableTask;
// Allows to cancel a cancelable task. Non-empty handle can be acquired by
// calling GetCancellationHandle() on a cancelable task.
class CancelableTaskHandle {
public:
// This class is copyable and cheaply movable.
CancelableTaskHandle();
CancelableTaskHandle(const CancelableTaskHandle&);
CancelableTaskHandle(CancelableTaskHandle&&);
CancelableTaskHandle& operator=(const CancelableTaskHandle&);
CancelableTaskHandle& operator=(CancelableTaskHandle&&);
// Deleting the handler doesn't Cancel the task.
~CancelableTaskHandle();
// Prevents the cancelable task to run.
// Must be executed on the same task queue as the task itself.
void Cancel() const;
private:
friend class BaseCancelableTask;
class CancellationToken;
explicit CancelableTaskHandle(
rtc::scoped_refptr<CancellationToken> cancelation_token);
rtc::scoped_refptr<CancellationToken> cancellation_token_;
};
class BaseCancelableTask : public QueuedTask {
public:
~BaseCancelableTask() override;
CancelableTaskHandle GetCancellationHandle() const;
protected:
BaseCancelableTask();
bool Canceled() const;
private:
rtc::scoped_refptr<CancelableTaskHandle::CancellationToken>
cancellation_token_;
};
} // namespace rtc
#endif // RTC_BASE_CANCELABLE_TASK_HANDLE_H_