// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
|
package tests
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pingcap/tidb/pkg/config/kerneltype"
|
|
"github.com/pingcap/tidb/pkg/dxf/framework/proto"
|
|
"github.com/pingcap/tidb/pkg/dxf/framework/schstatus"
|
|
"github.com/pingcap/tidb/pkg/dxf/framework/storage"
|
|
"github.com/pingcap/tidb/pkg/kv"
|
|
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/tikv/client-go/v2/util"
|
|
)
|
|
|
|
func runAndCheckReqFn(t *testing.T, code int, resMsg string, doReqFn func() (*http.Response, error)) []byte {
|
|
t.Helper()
|
|
resp, err := doReqFn()
|
|
require.NoError(t, err)
|
|
require.Equal(t, code, resp.StatusCode)
|
|
body, err := io.ReadAll(resp.Body)
|
|
require.NoError(t, err)
|
|
require.Contains(t, string(body), resMsg)
|
|
require.NoError(t, resp.Body.Close())
|
|
return body
|
|
}
|
|
|
|
func TestDXFAPI(t *testing.T) {
|
|
if kerneltype.IsClassic() {
|
|
t.Skip("DXF API is only supported in nextgen kernel and only available in the SYSTEM keyspace")
|
|
}
|
|
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/domain/MockDisableDistTask", "return(true)")
|
|
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", "return(8)")
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
ts.startServer(t)
|
|
defer ts.stopServer(t)
|
|
|
|
t.Run("schedule status api", func(t *testing.T) {
|
|
// invalid method
|
|
runAndCheckReqFn(t, http.StatusBadRequest, "This api only support GET method", func() (*http.Response, error) {
|
|
return ts.PostStatus("/dxf/schedule/status", "", bytes.NewBuffer([]byte("")))
|
|
})
|
|
// success
|
|
body := runAndCheckReqFn(t, http.StatusOK, "", func() (*http.Response, error) {
|
|
return ts.FetchStatus("/dxf/schedule/status")
|
|
})
|
|
status := schstatus.Status{}
|
|
require.NoError(t, json.Unmarshal(body, &status))
|
|
require.Equal(t, 1, status.TiDBWorker.RequiredCount)
|
|
})
|
|
|
|
t.Run("schedule api", func(t *testing.T) {
|
|
runAndCheckReqFn(t, http.StatusBadRequest, "This api only support POST method", func() (*http.Response, error) {
|
|
return ts.FetchStatus("/dxf/schedule")
|
|
})
|
|
// success
|
|
for _, c := range []struct {
|
|
action string
|
|
enabled bool
|
|
}{
|
|
{action: "pause_scale_in", enabled: true},
|
|
{action: "resume_scale_in", enabled: false},
|
|
} {
|
|
body := runAndCheckReqFn(t, http.StatusOK, "", func() (*http.Response, error) {
|
|
return ts.PostStatus(fmt.Sprintf("/dxf/schedule?action=%s", c.action), "", bytes.NewBuffer([]byte("")))
|
|
})
|
|
param := schstatus.TTLFlag{}
|
|
require.NoError(t, json.Unmarshal(body, ¶m))
|
|
require.Equal(t, c.enabled, param.Enabled)
|
|
if param.Enabled {
|
|
require.Equal(t, time.Hour, param.TTL)
|
|
require.WithinRange(t, param.ExpireTime, time.Now().Add(param.TTL-time.Minute), time.Now().Add(param.TTL))
|
|
}
|
|
}
|
|
})
|
|
|
|
t.Run("task max_runtime_slots api", func(t *testing.T) {
|
|
runAndCheckReqFn(t, http.StatusBadRequest, "This api only support POST method", func() (*http.Response, error) {
|
|
return ts.FetchStatus("/dxf/task/1/max_runtime_slots")
|
|
})
|
|
tm, err := storage.GetTaskManager()
|
|
require.NoError(t, err)
|
|
ctx := util.WithInternalSourceType(context.Background(), kv.InternalDistTask)
|
|
require.NoError(t, tm.InitMeta(ctx, ":4000", ""))
|
|
id, err := tm.CreateTask(ctx, "key1", proto.ImportInto, "", 8, "", 0, proto.ExtraParams{}, []byte("test"))
|
|
require.NoError(t, err)
|
|
|
|
for _, c := range [][2]string{
|
|
{"/dxf/task/0/max_runtime_slots", "invalid task ID"},
|
|
{"/dxf/task/aa/max_runtime_slots", "invalid task ID"},
|
|
{"/dxf/task/1/max_runtime_slots", "invalid value "},
|
|
{"/dxf/task/1/max_runtime_slots?value=aa", "invalid value "},
|
|
{"/dxf/task/1/max_runtime_slots?value=0", "invalid value "},
|
|
{"/dxf/task/1/max_runtime_slots?value=1&target_step=a", "invalid target step"},
|
|
{"/dxf/task/1/max_runtime_slots?value=1&target_step=1&target_step=aa", "invalid target step"},
|
|
{"/dxf/task/1123123/max_runtime_slots?value=1&target_step=1", "task not found"},
|
|
{fmt.Sprintf("/dxf/task/%d/max_runtime_slots?value=10", id), "max runtime slots should be less than required slots(8)"},
|
|
{fmt.Sprintf("/dxf/task/%d/max_runtime_slots?value=6&target_step=100", id), "invalid target step 100 for task type ImportInto"},
|
|
} {
|
|
path, errMsg := c[0], c[1]
|
|
runAndCheckReqFn(t, http.StatusBadRequest, errMsg, func() (*http.Response, error) {
|
|
return ts.PostStatus(path, "", bytes.NewBuffer([]byte("")))
|
|
})
|
|
}
|
|
|
|
body := runAndCheckReqFn(t, http.StatusOK, "", func() (*http.Response, error) {
|
|
return ts.PostStatus(fmt.Sprintf("/dxf/task/%d/max_runtime_slots?value=6&target_step=3", id), "", bytes.NewBuffer([]byte("")))
|
|
})
|
|
out := struct {
|
|
RequiredSlots int `json:"required_slots"`
|
|
MaxRuntimeSlots int `json:"max_runtime_slots"`
|
|
TargetSteps []string `json:"target_steps"`
|
|
}{}
|
|
require.NoError(t, json.Unmarshal(body, &out))
|
|
require.Equal(t, 8, out.RequiredSlots)
|
|
require.Equal(t, 6, out.MaxRuntimeSlots)
|
|
require.Equal(t, []string{"encode"}, out.TargetSteps)
|
|
})
|
|
}
|
|
|
|
func TestDXFScheduleTuneAPI(t *testing.T) {
|
|
if kerneltype.IsClassic() {
|
|
t.Skip("only supported in nextgen kernel and only available in the SYSTEM keyspace")
|
|
}
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
ts.startServer(t)
|
|
defer ts.stopServer(t)
|
|
|
|
// no keyspace
|
|
runAndCheckReqFn(t, http.StatusBadRequest, "invalid or empty target keyspace", func() (*http.Response, error) {
|
|
return ts.FetchStatus("/dxf/schedule/tune")
|
|
})
|
|
// when not set, return default value
|
|
runAndCheckReqFn(t, http.StatusBadRequest, "failed to load keyspace", func() (*http.Response, error) {
|
|
return ts.FetchStatus("/dxf/schedule/tune?keyspace=aaa")
|
|
})
|
|
// invalid value
|
|
for _, v := range []float64{0.9, 10.1} {
|
|
runAndCheckReqFn(t, http.StatusBadRequest, "is out of range", func() (*http.Response, error) {
|
|
return ts.PostStatus(fmt.Sprintf("/dxf/schedule/tune?keyspace=SYSTEM&lify_factor=%f", v), "", bytes.NewBuffer([]byte("")))
|
|
})
|
|
}
|
|
// success
|
|
body := runAndCheckReqFn(t, http.StatusOK, "", func() (*http.Response, error) {
|
|
return ts.PostStatus("/dxf/schedule/tune?keyspace=SYSTEM&lify_factor=2&ttl=10h", "", bytes.NewBuffer([]byte("")))
|
|
})
|
|
ttlFactors := &schstatus.TTLTuneFactors{}
|
|
require.NoError(t, json.Unmarshal(body, ttlFactors))
|
|
require.Equal(t, 10*time.Hour, ttlFactors.TTL)
|
|
require.EqualValues(t, 2.0, ttlFactors.AmplifyFactor)
|
|
// get again
|
|
body = runAndCheckReqFn(t, http.StatusOK, "", func() (*http.Response, error) {
|
|
return ts.FetchStatus("/dxf/schedule/tune?keyspace=SYSTEM")
|
|
})
|
|
factors := &schstatus.TuneFactors{}
|
|
require.NoError(t, json.Unmarshal(body, factors))
|
|
require.EqualValues(t, &schstatus.TuneFactors{AmplifyFactor: 2}, factors)
|
|
}
|