322 lines
7.6 KiB
Go
322 lines
7.6 KiB
Go
// Copyright 2023 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package tablestore
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"path"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/tidb/pkg/parser/terror"
|
|
"github.com/pingcap/tidb/pkg/timer/api"
|
|
"github.com/pingcap/tidb/pkg/util/logutil"
|
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Timing parameters used when publishing timer events through etcd.
const (
	// notifyTimeout bounds a single etcd Put that publishes a batch of events.
	notifyTimeout = 20 * time.Second
	// minNotifyInterval is the minimum spacing the notify loop keeps between
	// two consecutive publishes.
	minNotifyInterval = time.Second
	// etcdNotifyKeyTTLSeconds is the TTL, in seconds, requested for the lease
	// that the notify key is attached to.
	etcdNotifyKeyTTLSeconds = 60
)

// Names of the event types as they appear in the JSON payload ("tp" field).
const (
	watchTimerEventCreate = "create"
	watchTimerEventUpdate = "update"
	watchTimerEventDelete = "delete"
)
|
|
|
|
// idleKeepAliveCh is a placeholder keep-alive channel: nothing in this file
// sends on it or closes it, so a receive from it in a select never fires.
// The notify loop selects on it while it holds no lease.
var idleKeepAliveCh <-chan *clientv3.LeaseKeepAliveResponse = make(chan *clientv3.LeaseKeepAliveResponse)
|
|
|
|
type notifyMessage struct {
|
|
Events []*notifyEvent `json:"events"`
|
|
}
|
|
|
|
// notifyEvent is the JSON form of a single timer event.
type notifyEvent struct {
	// Tp is the event type name: "create", "update" or "delete".
	Tp string `json:"tp"`
	// TimerID identifies the timer the event refers to.
	TimerID string `json:"timer_id"`
	// Timestamp is the Unix time, in seconds, at which the event was built.
	Timestamp int64 `json:"timestamp"`
}
|
|
|
|
func (e *notifyEvent) toWatchEvent() (*api.WatchTimerEvent, error) {
|
|
if e.TimerID == "" {
|
|
return nil, errors.Errorf("timerID is empty")
|
|
}
|
|
|
|
event := &api.WatchTimerEvent{
|
|
TimerID: e.TimerID,
|
|
}
|
|
|
|
switch e.Tp {
|
|
case watchTimerEventCreate:
|
|
event.Tp = api.WatchTimerEventCreate
|
|
case watchTimerEventUpdate:
|
|
event.Tp = api.WatchTimerEventUpdate
|
|
case watchTimerEventDelete:
|
|
event.Tp = api.WatchTimerEventDelete
|
|
default:
|
|
return nil, errors.Errorf("invalid WatchTimerEventType: %s", e.Tp)
|
|
}
|
|
|
|
return event, nil
|
|
}
|
|
|
|
func newNotifyEvent(tp api.WatchTimerEventType, timerID string) (*notifyEvent, error) {
|
|
event := ¬ifyEvent{TimerID: timerID, Timestamp: time.Now().Unix()}
|
|
switch tp {
|
|
case api.WatchTimerEventCreate:
|
|
event.Tp = watchTimerEventCreate
|
|
case api.WatchTimerEventUpdate:
|
|
event.Tp = watchTimerEventUpdate
|
|
case api.WatchTimerEventDelete:
|
|
event.Tp = watchTimerEventDelete
|
|
default:
|
|
return nil, errors.Errorf("invalid WatchTimerEventType: %v, timer: %s", tp, timerID)
|
|
}
|
|
return event, nil
|
|
}
|
|
|
|
type etcdNotifier struct {
|
|
ctx context.Context
|
|
cancel func()
|
|
wg sync.WaitGroup
|
|
mu sync.Mutex
|
|
etcd *clientv3.Client
|
|
keyPrefix string
|
|
key string
|
|
notifyBgChan chan struct{}
|
|
events []*notifyEvent
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// NewEtcdNotifier creates a notifier based on etcd
|
|
func NewEtcdNotifier(clusterID uint64, etcd *clientv3.Client) api.TimerWatchEventNotifier {
|
|
keyPrefix := fmt.Sprintf("/tidb/timer/cluster/%d/notify/", clusterID)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
key := path.Join(keyPrefix, uuid.NewString())
|
|
notifier := &etcdNotifier{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
etcd: etcd,
|
|
keyPrefix: keyPrefix,
|
|
key: key,
|
|
events: make([]*notifyEvent, 0, 8),
|
|
notifyBgChan: make(chan struct{}, 1),
|
|
logger: logutil.BgLogger().With(zap.String("EtcdKey", key)),
|
|
}
|
|
notifier.wg.Add(1)
|
|
go notifier.notifyLoop()
|
|
return notifier
|
|
}
|
|
|
|
func (n *etcdNotifier) Watch(ctx context.Context) api.WatchTimerChan {
|
|
n.mu.Lock()
|
|
defer n.mu.Unlock()
|
|
ch := make(chan api.WatchTimerResponse)
|
|
if n.cancel == nil {
|
|
// it means closed
|
|
close(ch)
|
|
return ch
|
|
}
|
|
|
|
n.wg.Add(1)
|
|
go func() {
|
|
logger := n.logger.With(zap.String("watcherID", uuid.NewString()))
|
|
logger.Info("new etcd watcher created to watch timer events")
|
|
defer func() {
|
|
logger.Info("etcd watcher exited to watch timer events")
|
|
close(ch)
|
|
n.wg.Done()
|
|
}()
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
etcdCh := n.etcd.Watch(ctx, n.keyPrefix, clientv3.WithPrefix())
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-n.ctx.Done():
|
|
return
|
|
case etcdResp, ok := <-etcdCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
for _, evt := range etcdResp.Events {
|
|
if evt.Type != mvccpb.PUT {
|
|
continue
|
|
}
|
|
|
|
jsonStr := evt.Kv.Value
|
|
var msg notifyMessage
|
|
if err := json.Unmarshal(jsonStr, &msg); err != nil {
|
|
logger.Error("failed to decode message", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
resp := api.WatchTimerResponse{
|
|
Events: make([]*api.WatchTimerEvent, 0, len(msg.Events)),
|
|
}
|
|
|
|
for _, event := range msg.Events {
|
|
watchEvent, err := event.toWatchEvent()
|
|
if err != nil {
|
|
logger.Error("failed to make a watch event",
|
|
zap.Error(err),
|
|
zap.ByteString("json", jsonStr))
|
|
continue
|
|
}
|
|
resp.Events = append(resp.Events, watchEvent)
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-n.ctx.Done():
|
|
return
|
|
case ch <- resp:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return ch
|
|
}
|
|
|
|
func (n *etcdNotifier) Notify(tp api.WatchTimerEventType, timerID string) {
|
|
event, err := newNotifyEvent(tp, timerID)
|
|
if err != nil {
|
|
n.logger.Error("failed to create notify event", zap.Any("tp", tp), zap.String("timerID", timerID))
|
|
return
|
|
}
|
|
|
|
n.mu.Lock()
|
|
defer n.mu.Unlock()
|
|
if n.cancel == nil {
|
|
// it means closed
|
|
return
|
|
}
|
|
|
|
n.events = append(n.events, event)
|
|
select {
|
|
case n.notifyBgChan <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (n *etcdNotifier) notifyLoop() {
|
|
n.logger.Info("etcd notify loop to watch timer events started")
|
|
defer func() {
|
|
n.logger.Info("etcd notify loop to watch timer events stopped")
|
|
n.wg.Done()
|
|
}()
|
|
var leaseID clientv3.LeaseID
|
|
keepAlive := idleKeepAliveCh
|
|
lastNotify := time.Now().Add(-time.Second)
|
|
loop:
|
|
for {
|
|
select {
|
|
case <-n.ctx.Done():
|
|
return
|
|
case _, ok := <-keepAlive:
|
|
if !ok {
|
|
leaseID = 0
|
|
keepAlive = idleKeepAliveCh
|
|
n.logger.Warn("keep alive failed with etcd", zap.Int64("lease", int64(leaseID)))
|
|
}
|
|
case _, ok := <-n.notifyBgChan:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if interval := time.Since(lastNotify); interval < minNotifyInterval {
|
|
select {
|
|
case <-time.After(minNotifyInterval - interval):
|
|
case <-n.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
lastNotify = time.Now()
|
|
if leaseID == 0 {
|
|
newLease, newKeepAlive, err := n.newLease()
|
|
if err != nil {
|
|
n.logger.Error("create lease failed", zap.Error(err))
|
|
continue loop
|
|
}
|
|
leaseID, keepAlive = newLease, newKeepAlive
|
|
}
|
|
n.sendEvents(leaseID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (n *etcdNotifier) newLease() (clientv3.LeaseID, <-chan *clientv3.LeaseKeepAliveResponse, error) {
|
|
resp, err := n.etcd.Grant(n.ctx, etcdNotifyKeyTTLSeconds)
|
|
if err != nil {
|
|
return 0, nil, errors.Trace(err)
|
|
}
|
|
|
|
ch, err := n.etcd.KeepAlive(n.ctx, resp.ID)
|
|
if err != nil {
|
|
return 0, nil, errors.Trace(err)
|
|
}
|
|
|
|
return resp.ID, ch, nil
|
|
}
|
|
|
|
func (n *etcdNotifier) takeEvents() ([]byte, error) {
|
|
n.mu.Lock()
|
|
defer n.mu.Unlock()
|
|
if len(n.events) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
message := ¬ifyMessage{Events: n.events}
|
|
bs, err := json.Marshal(message)
|
|
n.events = n.events[:0]
|
|
return bs, err
|
|
}
|
|
|
|
func (n *etcdNotifier) sendEvents(leaseID clientv3.LeaseID) {
|
|
bs, err := n.takeEvents()
|
|
if len(bs) == 0 || err != nil {
|
|
terror.Log(err)
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(n.ctx, notifyTimeout)
|
|
defer cancel()
|
|
|
|
if _, err := n.etcd.Put(ctx, n.key, string(bs), clientv3.WithLease(leaseID)); err != nil {
|
|
n.logger.Error("failed to put key", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func (n *etcdNotifier) Close() {
|
|
n.mu.Lock()
|
|
if n.cancel != nil {
|
|
n.cancel()
|
|
n.cancel = nil
|
|
}
|
|
n.mu.Unlock()
|
|
n.wg.Wait()
|
|
}
|