Files
tidb/pkg/timer/runtime/runtime_test.go

1001 lines
32 KiB
Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package runtime
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/timer/api"
mockutil "github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
// TestRuntimeStartStop exercises the Start/Stop lifecycle of a timer runtime that is backed by an
// in-memory store holding one enabled interval timer. The mock expectations require that the hook
// factory is invoked once for "hook1", that the hook is started and stopped once each, and that
// one OnPreSchedEvent/OnSchedEvent pair is delivered to the hook.
func TestRuntimeStartStop(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	store := api.NewMemoryTimerStore()
	defer store.Close()

	cli := api.NewDefaultTimerClient(store)
	_, err := cli.CreateTimer(ctx, api.TimerSpec{
		Namespace:       "n1",
		Key:             "k1",
		SchedPolicyType: api.SchedEventInterval,
		SchedPolicyExpr: "1m",
		Enable:          true,
		HookClass:       "hook1",
	})
	require.NoError(t, err)

	// timerProcessed is closed by the OnSchedEvent callback, i.e. once the event reached the hook.
	timerProcessed := make(chan struct{})
	hook := newMockHook()
	hook.On("Start").Return().Once()
	hook.On("Stop").Return().Once()
	hook.On("OnPreSchedEvent", mock.Anything, mock.Anything).
		Return(api.PreSchedEventResult{}, nil).Once()
	hook.On("OnSchedEvent", mock.Anything, mock.Anything).
		Return(nil).Once().
		Run(func(args mock.Arguments) {
			close(timerProcessed)
		})

	// The factory is routed through a mock so the test can assert it was called exactly once
	// with the expected hook class and client.
	var factoryMock mock.Mock
	factoryMock.On("factory", "hook1", cli).Return(hook).Once()
	hookFactory := func(hookClass string, cli api.TimerClient) api.Hook {
		return factoryMock.MethodCalled("factory", hookClass, cli).Get(0).(api.Hook)
	}

	runtime := NewTimerRuntimeBuilder("g1", store).
		RegisterHookFactory("hook1", hookFactory).
		Build()
	require.NotNil(t, runtime.fullRefreshTimerCounter)
	require.NotNil(t, runtime.partialRefreshTimerCounter)

	runtime.Start()
	require.True(t, runtime.Running())
	waitDone(timerProcessed, time.Minute)

	// Stop runs in its own goroutine and signals completion by cancelling ctx, so the wait below
	// bounds how long it may take (waitDone is a helper that is not shown in this chunk).
	// The Running assertion stays on the test goroutine: require ends in t.FailNow, which must
	// only be called from the goroutine running the test.
	go func() {
		runtime.Stop()
		cancel()
	}()
	waitDone(ctx.Done(), time.Minute)
	require.False(t, runtime.Running())

	factoryMock.AssertExpectations(t)
	hook.AssertExpectations(t)
}
// TestEnsureWorker verifies ensureWorker: for a registered hook class it returns a worker and
// hands back the very same worker on a repeated call, while an unregistered class yields false.
// The mock expectations additionally pin that the factory is called once and that the hook is
// started and stopped once.
func TestEnsureWorker(t *testing.T) {
	timerStore := api.NewMemoryTimerStore()
	defer timerStore.Close()
	client := api.NewDefaultTimerClient(timerStore)

	mockedHook := newMockHook()
	mockedHook.On("Start").Return().Once()
	mockedHook.On("Stop").Return().Once()

	// Route factory invocations through a mock so the call count and arguments can be asserted.
	var factoryCalls mock.Mock
	factoryCalls.On("factory", "hook1", client).Return(mockedHook).Once()

	rt := NewTimerRuntimeBuilder("g1", timerStore).
		RegisterHookFactory("hook1", func(hookClass string, cli api.TimerClient) api.Hook {
			created := factoryCalls.MethodCalled("factory", hookClass, cli)
			return created.Get(0).(api.Hook)
		}).
		Build()
	rt.initCtx()

	// First call creates the worker; wait until the hook reports it has been started.
	first, firstOK := rt.ensureWorker("hook1")
	require.True(t, firstOK)
	waitDone(mockedHook.started, time.Minute)

	// Second call for the same class must reuse the existing worker.
	second, secondOK := rt.ensureWorker("hook1")
	require.True(t, secondOK)
	require.Same(t, first, second)

	// No factory is registered for "hook2".
	_, unknownOK := rt.ensureWorker("hook2")
	require.False(t, unknownOK)

	rt.Stop()
	factoryCalls.AssertExpectations(t)
	mockedHook.AssertExpectations(t)
}
// TestTryTriggerTimer checks which cached timers tryTriggerTimerEvents hands to the hook worker.
// The worker channel has room for three requests, so the test can also observe what happens to
// timers that are due when the channel is already full. The second half covers manual trigger
// requests on a timer whose next event time has not been reached.
func TestTryTriggerTimer(t *testing.T) {
	now := time.Now()
	store := api.NewMemoryTimerStore()
	defer store.Close()

	runtime := NewTimerRuntimeBuilder("g1", store).Build()
	runtime.setNowFunc(func() time.Time {
		return now
	})
	runtime.initCtx()

	// t1: idle timer
	t1 := newTestTimer("t1", "1m", now.Add(-time.Hour))
	runtime.cache.updateTimer(t1)

	// t2: not idle timer, it will be triggered even though the timer is disabled
	t2 := newTestTimer("t2", "1h", now)
	t2.EventStatus = api.SchedEventTrigger
	t2.EventID = "event2"
	t2.EventStart = now.Add(-time.Hour)
	t2.Enable = false
	runtime.cache.updateTimer(t2)

	// t3: next event time after now
	t3 := newTestTimer("t3", "10m", now)
	runtime.cache.updateTimer(t3)
	runtime.cache.updateNextTryTriggerTime(t3.ID, now.Add(-10*time.Minute))

	// t4: next try trigger time after now
	t4 := newTestTimer("t4", "1m", now.Add(-time.Hour))
	runtime.cache.updateTimer(t4)
	runtime.cache.updateNextTryTriggerTime(t4.ID, now.Add(time.Second))

	// t5: idle timer that takes the last free slot of the worker chan
	t5 := newTestTimer("t5", "5m", now.Add(-10*time.Minute))
	runtime.cache.updateTimer(t5)

	// t6: worker chan will be full when emitting t6
	t6 := newTestTimer("t6", "6m", now.Add(-10*time.Minute))
	runtime.cache.updateTimer(t6)

	// t7: worker chan will be full when emitting t7
	t7 := newTestTimer("t7", "6m", now.Add(-10*time.Minute))
	runtime.cache.updateTimer(t7)

	// t8: triggering
	t8 := newTestTimer("t8", "1m", now.Add(-2*time.Hour))
	runtime.cache.updateTimer(t8)
	runtime.cache.setTimerProcStatus(t8.ID, procTriggering, "event8")

	// t9: wait close
	t9 := newTestTimer("t9", "1m", now.Add(-2*time.Hour))
	t9.EventStatus = api.SchedEventTrigger
	t9.EventID = "event9"
	t9.EventStart = now.Add(-2 * time.Hour)
	runtime.cache.updateTimer(t9)
	runtime.cache.setTimerProcStatus(t9.ID, procWaitTriggerClose, "event9")

	ch := make(chan *triggerEventRequest, 3)
	runtime.workers["hook1"] = &hookWorker{ch: ch}
	runtime.tryTriggerTimerEvents()

	require.Equal(t, procTriggering, runtime.cache.items[t1.ID].procStatus)
	require.NotEmpty(t, runtime.cache.items[t1.ID].triggerEventID)
	// t2 keeps the event ID it already had instead of getting a fresh one.
	require.Equal(t, procTriggering, runtime.cache.items[t2.ID].procStatus)
	require.Equal(t, "event2", runtime.cache.items[t2.ID].triggerEventID)
	require.Equal(t, procIdle, runtime.cache.items[t3.ID].procStatus)
	require.Empty(t, runtime.cache.items[t3.ID].triggerEventID)
	require.Equal(t, procIdle, runtime.cache.items[t4.ID].procStatus)
	require.Empty(t, runtime.cache.items[t4.ID].triggerEventID)
	require.Equal(t, procTriggering, runtime.cache.items[t5.ID].procStatus)
	require.NotEmpty(t, runtime.cache.items[t5.ID].triggerEventID)
	// t6 and t7 did not fit into the chan: they stay idle and are retried after
	// retryBusyWorkerInterval.
	require.Equal(t, procIdle, runtime.cache.items[t6.ID].procStatus)
	require.Empty(t, runtime.cache.items[t6.ID].triggerEventID)
	require.Equal(t, now.Add(retryBusyWorkerInterval), runtime.cache.items[t6.ID].nextTryTriggerTime)
	require.Equal(t, procIdle, runtime.cache.items[t7.ID].procStatus)
	require.Empty(t, runtime.cache.items[t7.ID].triggerEventID)
	require.Equal(t, now.Add(retryBusyWorkerInterval), runtime.cache.items[t7.ID].nextTryTriggerTime)
	// Timers that are already being processed keep their status.
	require.Equal(t, procTriggering, runtime.cache.items[t8.ID].procStatus)
	require.Equal(t, procWaitTriggerClose, runtime.cache.items[t9.ID].procStatus)

	// consumeAndVerify pops at most one request from the worker chan without blocking.
	// With a non-nil tm it requires a pending request for exactly that timer; with nil it
	// requires the chan to be empty.
	consumeAndVerify := func(tm *api.TimerRecord) {
		select {
		case req := <-ch:
			if tm == nil {
				require.FailNow(t, "should not reach here")
			}
			require.Equal(t, tm, req.timer)
			require.Same(t, runtime.store, req.store)
			require.NotNil(t, runtime.workerRespCh)
			require.Equal(t, runtime.cache.items[tm.ID].triggerEventID, req.eventID)
		default:
			if tm != nil {
				require.FailNow(t, "should not reach here")
			}
		}
	}

	// Requests are expected in this order, followed by an empty chan.
	consumeAndVerify(t2)
	consumeAndVerify(t1)
	consumeAndVerify(t5)
	consumeAndVerify(nil)

	// t3: has a processed manual request
	t3 = t3.Clone()
	t3.Version++
	t3.ManualRequest = api.ManualRequest{
		ManualRequestID:   "req1",
		ManualRequestTime: now,
		ManualTimeout:     time.Minute,
		ManualProcessed:   true,
		ManualEventID:     "event1",
	}
	runtime.cache.updateTimer(t3)
	runtime.tryTriggerTimerEvents()
	consumeAndVerify(nil)

	// t3: has a not processed manual request but timer is disabled
	t3 = t3.Clone()
	t3.Enable = false
	t3.ManualRequest = api.ManualRequest{
		ManualRequestID:   "req2",
		ManualRequestTime: now,
		ManualTimeout:     time.Minute,
	}
	t3.Version++
	runtime.cache.updateTimer(t3)
	runtime.tryTriggerTimerEvents()
	consumeAndVerify(nil)

	// t3: has a not processed manual request
	t3 = t3.Clone()
	t3.Enable = true
	t3.Version++
	runtime.cache.updateTimer(t3)
	runtime.tryTriggerTimerEvents()
	consumeAndVerify(t3)
}
// TestTryTriggerTimePriority checks which timers win when more timers are eligible than the
// worker channel can take: with a capacity of two, t2 and t3 must end up triggering while t1 and
// t4 stay idle.
func TestTryTriggerTimePriority(t *testing.T) {
	fixedNow := time.Now()
	timerStore := api.NewMemoryTimerStore()
	defer timerStore.Close()

	rt := NewTimerRuntimeBuilder("g1", timerStore).Build()
	rt.setNowFunc(func() time.Time { return fixedNow })
	rt.initCtx()

	// Room for exactly two trigger requests.
	reqCh := make(chan *triggerEventRequest, 2)
	rt.workers["hook1"] = &hookWorker{ch: reqCh}

	t1 := newTestTimer("t1", "1m", fixedNow.Add(-time.Hour))
	rt.cache.updateTimer(t1)
	rt.cache.updateNextTryTriggerTime(t1.ID, fixedNow.Add(-3*time.Minute))

	t2 := newTestTimer("t2", "1m", fixedNow.Add(-2*time.Hour))
	rt.cache.updateTimer(t2)
	rt.cache.updateNextTryTriggerTime(t2.ID, fixedNow.Add(-2*time.Minute))

	// t3 is disabled but already carries a triggered event.
	t3 := newTestTimer("t3", "1h", fixedNow)
	t3.EventStatus = api.SchedEventTrigger
	t3.EventID = "event2"
	t3.EventStart = fixedNow.Add(-time.Minute)
	t3.Enable = false
	rt.cache.updateTimer(t3)

	t4 := newTestTimer("t4", "1m", fixedNow.Add(-10*time.Hour))
	rt.cache.updateTimer(t4)
	rt.cache.updateNextTryTriggerTime(t4.ID, fixedNow.Add(time.Minute))

	// nextEventTime: t3 (nil) < t4 < t2 < t1
	// nextTryTriggerTime: t1 < t2 < t3 (eventStart) < t4
	// Triggering must be ordered by `nextEventTime`, so that the timer with the largest delay
	// is served first.
	// t4 must not be scheduled because its next try trigger time is after now.
	// Hence t3 and t2 are the ones triggered when the chan capacity is 2.
	rt.tryTriggerTimerEvents()

	for _, triggered := range []*api.TimerRecord{t2, t3} {
		require.Equal(t, procTriggering, rt.cache.items[triggered.ID].procStatus)
	}
	// t1 and t4 must remain untriggered.
	for _, untouched := range []*api.TimerRecord{t1, t4} {
		require.Equal(t, procIdle, rt.cache.items[untouched.ID].procStatus)
	}
}
// TestHandleHookWorkerResponse feeds handleWorkerResponse with the different kinds of worker
// responses (success, failure with the timer removed, failure with a changed timer, failure with
// a retry-after hint) and checks the resulting cache state after each of them. Before every
// scenario the cached timer is reset by removing and re-adding t1.
func TestHandleHookWorkerResponse(t *testing.T) {
	now := time.Now()
	store := api.NewMemoryTimerStore()
	defer store.Close()

	runtime := NewTimerRuntimeBuilder("g1", store).Build()
	runtime.setNowFunc(func() time.Time {
		return now
	})
	runtime.initCtx()

	t1 := newTestTimer("t1", "1m", now.Add(-time.Hour))
	runtime.cache.updateTimer(t1)
	// NOTE(review): this status is set right before the cache item is removed and re-added
	// below, so it presumably does not survive into the first scenario — confirm whether the
	// scenarios were meant to start from procTriggering.
	runtime.cache.setTimerProcStatus(t1.ID, procTriggering, "event1")

	// success response
	runtime.cache.removeTimer(t1.ID)
	runtime.cache.updateTimer(t1)
	triggerTimer1 := t1.Clone()
	triggerTimer1.EventID = "event1"
	triggerTimer1.EventStatus = api.SchedEventTrigger
	triggerTimer1.EventStart = now
	triggerTimer1.EventData = []byte("data1")
	triggerTimer1.Version++
	runtime.handleWorkerResponse(&triggerEventResponse{
		success:        true,
		timerID:        t1.ID,
		eventID:        "event1",
		newTimerRecord: api.NewOptionalVal(triggerTimer1),
	})
	item := runtime.cache.items[t1.ID]
	require.Equal(t, triggerTimer1, item.timer)
	require.Equal(t, procWaitTriggerClose, item.procStatus)
	require.Equal(t, "event1", item.triggerEventID)
	require.Len(t, runtime.cache.waitCloseTimerIDs, 1)
	_, ok := runtime.cache.waitCloseTimerIDs[t1.ID]
	require.True(t, ok)

	// not success response with timer removed
	// A set optional value holding a nil record makes the runtime drop the timer from the cache.
	var newTimer *api.TimerRecord
	runtime.cache.removeTimer(t1.ID)
	runtime.cache.updateTimer(t1)
	runtime.handleWorkerResponse(&triggerEventResponse{
		success:        false,
		timerID:        t1.ID,
		eventID:        "event1",
		newTimerRecord: api.NewOptionalVal(newTimer),
	})
	require.False(t, runtime.cache.hasTimer(t1.ID))
	require.Empty(t, runtime.cache.waitCloseTimerIDs)

	// not success response with timer changed
	runtime.cache.removeTimer(t1.ID)
	runtime.cache.updateTimer(t1)
	newTimer = t1.Clone()
	newTimer.Version++
	newTimer.Watermark = now.Add(time.Second)
	runtime.handleWorkerResponse(&triggerEventResponse{
		success:        false,
		timerID:        t1.ID,
		eventID:        "event1",
		newTimerRecord: api.NewOptionalVal(newTimer),
	})
	item = runtime.cache.items[t1.ID]
	require.Equal(t, newTimer, item.timer)
	require.Equal(t, procIdle, item.procStatus)
	require.Equal(t, "", item.triggerEventID)
	require.Empty(t, runtime.cache.waitCloseTimerIDs)

	// not success response with retry after
	runtime.cache.removeTimer(t1.ID)
	runtime.cache.updateTimer(t1)
	runtime.handleWorkerResponse(&triggerEventResponse{
		success:    false,
		timerID:    t1.ID,
		eventID:    "event1",
		retryAfter: api.NewOptionalVal(12 * time.Second),
	})
	item = runtime.cache.items[t1.ID]
	require.Equal(t, t1, item.timer)
	require.Equal(t, procIdle, item.procStatus)
	require.Equal(t, "", item.triggerEventID)
	// The next attempt is scheduled retryAfter past the (fixed) current time.
	require.Equal(t, now.Add(12*time.Second), item.nextTryTriggerTime)
	require.Empty(t, runtime.cache.waitCloseTimerIDs)
}
// TestNextTryTriggerDuration pins the values returned by getNextTryTriggerDuration for a cache
// with three timers (one of them already triggering) while the runtime clock is moved forward.
// minTriggerEventInterval is lowered to one second for the duration of the test.
func TestNextTryTriggerDuration(t *testing.T) {
	savedMinInterval := minTriggerEventInterval
	minTriggerEventInterval = time.Second
	defer func() { minTriggerEventInterval = savedMinInterval }()

	// clock is what the runtime sees as "now"; the closure below reads it on every call.
	clock := time.Now()
	timerStore := api.NewMemoryTimerStore()
	defer timerStore.Close()

	rt := NewTimerRuntimeBuilder("g1", timerStore).Build()
	rt.setNowFunc(func() time.Time { return clock })
	rt.initCtx()

	// t1 is marked as triggering.
	triggering := newTestTimer("t1", "0.1m", clock)
	rt.cache.updateTimer(triggering)
	rt.cache.setTimerProcStatus(triggering.ID, procTriggering, "event1")

	rt.cache.updateTimer(newTestTimer("t2", "1.5m", clock))
	rt.cache.updateTimer(newTestTimer("t3", "2m", clock))

	require.Equal(t, 60*time.Second, rt.getNextTryTriggerDuration(clock))

	clock = clock.Add(70 * time.Second)
	require.Equal(t, 20*time.Second, rt.getNextTryTriggerDuration(clock))

	clock = clock.Add(19*time.Second + 500*time.Millisecond)
	require.Equal(t, 500*time.Millisecond, rt.getNextTryTriggerDuration(clock.Add(-time.Second)))
	require.Equal(t, time.Second, rt.getNextTryTriggerDuration(clock))
	require.Equal(t, time.Second, rt.getNextTryTriggerDuration(clock.Add(100*time.Millisecond)))

	clock = clock.Add(time.Hour)
	require.Equal(t, time.Duration(0), rt.getNextTryTriggerDuration(time.UnixMilli(0)))
}
// TestFullRefreshTimers drives fullRefreshTimers against a mocked store. A failing List must
// leave the cache untouched; a successful List must replace the cached records, drop timers that
// are no longer listed, and yield the per-timer processing status asserted at the end. The
// refresh counter is expected to grow by one per call, including the failing one.
func TestFullRefreshTimers(t *testing.T) {
	refreshCounter := &mockutil.MetricsCounter{}
	core, storeMock := newMockStore()

	rt := NewTimerRuntimeBuilder("g1", storeMock).Build()
	require.NotNil(t, rt.fullRefreshTimerCounter)
	rt.fullRefreshTimerCounter = refreshCounter
	rt.cond = &api.TimerCond{Namespace: api.NewOptionalVal("n1")}
	rt.initCtx()

	// Seed the cache with t0..t6: t2 and t4 wait for their event to be closed, t6 is
	// triggering, the rest are idle.
	cached := make([]*api.TimerRecord, 7)
	for idx := range cached {
		rec := newTestTimer(fmt.Sprintf("t%d", idx), "1m", time.Now())
		status := procIdle
		switch idx {
		case 2, 4:
			rec.EventStatus = api.SchedEventTrigger
			rec.EventStart = time.Now()
			rec.EventID = fmt.Sprintf("event%d", idx+1)
			status = procWaitTriggerClose
		case 6:
			status = procTriggering
		}
		rt.cache.updateTimer(rec)
		rt.cache.setTimerProcStatus(rec.ID, status, rec.EventID)
		cached[idx] = rec
	}

	// Newer versions as they will be returned by the store.
	t0New := cached[0].Clone()
	t0New.Version++

	t2New := cached[2].Clone()
	t2New.Version++

	// t4's event has been closed in the store.
	t4New := cached[4].Clone()
	t4New.EventStatus = api.SchedEventIdle
	t4New.EventID = ""
	t4New.Version++

	t6New := cached[6].Clone()
	t6New.Version++

	// First refresh: List fails, so all seven cached timers must survive.
	core.On("List", mock.Anything, rt.cond).Return(cached, errors.New("mockErr")).Once()
	require.Equal(t, float64(0), refreshCounter.Val())
	rt.fullRefreshTimers()
	require.Equal(t, float64(1), refreshCounter.Val())
	require.Equal(t, 7, len(rt.cache.items))

	// Second refresh: List succeeds without t3 and t5.
	listed := []*api.TimerRecord{t0New, cached[1], t2New, t4New, t6New}
	core.On("List", mock.Anything, rt.cond).Return(listed, nil).Once()
	rt.fullRefreshTimers()
	require.Equal(t, float64(2), refreshCounter.Val())
	core.AssertExpectations(t)

	items := rt.cache.items
	require.Equal(t, 5, len(items))
	require.Equal(t, t0New, items["t0"].timer)
	require.Equal(t, cached[1], items["t1"].timer)
	require.Equal(t, t2New, items["t2"].timer)
	require.Equal(t, procWaitTriggerClose, items["t2"].procStatus)
	require.Equal(t, t4New, items["t4"].timer)
	require.Equal(t, procIdle, items["t4"].procStatus)
	require.Equal(t, t6New, items["t6"].timer)
	require.Equal(t, procTriggering, items["t6"].procStatus)
}
// TestBatchHandlerWatchResponses feeds batchHandleWatchResponses with two watch responses that
// contain delete events (t0, t5), a create event (t10) and an update event (t2). It verifies that
// the created/updated timers are fetched through a single List call with the expected condition,
// that deleted timers leave the cache, and that the partial refresh counter is bumped once.
func TestBatchHandlerWatchResponses(t *testing.T) {
	partialRefreshCounter := &mockutil.MetricsCounter{}
	mockCore, mockStore := newMockStore()
	runtime := NewTimerRuntimeBuilder("g1", mockStore).Build()
	require.NotNil(t, runtime.partialRefreshTimerCounter)
	runtime.cond = &api.TimerCond{Namespace: api.NewOptionalVal("n1")}
	runtime.initCtx()
	runtime.partialRefreshTimerCounter = partialRefreshCounter

	// Seed the cache with t0..t6: t2 waits for its event to be closed, t6 is triggering, the
	// rest are idle.
	timers := make([]*api.TimerRecord, 7)
	for i := range timers {
		timer := newTestTimer(fmt.Sprintf("t%d", i), "1m", time.Now())
		procStatus := procIdle
		if i == 2 {
			timer.EventStatus = api.SchedEventTrigger
			timer.EventStart = time.Now()
			timer.EventID = fmt.Sprintf("event%d", i+1)
			procStatus = procWaitTriggerClose
		}
		if i == 6 {
			procStatus = procTriggering
		}
		runtime.cache.updateTimer(timer)
		runtime.cache.setTimerProcStatus(timer.ID, procStatus, timer.EventID)
		timers[i] = timer
	}

	t10 := newTestTimer("t10", "1m", time.Now())

	// t2's event has been closed in the store.
	t2New := timers[2].Clone()
	t2New.EventStatus = api.SchedEventIdle
	t2New.EventID = ""
	t2New.Version++

	// t6 is returned by the mocked List although no event mentions it.
	t6New := timers[6].Clone()
	t6New.Version++

	// The List condition must be `runtime.cond AND (ID == t10 OR ID == t2)`, where each ID
	// condition sets no field other than ID.
	mockCore.On("List", mock.Anything, mock.Anything).
		Return([]*api.TimerRecord{t2New, t6New, t10}, nil).Once().
		Run(func(args mock.Arguments) {
			and, ok := args[1].(*api.Operator)
			require.True(t, ok)
			require.Equal(t, api.OperatorAnd, and.Op)
			require.False(t, and.Not)
			require.Len(t, and.Children, 2)
			require.Equal(t, runtime.cond, and.Children[0])

			or, ok := and.Children[1].(*api.Operator)
			require.True(t, ok)
			require.Equal(t, api.OperatorOr, or.Op)
			require.False(t, or.Not)
			require.Len(t, or.Children, 2)

			condIDs := make(map[string]struct{})
			for i := range or.Children {
				idCond, ok := or.Children[i].(*api.TimerCond)
				require.True(t, ok)
				got, ok := idCond.ID.Get()
				require.True(t, ok)
				require.Empty(t, idCond.FieldsSet(unsafe.Pointer(&idCond.ID)))
				condIDs[got] = struct{}{}
			}
			require.Len(t, condIDs, 2)
			require.Contains(t, condIDs, "t10")
			require.Contains(t, condIDs, "t2")
		})

	require.Equal(t, float64(0), partialRefreshCounter.Val())
	runtime.batchHandleWatchResponses([]api.WatchTimerResponse{
		{
			Events: []*api.WatchTimerEvent{
				{
					Tp:      api.WatchTimerEventDelete,
					TimerID: "t0",
				},
				{
					Tp:      api.WatchTimerEventCreate,
					TimerID: "t10",
				},
			},
		},
		{
			Events: []*api.WatchTimerEvent{
				{
					Tp:      api.WatchTimerEventUpdate,
					TimerID: "t2",
				},
				{
					Tp:      api.WatchTimerEventDelete,
					TimerID: "t5",
				},
			},
		},
	})
	require.Equal(t, float64(1), partialRefreshCounter.Val())
	mockCore.AssertExpectations(t)

	// 7 seeded timers - 2 deleted + 1 created.
	require.Len(t, runtime.cache.items, 6)
	require.False(t, runtime.cache.hasTimer("t0"))
	require.False(t, runtime.cache.hasTimer("t5"))
	require.Equal(t, t10, runtime.cache.items["t10"].timer)
	require.Equal(t, procIdle, runtime.cache.items["t10"].procStatus)
	require.Equal(t, t2New, runtime.cache.items["t2"].timer)
	require.Equal(t, procIdle, runtime.cache.items["t2"].procStatus)
	require.Equal(t, t6New, runtime.cache.items["t6"].timer)
	require.Equal(t, procTriggering, runtime.cache.items["t6"].procStatus)
}
// TestCloseWaitingCloseTimers covers tryCloseTriggeringTimers: it returns false when nothing is
// waiting for close and when the store reports all events as still open, and true once the store
// shows that some events were closed, replaced, or that a timer has disappeared.
func TestCloseWaitingCloseTimers(t *testing.T) {
	core, storeMock := newMockStore()
	rt := NewTimerRuntimeBuilder("g1", storeMock).Build()
	rt.cond = &api.TimerCond{Namespace: api.NewOptionalVal("n1")}
	rt.initCtx()

	// Empty cache: nothing to close.
	require.False(t, rt.tryCloseTriggeringTimers())

	// Five timers, all with a triggered event and waiting for it to be closed.
	const timerCnt = 5
	timers := make([]*api.TimerRecord, 0, timerCnt)
	for i := 0; i < timerCnt; i++ {
		rec := newTestTimer(fmt.Sprintf("t%d", i), "1m", time.Now())
		rec.EventStatus = api.SchedEventTrigger
		rec.EventStart = time.Now()
		rec.EventID = fmt.Sprintf("event%d", i)
		rt.cache.updateTimer(rec)
		rt.cache.setTimerProcStatus(rec.ID, procWaitTriggerClose, rec.EventID)
		timers = append(timers, rec)
	}

	// First round: the store returns the records unchanged. The List condition must be
	// `rt.cond AND (ID == t0 OR ... OR ID == t4)` with ID being the only field set in each
	// ID condition.
	verifyListCond := func(args mock.Arguments) {
		andOp, isOp := args[1].(*api.Operator)
		require.True(t, isOp)
		require.Equal(t, api.OperatorAnd, andOp.Op)
		require.False(t, andOp.Not)
		require.Equal(t, 2, len(andOp.Children))
		require.Equal(t, rt.cond, andOp.Children[0])

		orOp, isOp := andOp.Children[1].(*api.Operator)
		require.True(t, isOp)
		require.Equal(t, api.OperatorOr, orOp.Op)
		require.False(t, orOp.Not)
		require.Equal(t, len(timers), len(orOp.Children))

		seenIDs := make(map[string]struct{})
		for _, child := range orOp.Children {
			idCond, isCond := child.(*api.TimerCond)
			require.True(t, isCond)
			id, hasID := idCond.ID.Get()
			require.True(t, hasID)
			require.Empty(t, idCond.FieldsSet(unsafe.Pointer(&idCond.ID)))
			seenIDs[id] = struct{}{}
		}
		require.Equal(t, len(seenIDs), len(orOp.Children))
		for i := range timers {
			require.Contains(t, seenIDs, fmt.Sprintf("t%d", i))
		}
	}
	core.On("List", mock.Anything, mock.Anything).
		Return(timers, nil).Once().
		Run(verifyListCond)
	require.False(t, rt.tryCloseTriggeringTimers())
	core.AssertExpectations(t)
	require.Equal(t, len(timers), len(rt.cache.waitCloseTimerIDs))
	require.Equal(t, len(timers), len(rt.cache.items))
	require.Equal(t, len(timers), rt.cache.sorted.Len())

	// Second round: t1's event is closed, t4 carries a different event ID, t3 is missing
	// from the result, t0 and t2 are unchanged.
	t1New := timers[1].Clone()
	t1New.EventStatus = api.SchedEventIdle
	t1New.EventID = ""
	t1New.Version++

	t4New := timers[4].Clone()
	t4New.EventID = "event_next"
	t4New.Version++

	secondList := []*api.TimerRecord{timers[0], t1New, timers[2], t4New}
	core.On("List", mock.Anything, mock.Anything).Return(secondList, nil).Once()
	require.True(t, rt.tryCloseTriggeringTimers())
	core.AssertExpectations(t)

	cache := rt.cache
	require.Equal(t, 2, len(cache.waitCloseTimerIDs))
	require.Equal(t, 4, len(cache.items))
	require.Equal(t, 4, cache.sorted.Len())
	require.Contains(t, cache.waitCloseTimerIDs, "t0")
	require.Contains(t, cache.waitCloseTimerIDs, "t2")
	require.Equal(t, timers[0], cache.items["t0"].timer)
	require.Equal(t, procWaitTriggerClose, cache.items["t0"].procStatus)
	require.Equal(t, t1New, cache.items["t1"].timer)
	require.Equal(t, procIdle, cache.items["t1"].procStatus)
	require.Equal(t, timers[2], cache.items["t2"].timer)
	require.Equal(t, procWaitTriggerClose, cache.items["t2"].procStatus)
	require.Equal(t, t4New, cache.items["t4"].timer)
	require.Equal(t, procIdle, cache.items["t4"].procStatus)
}
// TestCreateWatchTimerChan checks createWatchTimerChan for both kinds of stores: when the store
// supports watching, the returned chan delivers what the store's watch chan delivers; when it
// does not, the shared idleWatchChan is returned and nothing can be received from it.
func TestCreateWatchTimerChan(t *testing.T) {
	mockCore, mockStore := newMockStore()
	runtime := NewTimerRuntimeBuilder("g1", mockStore).Build()

	// Store with watch support: pre-fill the watch chan with one response.
	ch := make(chan api.WatchTimerResponse, 1)
	ch <- api.WatchTimerResponse{Events: []*api.WatchTimerEvent{{TimerID: "AAA"}}}
	retCh := api.WatchTimerChan(ch)
	mockCore.On("Watch", mock.Anything).Return(retCh).Once()
	mockCore.On("WatchSupported").Return(true).Once()
	got := runtime.createWatchTimerChan(context.Background())
	require.True(t, got != idleWatchChan)
	select {
	case resp, ok := <-got:
		require.True(t, ok)
		require.Equal(t, 1, len(resp.Events))
		require.Equal(t, "AAA", resp.Events[0].TimerID)
	default:
		// The response was buffered before the call, so it must be readable right away.
		require.FailNow(t, "expected the buffered watch response to be readable")
	}
	mockCore.AssertExpectations(t)

	// Store without watch support: no Watch call is expected (AssertExpectations only covers
	// the registered WatchSupported call) and the idle chan is returned.
	mockCore.On("WatchSupported").Return(false).Once()
	got = runtime.createWatchTimerChan(context.Background())
	require.True(t, got == idleWatchChan)
	select {
	case <-got:
		require.FailNow(t, "the idle watch chan should not deliver any response")
	default:
	}
	mockCore.AssertExpectations(t)
}
// TestWatchTimerRetry checks that the runtime watches again after its watch chan was closed, and
// that it waits at least reWatchInterval (shortened to 100ms here) between the two Watch calls.
func TestWatchTimerRetry(t *testing.T) {
	savedInterval := reWatchInterval
	reWatchInterval = 100 * time.Millisecond
	defer func() { reWatchInterval = savedInterval }()

	core, storeMock := newMockStore()

	// The first Watch call gets a chan that is already closed, the second one a chan that
	// stays open.
	closedSrc := make(chan api.WatchTimerResponse)
	close(closedSrc)
	closedWatchCh := api.WatchTimerChan(closedSrc)
	openWatchCh := api.WatchTimerChan(make(chan api.WatchTimerResponse))

	core.On("WatchSupported").Return(true).Times(3)

	// Start times of both Watch calls; stored atomically because the callbacks run while the
	// runtime is started, and are read by the test afterwards.
	var firstWatchAt, secondWatchAt atomic.Pointer[time.Time]
	secondWatchSeen := make(chan struct{})

	core.On("List", mock.Anything, mock.Anything).Return([]*api.TimerRecord(nil), nil)
	core.On("Watch", mock.Anything).Return(closedWatchCh).Once().Run(func(args mock.Arguments) {
		ts := time.Now()
		firstWatchAt.Store(&ts)
	})
	core.On("Watch", mock.Anything).Return(openWatchCh).Once().Run(func(args mock.Arguments) {
		ts := time.Now()
		secondWatchAt.Store(&ts)
		close(secondWatchSeen)
	})

	rt := NewTimerRuntimeBuilder("g1", storeMock).Build()
	rt.Start()
	defer rt.Stop()

	waitDone(secondWatchSeen, time.Minute)
	first, second := firstWatchAt.Load(), secondWatchAt.Load()
	require.NotNil(t, first)
	require.NotNil(t, second)
	require.GreaterOrEqual(t, second.Sub(*first), reWatchInterval)
}
// TestTimerFullProcess runs a started runtime against an in-memory store and a mocked hook, and
// follows one interval timer through two complete event rounds: the first event is triggered, no
// further event is triggered while the first one is open (even after the fake clock moved on),
// the event is closed through the client, and a second event is triggered once the fake clock has
// advanced again.
func TestTimerFullProcess(t *testing.T) {
// Shrink the package-level intervals for this test (presumably so the runtime loop reacts within
// milliseconds); the original values are restored when the test returns.
origBatchProcessWatchRespInterval := batchProcessWatchRespInterval
origMinTriggerEventInterval := minTriggerEventInterval
origMaxTriggerEventInterval := maxTriggerEventInterval
batchProcessWatchRespInterval = time.Millisecond
minTriggerEventInterval = time.Millisecond
maxTriggerEventInterval = 10 * time.Millisecond
defer func() {
batchProcessWatchRespInterval = origBatchProcessWatchRespInterval
minTriggerEventInterval = origMinTriggerEventInterval
maxTriggerEventInterval = origMaxTriggerEventInterval
}()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
// Fake clock handed to the runtime via setNowFunc below. It starts at the Unix epoch and is
// only moved by setNow; it is kept in an atomic pointer, presumably because the runtime reads
// it from its own goroutines.
var now atomic.Pointer[time.Time]
setNow := func(n time.Time) { now.Store(&n) }
setNow(time.UnixMilli(0))
var zeroTime time.Time
store := api.NewMemoryTimerStore()
defer store.Close()
cli := api.NewDefaultTimerClient(store)
hook := newMockHook()
// The factory itself asserts that it is asked for class "h1" with a client equal to cli.
runtime := NewTimerRuntimeBuilder("g1", store).
RegisterHookFactory("h1", func(hookClass string, timerCli api.TimerClient) api.Hook {
require.Equal(t, "h1", hookClass)
require.Equal(t, cli, timerCli)
return hook
}).
Build()
runtime.setNowFunc(func() time.Time {
return *now.Load()
})
// State shared between the OnPreSchedEvent and OnSchedEvent callbacks of one event round.
var currentEvent atomic.Pointer[string]
var preSchedTimer atomic.Pointer[api.TimerRecord]
// checkPreSchedEventFunc builds the Run callback for OnPreSchedEvent: it requires the timer
// passed to the hook to equal checkTimer and records the event ID and timer for the
// OnSchedEvent check.
// NOTE(review): these callbacks use require, which ends in t.FailNow; if the hook is invoked
// on a worker goroutine (not visible in this chunk) that is outside what the testing package
// allows — confirm.
checkPreSchedEventFunc := func(checkTimer *api.TimerRecord) func(args mock.Arguments) {
return func(args mock.Arguments) {
argCtx, ok := args[0].(context.Context)
require.True(t, ok)
require.NotNil(t, argCtx)
argEvent, ok := args[1].(api.TimerShedEvent)
require.True(t, ok)
require.NotNil(t, argEvent)
eventID := argEvent.EventID()
currentEvent.Store(&eventID)
argTimer := argEvent.Timer()
require.Equal(t, checkTimer, argTimer)
preSchedTimer.Store(argTimer)
}
}
// onSchedDone is closed by the OnSchedEvent callback; the test replaces it with a fresh chan
// after every round.
onSchedDone := make(chan struct{})
var onSchedTimer atomic.Pointer[api.TimerRecord]
// checkOnSchedEventFunc builds the Run callback for OnSchedEvent: the event must be the one
// seen by OnPreSchedEvent, and the timer must match the pre-sched timer except that it is now
// in trigger status and carries the event ID, checkEventData and checkEventStart. It then
// resets the shared state, publishes the timer in onSchedTimer and closes onSchedDone.
checkOnSchedEventFunc := func(checkEventData []byte, checkEventStart time.Time) func(args mock.Arguments) {
return func(args mock.Arguments) {
argCtx, ok := args[0].(context.Context)
require.True(t, ok)
require.NotNil(t, argCtx)
argEvent, ok := args[1].(api.TimerShedEvent)
require.True(t, ok)
require.NotNil(t, argEvent)
eventID := argEvent.EventID()
require.Equal(t, *currentEvent.Load(), eventID)
argTimer := argEvent.Timer()
preTimer := preSchedTimer.Load()
require.Equal(t, preTimer.ID, argTimer.ID)
require.Equal(t, preTimer.TimerSpec, argTimer.TimerSpec)
require.Equal(t, preTimer.Watermark, argTimer.Watermark)
require.Equal(t, api.SchedEventTrigger, argTimer.EventStatus)
require.Equal(t, preTimer.SummaryData, argTimer.SummaryData)
require.Equal(t, eventID, argTimer.EventID)
require.Equal(t, checkEventData, argTimer.EventData)
require.Equal(t, checkEventStart, argTimer.EventStart)
currentEvent.Store(nil)
preSchedTimer.Store(nil)
onSchedTimer.Store(argTimer)
close(onSchedDone)
}
}
// The mocked Start call blocks until hookStartWait is closed (WaitUntil), which happens only
// after the timer has been created below.
hookStartWait := make(chan time.Time)
hook.On("Start").Return().Once().WaitUntil(hookStartWait)
hook.On("Stop").Return().Maybe()
runtime.Start()
defer runtime.Stop()
timer, err := cli.CreateTimer(ctx, api.TimerSpec{
Key: "key1",
Data: []byte("timer1data"),
SchedPolicyType: api.SchedEventInterval,
SchedPolicyExpr: "1m",
HookClass: "h1",
Enable: true,
})
require.NoError(t, err)
timerID := timer.ID
close(hookStartWait)
// Round 1: expect exactly one pre-sched/sched pair for the freshly created timer; the event
// start must equal the fake clock value at registration time.
hook.On("OnPreSchedEvent", mock.Anything, mock.Anything).
Return(api.PreSchedEventResult{EventData: []byte("eventdata1")}, nil).
Once().
Run(checkPreSchedEventFunc(timer))
hook.On("OnSchedEvent", mock.Anything, mock.Anything).
Return(nil).
Once().
Run(checkOnSchedEventFunc([]byte("eventdata1"), *now.Load()))
waitDone(onSchedDone, 5*time.Second)
onSchedDone = make(chan struct{})
hook.AssertExpectations(t)
// The record in the store must equal the timer that was handed to OnSchedEvent.
timer, err = cli.GetTimerByID(ctx, timerID)
require.NoError(t, err)
require.Equal(t, onSchedTimer.Load(), timer)
onSchedTimer.Store(nil)
// should not trigger again before the previous event is closed, even though the clock moved
// two minutes ahead (checkNotDone is a helper that is not shown in this chunk)
setNow(now.Load().Add(2 * time.Minute))
checkNotDone(onSchedDone, time.Second)
tmpTimer, err := cli.GetTimerByID(ctx, timerID)
require.NoError(t, err)
require.Equal(t, timer, tmpTimer)
// close event, setting the watermark to the current fake time and attaching summary data
err = cli.CloseTimerEvent(ctx, timerID, timer.EventID,
api.WithSetWatermark(*now.Load()),
api.WithSetSummaryData([]byte("summary1")),
)
require.NoError(t, err)
// After closing, the record is idle again, has no event fields, and keeps the summary data.
timer, err = cli.GetTimerByID(ctx, timerID)
require.NoError(t, err)
require.Equal(t, api.SchedEventIdle, timer.EventStatus)
require.Empty(t, timer.EventID)
require.Equal(t, zeroTime, timer.EventStart)
require.Empty(t, timer.EventData)
require.Equal(t, []byte("summary1"), timer.SummaryData)
// The watermark equals the fake clock now, so no event is expected yet.
checkNotDone(onSchedDone, time.Second)
// trigger again after 1 minute
setNow(now.Load().Add(time.Minute))
hook.On("OnPreSchedEvent", mock.Anything, mock.Anything).
Return(api.PreSchedEventResult{EventData: []byte("eventdata2")}, nil).
Once().
Run(checkPreSchedEventFunc(timer))
hook.On("OnSchedEvent", mock.Anything, mock.Anything).
Return(nil).
Once().
Run(checkOnSchedEventFunc([]byte("eventdata2"), *now.Load()))
waitDone(onSchedDone, 5*time.Second)
onSchedDone = make(chan struct{})
hook.AssertExpectations(t)
timer, err = cli.GetTimerByID(ctx, timer.ID)
require.Nil(t, err)
require.Equal(t, onSchedTimer.Load(), timer)
onSchedTimer.Store(nil)
}
// TestTimerRuntimeLoopPanicRecover checks how the runtime copes with a store whose List call
// panics. Three scenarios are run one after another: the store panics twice and then works, the
// store panics on every call, and the store panics once while the retry wait is set to a minute.
// In all of them Stop must return within the waitDone timeout.
func TestTimerRuntimeLoopPanicRecover(t *testing.T) {
mockCore, mockStore := newMockStore()
rt := NewTimerRuntimeBuilder("g1", mockStore).Build()
// start and panic two times, then normal
// started is closed by the third List call, the first one that returns normally.
started := make(chan struct{})
mockCore.On("WatchSupported").Return(false).Times(3)
mockCore.On("List", mock.Anything, mock.Anything).Panic("store panic").Twice()
mockCore.On("List", mock.Anything, mock.Anything).Return([]*api.TimerRecord(nil), nil).Once().Run(func(args mock.Arguments) {
close(started)
})
// Short retry wait so the retries after the two panics happen quickly.
rt.retryLoopWait = time.Millisecond
rt.Start()
waitDone(started, 5*time.Second)
mockCore.AssertExpectations(t)
// normal stop
// Stop runs in a goroutine so that the wait below can bound how long it takes.
stopped := make(chan struct{})
go func() {
rt.Stop()
close(stopped)
}()
waitDone(stopped, 5*time.Second)
mockCore.AssertExpectations(t)
// start and panic always
// A new runtime on the same mocked store; the expectations below have no call limit.
rt = NewTimerRuntimeBuilder("g1", mockStore).Build()
mockCore.On("WatchSupported").Return(false)
mockCore.On("List", mock.Anything, mock.Anything).Panic("store panic")
rt.retryLoopWait = time.Millisecond
rt.Start()
// Give the runtime a moment to run into the panicking List call.
time.Sleep(10 * time.Millisecond)
// can also stop
stopped = make(chan struct{})
go func() {
rt.Stop()
close(stopped)
}()
waitDone(stopped, 5*time.Second)
mockCore.AssertExpectations(t)
// stop should stop immediately
// Fresh mocks for this scenario. The List expectation has no return value; its Run callback
// signals started and then panics itself.
mockCore, mockStore = newMockStore()
rt = NewTimerRuntimeBuilder("g1", mockStore).Build()
started = make(chan struct{})
var once sync.Once
mockCore.On("WatchSupported").Return(false).Once()
mockCore.On("List", mock.Anything, mock.Anything).Once().Run(func(args mock.Arguments) {
once.Do(func() {
close(started)
})
panic("store panic")
})
// With a one-minute retry wait, Stop has to return well before the retry would happen:
// the wait below allows five seconds only.
rt.retryLoopWait = time.Minute
rt.Start()
waitDone(started, 5*time.Second)
time.Sleep(time.Millisecond)
stopped = make(chan struct{})
go func() {
rt.Stop()
close(stopped)
}()
waitDone(stopped, 5*time.Second)
mockCore.AssertExpectations(t)
}