// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
package api
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pingcap/errors"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestGetTimerOption(t *testing.T) {
|
|
var cond TimerCond
|
|
require.Empty(t, cond.FieldsSet())
|
|
|
|
// test 'Key' field
|
|
require.False(t, cond.Key.Present())
|
|
|
|
WithKey("k1")(&cond)
|
|
key, ok := cond.Key.Get()
|
|
require.True(t, ok)
|
|
require.Equal(t, "k1", key)
|
|
require.False(t, cond.KeyPrefix)
|
|
require.Equal(t, []string{"Key"}, cond.FieldsSet())
|
|
|
|
WithKeyPrefix("k2")(&cond)
|
|
key, ok = cond.Key.Get()
|
|
require.True(t, ok)
|
|
require.Equal(t, "k2", key)
|
|
require.True(t, cond.KeyPrefix)
|
|
require.Equal(t, []string{"Key"}, cond.FieldsSet())
|
|
|
|
WithKey("k3")(&cond)
|
|
key, ok = cond.Key.Get()
|
|
require.True(t, ok)
|
|
require.Equal(t, "k3", key)
|
|
require.False(t, cond.KeyPrefix)
|
|
require.Equal(t, []string{"Key"}, cond.FieldsSet())
|
|
|
|
// test 'ID' field
|
|
require.False(t, cond.ID.Present())
|
|
|
|
WithID("id1")(&cond)
|
|
id, ok := cond.ID.Get()
|
|
require.True(t, ok)
|
|
require.Equal(t, "id1", id)
|
|
require.Equal(t, []string{"ID", "Key"}, cond.FieldsSet())
|
|
|
|
// test 'Tags' field
|
|
require.False(t, cond.Tags.Present())
|
|
WithTag("l1", "l2")(&cond)
|
|
tags, ok := cond.Tags.Get()
|
|
require.True(t, ok)
|
|
require.Equal(t, []string{"l1", "l2"}, tags)
|
|
require.Equal(t, []string{"ID", "Key", "Tags"}, cond.FieldsSet())
|
|
}
|
|
|
|
func TestUpdateTimerOption(t *testing.T) {
|
|
var update TimerUpdate
|
|
require.Empty(t, update)
|
|
|
|
// test 'Enable' field
|
|
require.False(t, update.Enable.Present())
|
|
|
|
WithSetEnable(true)(&update)
|
|
setEnable, ok := update.Enable.Get()
|
|
require.True(t, ok)
|
|
require.True(t, setEnable)
|
|
require.Equal(t, []string{"Enable"}, update.FieldsSet())
|
|
|
|
WithSetEnable(false)(&update)
|
|
setEnable, ok = update.Enable.Get()
|
|
require.True(t, ok)
|
|
require.False(t, setEnable)
|
|
require.Equal(t, []string{"Enable"}, update.FieldsSet())
|
|
|
|
// test schedule policy
|
|
require.False(t, update.SchedPolicyType.Present())
|
|
require.False(t, update.SchedPolicyExpr.Present())
|
|
|
|
WithSetSchedExpr(SchedEventInterval, "3h")(&update)
|
|
stp, ok := update.SchedPolicyType.Get()
|
|
require.True(t, ok)
|
|
require.Equal(t, SchedEventInterval, stp)
|
|
expr, ok := update.SchedPolicyExpr.Get()
|
|
require.True(t, ok)
|
|
require.Equal(t, "3h", expr)
|
|
require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr"}, update.FieldsSet())
|
|
|
|
WithSetSchedExpr(SchedEventInterval, "1h")(&update)
|
|
stp, ok = update.SchedPolicyType.Get()
|
|
require.True(t, ok)
|
|
require.Equal(t, SchedEventInterval, stp)
|
|
expr, ok = update.SchedPolicyExpr.Get()
|
|
require.True(t, ok)
|
|
require.Equal(t, "1h", expr)
|
|
require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr"}, update.FieldsSet())
|
|
|
|
// test 'Watermark' field
|
|
require.False(t, update.Watermark.Present())
|
|
|
|
WithSetWatermark(time.Unix(1234, 5678))(&update)
|
|
watermark, ok := update.Watermark.Get()
|
|
require.True(t, ok)
|
|
require.Equal(t, time.Unix(1234, 5678), watermark)
|
|
require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr", "Watermark"}, update.FieldsSet())
|
|
|
|
// test 'SummaryData' field
|
|
require.False(t, update.SummaryData.Present())
|
|
|
|
WithSetSummaryData([]byte("hello"))(&update)
|
|
summary, ok := update.SummaryData.Get()
|
|
require.True(t, ok)
|
|
require.Equal(t, []byte("hello"), summary)
|
|
require.Equal(t, []string{"Enable", "SchedPolicyType", "SchedPolicyExpr", "Watermark", "SummaryData"}, update.FieldsSet())
|
|
|
|
// test 'Tags' field
|
|
require.False(t, update.Tags.Present())
|
|
WithSetTags(nil)(&update)
|
|
tags, ok := update.Tags.Get()
|
|
require.True(t, ok)
|
|
require.Equal(t, 0, len(tags))
|
|
WithSetTags([]string{"l1", "l2"})(&update)
|
|
tags, ok = update.Tags.Get()
|
|
require.True(t, ok)
|
|
require.Equal(t, []string{"l1", "l2"}, tags)
|
|
require.Equal(t, []string{"Tags", "Enable", "SchedPolicyType", "SchedPolicyExpr", "Watermark", "SummaryData"}, update.FieldsSet())
|
|
}
|
|
|
|
func TestDefaultClient(t *testing.T) {
|
|
store := NewMemoryTimerStore()
|
|
cli := NewDefaultTimerClient(store)
|
|
ctx := context.Background()
|
|
spec := TimerSpec{
|
|
Key: "k1",
|
|
SchedPolicyType: SchedEventInterval,
|
|
SchedPolicyExpr: "1h",
|
|
Data: []byte("data1"),
|
|
Tags: []string{"l1", "l2"},
|
|
}
|
|
|
|
// create
|
|
timer, err := cli.CreateTimer(ctx, spec)
|
|
require.NoError(t, err)
|
|
spec.Namespace = "default"
|
|
require.NotEmpty(t, timer.ID)
|
|
require.Equal(t, spec, timer.TimerSpec)
|
|
require.Equal(t, SchedEventIdle, timer.EventStatus)
|
|
require.Equal(t, "", timer.EventID)
|
|
require.Empty(t, timer.EventData)
|
|
require.Empty(t, timer.SummaryData)
|
|
|
|
// get by id
|
|
got, err := cli.GetTimerByID(ctx, timer.ID)
|
|
require.NoError(t, err)
|
|
require.Equal(t, timer, got)
|
|
|
|
// get by key
|
|
got, err = cli.GetTimerByKey(ctx, timer.Key)
|
|
require.NoError(t, err)
|
|
require.Equal(t, timer, got)
|
|
|
|
// get by key prefix
|
|
tms, err := cli.GetTimers(ctx, WithKeyPrefix("k"))
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, len(tms))
|
|
require.Equal(t, timer, tms[0])
|
|
|
|
// get by tag
|
|
tms, err = cli.GetTimers(ctx, WithTag("l1"))
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, len(tms))
|
|
require.Equal(t, timer, tms[0])
|
|
|
|
tms, err = cli.GetTimers(ctx, WithTag("l1", "l2"))
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, len(tms))
|
|
require.Equal(t, timer, tms[0])
|
|
|
|
tms, err = cli.GetTimers(ctx, WithTag("l3"))
|
|
require.NoError(t, err)
|
|
require.Equal(t, 0, len(tms))
|
|
|
|
// update
|
|
err = cli.UpdateTimer(ctx, timer.ID, WithSetSchedExpr(SchedEventInterval, "3h"))
|
|
require.NoError(t, err)
|
|
timer.SchedPolicyType = SchedEventInterval
|
|
timer.SchedPolicyExpr = "3h"
|
|
got, err = cli.GetTimerByID(ctx, timer.ID)
|
|
require.NoError(t, err)
|
|
require.Greater(t, got.Version, timer.Version)
|
|
timer.Version = got.Version
|
|
require.Equal(t, timer, got)
|
|
|
|
// close event
|
|
eventStart := time.Now().Add(-time.Second)
|
|
err = store.Update(ctx, timer.ID, &TimerUpdate{
|
|
EventStatus: NewOptionalVal(SchedEventTrigger),
|
|
EventID: NewOptionalVal("event1"),
|
|
EventData: NewOptionalVal([]byte("d1")),
|
|
SummaryData: NewOptionalVal([]byte("s1")),
|
|
EventStart: NewOptionalVal(eventStart),
|
|
EventExtra: NewOptionalVal(EventExtra{
|
|
EventManualRequestID: "req1",
|
|
EventWatermark: time.Unix(456, 0),
|
|
}),
|
|
})
|
|
require.NoError(t, err)
|
|
err = cli.CloseTimerEvent(ctx, timer.ID, "event2")
|
|
require.True(t, errors.ErrorEqual(ErrEventIDNotMatch, err))
|
|
|
|
err = cli.CloseTimerEvent(ctx, timer.ID, "event2", WithSetSchedExpr(SchedEventInterval, "1h"))
|
|
require.EqualError(t, err, "The field(s) [SchedPolicyType, SchedPolicyExpr] are not allowed to update when close event")
|
|
|
|
err = cli.CloseTimerEvent(ctx, timer.ID, "event1")
|
|
require.NoError(t, err)
|
|
timer, err = cli.GetTimerByID(ctx, timer.ID)
|
|
require.NoError(t, err)
|
|
require.Equal(t, SchedEventIdle, timer.EventStatus)
|
|
require.Empty(t, timer.EventID)
|
|
require.Empty(t, timer.EventData)
|
|
require.True(t, timer.EventStart.IsZero())
|
|
require.Equal(t, []byte("s1"), timer.SummaryData)
|
|
require.Equal(t, eventStart, timer.Watermark)
|
|
require.Equal(t, EventExtra{}, timer.EventExtra)
|
|
|
|
// close event with option
|
|
err = store.Update(ctx, timer.ID, &TimerUpdate{
|
|
EventID: NewOptionalVal("event1"),
|
|
EventData: NewOptionalVal([]byte("d1")),
|
|
SummaryData: NewOptionalVal([]byte("s1")),
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
watermark := time.Now().Add(time.Hour)
|
|
err = cli.CloseTimerEvent(ctx, timer.ID, "event1", WithSetWatermark(watermark), WithSetSummaryData([]byte("s2")))
|
|
require.NoError(t, err)
|
|
timer, err = cli.GetTimerByID(ctx, timer.ID)
|
|
require.NoError(t, err)
|
|
require.Equal(t, SchedEventIdle, timer.EventStatus)
|
|
require.Empty(t, timer.EventID)
|
|
require.Empty(t, timer.EventData)
|
|
require.True(t, timer.EventStart.IsZero())
|
|
require.Equal(t, []byte("s2"), timer.SummaryData)
|
|
require.Equal(t, watermark, timer.Watermark)
|
|
|
|
// manual trigger
|
|
err = store.Update(ctx, timer.ID, &TimerUpdate{
|
|
EventID: NewOptionalVal("event1"),
|
|
EventData: NewOptionalVal([]byte("d1")),
|
|
SummaryData: NewOptionalVal([]byte("s1")),
|
|
})
|
|
require.NoError(t, err)
|
|
reqID, err := cli.ManualTriggerEvent(ctx, timer.ID)
|
|
require.Empty(t, reqID)
|
|
require.EqualError(t, err, "manual trigger is not allowed when event is not closed")
|
|
|
|
require.NoError(t, cli.CloseTimerEvent(ctx, timer.ID, "event1"))
|
|
require.NoError(t, cli.UpdateTimer(ctx, timer.ID, WithSetEnable(false)))
|
|
reqID, err = cli.ManualTriggerEvent(ctx, timer.ID)
|
|
require.Empty(t, reqID)
|
|
require.EqualError(t, err, "manual trigger is not allowed when timer is disabled")
|
|
|
|
require.NoError(t, cli.UpdateTimer(ctx, timer.ID, WithSetEnable(true)))
|
|
now := time.Now()
|
|
reqID, err = cli.ManualTriggerEvent(ctx, timer.ID)
|
|
require.NoError(t, err)
|
|
require.NotEmpty(t, reqID)
|
|
timer, err = cli.GetTimerByID(ctx, timer.ID)
|
|
require.NoError(t, err)
|
|
require.NotEmpty(t, timer.ManualRequestID)
|
|
require.GreaterOrEqual(t, timer.ManualRequestTime.Unix(), now.Unix())
|
|
require.Less(t, timer.ManualRequestTime.Sub(now), 10*time.Second)
|
|
require.Equal(t, ManualRequest{
|
|
ManualRequestID: reqID,
|
|
ManualRequestTime: timer.ManualRequestTime,
|
|
ManualTimeout: 2 * time.Minute,
|
|
}, timer.ManualRequest)
|
|
|
|
// close manual triggered event
|
|
manualRequest := timer.ManualRequest.SetProcessed("event1")
|
|
err = store.Update(ctx, timer.ID, &TimerUpdate{
|
|
ManualRequest: NewOptionalVal(manualRequest),
|
|
EventExtra: NewOptionalVal(EventExtra{
|
|
EventManualRequestID: manualRequest.ManualRequestID,
|
|
EventWatermark: timer.Watermark,
|
|
}),
|
|
EventID: NewOptionalVal("event1"),
|
|
EventStart: NewOptionalVal(time.Now()),
|
|
EventStatus: NewOptionalVal(SchedEventTrigger),
|
|
})
|
|
require.NoError(t, err)
|
|
err = cli.CloseTimerEvent(ctx, timer.ID, "event1")
|
|
require.NoError(t, err)
|
|
timer, err = cli.GetTimerByID(ctx, timer.ID)
|
|
require.NoError(t, err)
|
|
require.Equal(t, manualRequest, timer.ManualRequest)
|
|
require.Equal(t, EventExtra{}, timer.EventExtra)
|
|
|
|
// delete
|
|
exit, err := cli.DeleteTimer(ctx, timer.ID)
|
|
require.NoError(t, err)
|
|
require.True(t, exit)
|
|
|
|
// delete no exist
|
|
exit, err = cli.DeleteTimer(ctx, timer.ID)
|
|
require.NoError(t, err)
|
|
require.False(t, exit)
|
|
}
|
|
|
|
type injectedTimerStore struct {
|
|
*TimerStore
|
|
beforeUpdate func()
|
|
}
|
|
|
|
func (s *injectedTimerStore) Update(ctx context.Context, timerID string, update *TimerUpdate) error {
|
|
if s.beforeUpdate != nil {
|
|
s.beforeUpdate()
|
|
}
|
|
return s.TimerStore.Update(ctx, timerID, update)
|
|
}
|
|
|
|
func TestDefaultClientManualTriggerRetry(t *testing.T) {
|
|
inject := &injectedTimerStore{
|
|
TimerStore: NewMemoryTimerStore(),
|
|
}
|
|
|
|
store := &TimerStore{
|
|
TimerStoreCore: inject,
|
|
}
|
|
cli := NewDefaultTimerClient(store)
|
|
cli.(*defaultTimerClient).retryBackoff = 1
|
|
ctx := context.Background()
|
|
spec := TimerSpec{
|
|
Key: "k1",
|
|
SchedPolicyType: SchedEventInterval,
|
|
SchedPolicyExpr: "1h",
|
|
Data: []byte("data1"),
|
|
Tags: []string{"l1", "l2"},
|
|
Enable: true,
|
|
}
|
|
|
|
timer, err := cli.CreateTimer(ctx, spec)
|
|
require.NoError(t, err)
|
|
timerID := timer.ID
|
|
|
|
// retry and success
|
|
updateTimes := 0
|
|
inject.beforeUpdate = func() {
|
|
updateTimes++
|
|
if updateTimes < 3 {
|
|
err = inject.TimerStore.Update(context.TODO(), timerID, &TimerUpdate{
|
|
Watermark: NewOptionalVal(time.Now()),
|
|
})
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
reqID, err := cli.ManualTriggerEvent(context.TODO(), timerID)
|
|
require.NoError(t, err)
|
|
require.NotEmpty(t, reqID)
|
|
require.Equal(t, 3, updateTimes)
|
|
|
|
// max retry
|
|
inject.beforeUpdate = func() {
|
|
err = inject.TimerStore.Update(context.TODO(), timerID, &TimerUpdate{
|
|
Watermark: NewOptionalVal(time.Now()),
|
|
})
|
|
require.NoError(t, err)
|
|
}
|
|
reqID, err = cli.ManualTriggerEvent(context.TODO(), timerID)
|
|
require.EqualError(t, err, "timer version not match")
|
|
require.Empty(t, reqID)
|
|
|
|
// retry to other error
|
|
updateTimes = 0
|
|
inject.beforeUpdate = func() {
|
|
updateTimes++
|
|
if updateTimes < 3 {
|
|
err = inject.TimerStore.Update(context.TODO(), timerID, &TimerUpdate{
|
|
Watermark: NewOptionalVal(time.Now()),
|
|
})
|
|
require.NoError(t, err)
|
|
} else {
|
|
err = inject.TimerStore.Update(context.TODO(), timerID, &TimerUpdate{
|
|
Enable: NewOptionalVal(false),
|
|
})
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
reqID, err = cli.ManualTriggerEvent(context.TODO(), timerID)
|
|
require.EqualError(t, err, "manual trigger is not allowed when timer is disabled")
|
|
require.Empty(t, reqID)
|
|
require.Equal(t, 3, updateTimes)
|
|
}
|