158 lines
4.6 KiB
Go
158 lines
4.6 KiB
Go
// Copyright 2019 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package common
|
|
|
|
import (
|
|
"context"
|
|
"runtime"
|
|
"sync/atomic"
|
|
)
|
|
|
|
const (
|
|
// pauseStateRunning indicates the pauser is running (not paused)
|
|
pauseStateRunning uint32 = iota
|
|
// pauseStatePaused indicates the pauser is paused
|
|
pauseStatePaused
|
|
// pauseStateLocked indicates the pauser is being held for exclusive access
|
|
// of its waiters field, and no other goroutines should be able to
|
|
// read/write the map of waiters when the state is Locked (all other
|
|
// goroutines trying to access waiters should cooperatively enter a spin
|
|
// loop).
|
|
pauseStateLocked
|
|
)
|
|
|
|
// The implementation is based on https://github.com/golang/sync/blob/master/semaphore/semaphore.go
|
|
|
|
// Pauser is a type which could allow multiple goroutines to wait on demand,
|
|
// similar to a gate or traffic light.
|
|
type Pauser struct {
|
|
// state has two purposes: (1) records whether we are paused, and (2) acts
|
|
// as a spin-lock for the `waiters` map.
|
|
state uint32
|
|
waiters map[chan<- struct{}]struct{}
|
|
}
|
|
|
|
// NewPauser returns an initialized pauser.
|
|
func NewPauser() *Pauser {
|
|
return &Pauser{
|
|
state: pauseStateRunning,
|
|
waiters: make(map[chan<- struct{}]struct{}, 32),
|
|
}
|
|
}
|
|
|
|
// Pause causes all calls to Wait() to block.
|
|
func (p *Pauser) Pause() {
|
|
// If the state was Paused, we do nothing.
|
|
// If the state was Locked, we loop again until the state becomes not Locked.
|
|
// If the state was Running, we atomically move into Paused state.
|
|
|
|
for {
|
|
oldState := atomic.LoadUint32(&p.state)
|
|
if oldState == pauseStatePaused || atomic.CompareAndSwapUint32(&p.state, pauseStateRunning, pauseStatePaused) {
|
|
return
|
|
}
|
|
runtime.Gosched()
|
|
}
|
|
}
|
|
|
|
// Resume causes all calls to Wait() to continue.
|
|
func (p *Pauser) Resume() {
|
|
// If the state was Running, we do nothing.
|
|
// If the state was Locked, we loop again until the state becomes not Locked.
|
|
// If the state was Paused, we Lock the pauser, clear the waiter map,
|
|
// then move into Running state.
|
|
|
|
for {
|
|
oldState := atomic.LoadUint32(&p.state)
|
|
if oldState == pauseStateRunning {
|
|
return
|
|
}
|
|
if atomic.CompareAndSwapUint32(&p.state, pauseStatePaused, pauseStateLocked) {
|
|
break
|
|
}
|
|
runtime.Gosched()
|
|
}
|
|
|
|
// extract all waiters, then notify them we changed from "Paused" to "Not Paused".
|
|
allWaiters := p.waiters
|
|
p.waiters = make(map[chan<- struct{}]struct{}, len(allWaiters))
|
|
|
|
atomic.StoreUint32(&p.state, pauseStateRunning)
|
|
|
|
for waiter := range allWaiters {
|
|
close(waiter)
|
|
}
|
|
}
|
|
|
|
// IsPaused gets whether the current state is paused or not.
|
|
func (p *Pauser) IsPaused() bool {
|
|
return atomic.LoadUint32(&p.state) != pauseStateRunning
|
|
}
|
|
|
|
// Wait blocks the current goroutine if the current state is paused, until the
|
|
// pauser itself is resumed at least once.
|
|
//
|
|
// If `ctx` is done, this method will also unblock immediately, and return the
|
|
// context error.
|
|
func (p *Pauser) Wait(ctx context.Context) error {
|
|
// If the state is Running, we return immediately (this path is hot and must
|
|
// be taken as soon as possible)
|
|
// If the state is Locked, we loop again until the state becomes not Locked.
|
|
// If the state is Paused, we Lock the pauser, add a waiter to the map, then
|
|
// revert to the original (Paused) state.
|
|
|
|
for {
|
|
oldState := atomic.LoadUint32(&p.state)
|
|
if oldState == pauseStateRunning {
|
|
return nil
|
|
}
|
|
if atomic.CompareAndSwapUint32(&p.state, pauseStatePaused, pauseStateLocked) {
|
|
break
|
|
}
|
|
runtime.Gosched()
|
|
}
|
|
|
|
waiter := make(chan struct{})
|
|
p.waiters[waiter] = struct{}{}
|
|
|
|
atomic.StoreUint32(&p.state, pauseStatePaused)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
err := ctx.Err()
|
|
p.cancel(waiter)
|
|
return err
|
|
case <-waiter:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// cancel removes a waiter from the waiters map
|
|
func (p *Pauser) cancel(waiter chan<- struct{}) {
|
|
// If the state is Locked, we loop again until the state becomes not Locked.
|
|
// Otherwise, we Lock the pauser, remove the waiter from the map, then
|
|
// revert to the original state.
|
|
|
|
for {
|
|
oldState := atomic.LoadUint32(&p.state)
|
|
if oldState != pauseStateLocked && atomic.CompareAndSwapUint32(&p.state, oldState, pauseStateLocked) {
|
|
delete(p.waiters, waiter)
|
|
atomic.StoreUint32(&p.state, oldState)
|
|
return
|
|
}
|
|
runtime.Gosched()
|
|
}
|
|
}
|