// Files
// tidb/pkg/lightning/backend/external/util_test.go
//
// 702 lines
// 23 KiB
// Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package external
import (
"context"
"encoding/json"
"fmt"
"slices"
"testing"
"github.com/pingcap/tidb/pkg/ingestor/engineapi"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/stretchr/testify/require"
)
func TestSeekPropsOffsets(t *testing.T) {
ctx := context.Background()
store := objstore.NewMemStorage()
rc1 := &rangePropertiesCollector{
props: []*rangeProperty{
{
firstKey: []byte("key1"),
offset: 10,
},
{
firstKey: []byte("key3"),
offset: 30,
},
{
firstKey: []byte("key5"),
offset: 50,
},
},
}
file1 := "/test1"
w1, err := store.Create(ctx, file1, nil)
require.NoError(t, err)
_, err = w1.Write(ctx, rc1.encode())
require.NoError(t, err)
err = w1.Close(ctx)
require.NoError(t, err)
rc2 := &rangePropertiesCollector{
props: []*rangeProperty{
{
firstKey: []byte("key2"),
offset: 20,
},
{
firstKey: []byte("key4"),
offset: 40,
},
},
}
file2 := "/test2"
w2, err := store.Create(ctx, file2, nil)
require.NoError(t, err)
_, err = w2.Write(ctx, rc2.encode())
require.NoError(t, err)
err = w2.Close(ctx)
require.NoError(t, err)
got, err := seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{10, 20}}, got)
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5"), []byte("key2.6")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{10, 20}, {10, 20}}, got)
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{30, 20}}, got)
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key2.5"), []byte("key3")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{10, 20}, {30, 20}}, got)
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{0, 0}}, got)
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key1")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{10, 0}}, got)
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0"), []byte("key1")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{0, 0}, {10, 0}}, got)
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key999")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{50, 40}}, got)
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key999"), []byte("key999")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{50, 40}, {50, 40}}, got)
file3 := "/test3"
w3, err := store.Create(ctx, file3, nil)
require.NoError(t, err)
err = w3.Close(ctx)
require.NoError(t, err)
file4 := "/test4"
w4, err := store.Create(ctx, file4, nil)
require.NoError(t, err)
_, err = w4.Write(ctx, rc1.encode())
require.NoError(t, err)
err = w4.Close(ctx)
require.NoError(t, err)
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3")}, []string{file1, file2, file3, file4}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{30, 20, 0, 30}}, got)
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key3"), []byte("key999")}, []string{file1, file2, file3, file4}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{30, 20, 0, 30}, {50, 40, 0, 50}}, got)
}
// TestGetAllFileNames writes the same 30 single-byte KV pairs through three
// writers ("0", "3" and "12") under the "/subtask" prefix. It then checks that
// GetAllFileNames lists every data and stat file they produced. The writer
// limits are small enough that, per the expected list below, each writer
// produces three data files and three stat files.
func TestGetAllFileNames(t *testing.T) {
	ctx := context.Background()
	store := objstore.NewMemStorage()

	keys := make([][]byte, 0, 30)
	values := make([][]byte, 0, 30)
	for i := range 30 {
		keys = append(keys, []byte{byte(i)})
		values = append(values, []byte{byte(i)})
	}

	// writeAll builds a writer with the shared small limits under "/subtask",
	// writes every row through it and closes it.
	writeAll := func(writerID string) {
		w := NewWriterBuilder().
			SetMemorySizeLimit(10*(lengthBytes*2+2)).
			SetBlockSize(10*(lengthBytes*2+2)).
			SetPropSizeDistance(5).
			SetPropKeysDistance(3).
			Build(store, "/subtask", writerID)
		for i, key := range keys {
			require.NoError(t, w.WriteRow(ctx, key, values[i], nil))
		}
		require.NoError(t, w.Close(ctx))
	}
	for _, writerID := range []string{"0", "3", "12"} {
		writeAll(writerID)
	}

	filenames, err := GetAllFileNames(ctx, store, "subtask")
	require.NoError(t, err)
	filenames = removePartitionPrefix(t, filenames)
	require.Equal(t, []string{
		"/subtask/0/0", "/subtask/0/1", "/subtask/0/2",
		"/subtask/0_stat/0", "/subtask/0_stat/1", "/subtask/0_stat/2",
		"/subtask/12/0", "/subtask/12/1", "/subtask/12/2",
		"/subtask/12_stat/0", "/subtask/12_stat/1", "/subtask/12_stat/2",
		"/subtask/3/0", "/subtask/3/1", "/subtask/3/2",
		"/subtask/3_stat/0", "/subtask/3_stat/1", "/subtask/3_stat/2",
	}, filenames)
}
func TestCleanUpFiles(t *testing.T) {
ctx := context.Background()
store := objstore.NewMemStorage()
w := NewWriterBuilder().
SetMemorySizeLimit(10*(lengthBytes*2+2)).
SetBlockSize(10*(lengthBytes*2+2)).
SetPropSizeDistance(5).
SetPropKeysDistance(3).
Build(store, "/subtask", "0")
keys := make([][]byte, 0, 30)
values := make([][]byte, 0, 30)
for i := range 30 {
keys = append(keys, []byte{byte(i)})
values = append(values, []byte{byte(i)})
}
for i, key := range keys {
err := w.WriteRow(ctx, key, values[i], nil)
require.NoError(t, err)
}
err := w.Close(ctx)
require.NoError(t, err)
filenames, err := GetAllFileNames(ctx, store, "subtask")
require.NoError(t, err)
filenames = removePartitionPrefix(t, filenames)
require.Equal(t, []string{
"/subtask/0/0", "/subtask/0/1", "/subtask/0/2",
"/subtask/0_stat/0", "/subtask/0_stat/1", "/subtask/0_stat/2",
}, filenames)
require.NoError(t, CleanUpFiles(ctx, store, "subtask"))
filenames, err = GetAllFileNames(ctx, store, "subtask")
require.NoError(t, err)
require.Equal(t, []string(nil), filenames)
}
// TestGetMaxOverlapping checks the maximum number of simultaneously overlapping
// intervals. Half-open intervals that only touch at 3 do not overlap there,
// while closed intervals sharing the point 3 do.
func TestGetMaxOverlapping(t *testing.T) {
	cases := []struct {
		desc   string
		points []Endpoint
		want   int
	}{
		{
			desc: "[1, 3), [2, 4)",
			points: []Endpoint{
				{Key: []byte{1}, Tp: InclusiveStart, Weight: 1},
				{Key: []byte{3}, Tp: ExclusiveEnd, Weight: 1},
				{Key: []byte{2}, Tp: InclusiveStart, Weight: 1},
				{Key: []byte{4}, Tp: ExclusiveEnd, Weight: 1},
			},
			want: 2,
		},
		{
			desc: "[1, 3), [2, 4), [3, 5)",
			points: []Endpoint{
				{Key: []byte{1}, Tp: InclusiveStart, Weight: 1},
				{Key: []byte{3}, Tp: ExclusiveEnd, Weight: 1},
				{Key: []byte{2}, Tp: InclusiveStart, Weight: 1},
				{Key: []byte{4}, Tp: ExclusiveEnd, Weight: 1},
				{Key: []byte{3}, Tp: InclusiveStart, Weight: 1},
				{Key: []byte{5}, Tp: ExclusiveEnd, Weight: 1},
			},
			want: 2,
		},
		{
			desc: "[1, 3], [2, 4], [3, 5]",
			points: []Endpoint{
				{Key: []byte{1}, Tp: InclusiveStart, Weight: 1},
				{Key: []byte{3}, Tp: InclusiveEnd, Weight: 1},
				{Key: []byte{2}, Tp: InclusiveStart, Weight: 1},
				{Key: []byte{4}, Tp: InclusiveEnd, Weight: 1},
				{Key: []byte{3}, Tp: InclusiveStart, Weight: 1},
				{Key: []byte{5}, Tp: InclusiveEnd, Weight: 1},
			},
			want: 3,
		},
	}
	for _, c := range cases {
		require.EqualValues(t, c.want, GetMaxOverlapping(c.points), c.desc)
	}
}
// TestSortedKVMeta covers building a SortedKVMeta from a WriterSummary and
// merging further summaries or metas into it. Per the assertions below: the
// end key is the summary's Max with a trailing 0 byte, merged key ranges
// widen, sizes add up, file stats concatenate and conflict info accumulates.
func TestSortedKVMeta(t *testing.T) {
	abSummary := &WriterSummary{
		Min:       []byte("a"),
		Max:       []byte("b"),
		TotalSize: 123,
		MultipleFilesStats: []MultipleFilesStat{{
			Filenames: [][2]string{{"f1", "stat1"}, {"f2", "stat2"}},
		}},
		ConflictInfo: engineapi.ConflictInfo{Count: 1, Files: []string{"a.txt"}},
	}
	xySummary := &WriterSummary{
		Min:       []byte("x"),
		Max:       []byte("y"),
		TotalSize: 177,
		MultipleFilesStats: []MultipleFilesStat{{
			Filenames: [][2]string{{"f3", "stat3"}, {"f4", "stat4"}},
		}},
	}

	// A meta built from one summary mirrors that summary.
	abMeta := NewSortedKVMeta(abSummary)
	require.Equal(t, []byte("a"), abMeta.StartKey)
	require.Equal(t, []byte{'b', 0}, abMeta.EndKey)
	require.Equal(t, uint64(123), abMeta.TotalKVSize)
	require.Equal(t, abSummary.MultipleFilesStats, abMeta.MultipleFilesStats)
	require.EqualValues(t, engineapi.ConflictInfo{Count: 1, Files: []string{"a.txt"}}, abMeta.ConflictInfo)

	xyMeta := NewSortedKVMeta(xySummary)
	require.Equal(t, []byte("x"), xyMeta.StartKey)
	require.Equal(t, []byte{'y', 0}, xyMeta.EndKey)
	require.Equal(t, uint64(177), xyMeta.TotalKVSize)
	require.Equal(t, xySummary.MultipleFilesStats, xyMeta.MultipleFilesStats)
	require.EqualValues(t, engineapi.ConflictInfo{}, xyMeta.ConflictInfo)

	// Merging a summary widens the range and accumulates sizes and stats.
	abMeta.MergeSummary(xySummary)
	require.Equal(t, []byte("a"), abMeta.StartKey)
	require.Equal(t, []byte{'y', 0}, abMeta.EndKey)
	require.Equal(t, uint64(300), abMeta.TotalKVSize)
	wantStats := append(slices.Clone(abSummary.MultipleFilesStats), xySummary.MultipleFilesStats...)
	require.Equal(t, wantStats, abMeta.MultipleFilesStats)
	require.EqualValues(t, engineapi.ConflictInfo{Count: 1, Files: []string{"a.txt"}}, abMeta.ConflictInfo)

	// Merging the equivalent meta must give the same result as merging the summary.
	viaMeta := NewSortedKVMeta(abSummary)
	viaMeta.Merge(xyMeta)
	require.Equal(t, abMeta, viaMeta)

	// Conflict counts add up and the conflict file lists are concatenated.
	abMeta.MergeSummary(&WriterSummary{Min: []byte("xx"), Max: []byte("yy"), ConflictInfo: engineapi.ConflictInfo{Count: 2, Files: []string{"b.txt"}}})
	require.EqualValues(t, engineapi.ConflictInfo{Count: 3, Files: []string{"a.txt", "b.txt"}}, abMeta.ConflictInfo)
}
// TestKeyMinMax checks that BytesMin and BytesMax pick the smaller and larger
// argument regardless of argument order.
func TestKeyMinMax(t *testing.T) {
	lo, hi := []byte("a"), []byte("b")
	for _, pair := range [][2][]byte{{lo, hi}, {hi, lo}} {
		require.Equal(t, lo, BytesMin(pair[0], pair[1]))
		require.Equal(t, hi, BytesMax(pair[0], pair[1]))
	}
}
// TestMarshalFields checks that marshalInternalFields and marshalExternalFields
// partition a struct's JSON fields by the `external:"true"` tag. Untagged
// fields are emitted only by the internal marshaller and tagged fields only by
// the external one. Unexported, `json:"-"` and empty omitempty fields are never
// emitted.
func TestMarshalFields(t *testing.T) {
	type Example struct {
		X string
		Y int `json:"y"`
	}
	cases := []struct {
		name            string
		instInternal    any
		instExternal    any
		expectedMarshal string
		expectedOmit    string
	}{
		{
			name: "non-public",
			instInternal: struct {
				a int
			}{a: 42},
			instExternal: struct {
				a int `external:"true"`
			}{a: 42},
			expectedMarshal: `{}`,
			expectedOmit:    `{}`,
		},
		{
			name: "-",
			instInternal: struct {
				A string `json:"-"`
			}{A: "42"},
			instExternal: struct {
				A string `json:"-" external:"true"`
			}{A: "42"},
			expectedMarshal: `{}`,
			expectedOmit:    `{}`,
		},
		{
			name: "omitempty",
			instInternal: struct {
				A string `json:"a,omitempty"`
			}{A: ""},
			instExternal: struct {
				A string `json:"a,omitempty" external:"true"`
			}{A: ""},
			expectedMarshal: `{}`,
			expectedOmit:    `{}`,
		},
		{
			name: "int",
			instInternal: struct {
				A int
			}{A: 42},
			instExternal: struct {
				A int `external:"true"`
			}{A: 42},
			expectedMarshal: `{"A":42}`,
			expectedOmit:    `{}`,
		},
		{
			name: "rename",
			instInternal: struct {
				A string `json:"a"`
			}{A: "42"},
			instExternal: struct {
				A string `json:"a" external:"true"`
			}{A: "42"},
			expectedMarshal: `{"a":"42"}`,
			expectedOmit:    `{}`,
		},
		{
			name: "embed",
			instInternal: struct {
				Example
			}{Example: Example{X: "42", Y: 42}},
			instExternal: struct {
				Example `external:"true"`
			}{Example: Example{X: "42", Y: 42}},
			expectedMarshal: `{"X":"42","y":42}`,
			expectedOmit:    `{}`,
		},
		{
			name: "nested",
			instInternal: struct {
				Example Example
			}{Example: Example{X: "42", Y: 42}},
			instExternal: struct {
				Example Example `external:"true"`
			}{Example: Example{X: "42", Y: 42}},
			expectedMarshal: `{"Example":{"X":"42","y":42}}`,
			expectedOmit:    `{}`,
		},
		{
			name: "inline",
			instInternal: struct {
				Example `json:",inline"`
			}{Example: Example{X: "42", Y: 42}},
			instExternal: struct {
				Example `json:",inline" external:"true"`
			}{Example: Example{X: "42", Y: 42}},
			expectedMarshal: `{"X":"42","y":42}`,
			expectedOmit:    `{}`,
		},
		{
			name: "slice",
			instInternal: struct {
				A []string
			}{A: []string{"42"}},
			instExternal: struct {
				A []string `external:"true"`
			}{A: []string{"42"}},
			expectedMarshal: `{"A":["42"]}`,
			expectedOmit:    `{}`,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Each step names the marshaller, the input and the expected JSON.
			// Each instance must be emitted by exactly one of the two marshallers.
			steps := []struct {
				internal bool
				inst     any
				want     string
			}{
				{internal: true, inst: tc.instInternal, want: tc.expectedMarshal},
				{internal: false, inst: tc.instInternal, want: tc.expectedOmit},
				{internal: true, inst: tc.instExternal, want: tc.expectedOmit},
				{internal: false, inst: tc.instExternal, want: tc.expectedMarshal},
			}
			for _, step := range steps {
				var got string
				if step.internal {
					data, err := marshalInternalFields(step.inst)
					require.NoError(t, err)
					got = string(data)
				} else {
					data, err := marshalExternalFields(step.inst)
					require.NoError(t, err)
					got = string(data)
				}
				require.Equal(t, step.want, got)
			}
		})
	}
}
// TestReadWriteJSON checks the round trip of a struct that embeds
// BaseExternalMeta. With no ExternalPath, Marshal matches plain json.Marshal.
// Once ExternalPath is set, the inline JSON alone no longer reproduces the
// struct; the field tagged external:"true" is recovered only by reading back
// from external storage.
func TestReadWriteJSON(t *testing.T) {
	ctx := context.Background()
	store := objstore.NewMemStorage()
	type testStruct struct {
		BaseExternalMeta
		X int
		Y string `external:"true"`
	}
	ts := &testStruct{
		X: 42,
		Y: "test",
	}

	// Without an ExternalPath, Marshal is plain JSON of the whole struct.
	data, err := ts.BaseExternalMeta.Marshal(ts)
	require.NoError(t, err)
	js, err := json.Marshal(ts)
	require.NoError(t, err)
	require.Equal(t, data, js)

	ts.BaseExternalMeta.ExternalPath = "/test"
	err = ts.WriteJSONToExternalStorage(ctx, store, ts)
	require.NoError(t, err)
	data, err = ts.BaseExternalMeta.Marshal(ts)
	require.NoError(t, err)

	// Compare against *ts, not ts. A *testStruct and a testStruct value are
	// never equal (different types), so NotEqual(ts, ...) would pass vacuously.
	// ts1 carries no ExternalPath, so it is not expected to match ts after
	// the read.
	var ts1 testStruct
	err = ts1.ReadJSONFromExternalStorage(ctx, store, &ts1)
	require.NoError(t, err)
	require.NotEqual(t, *ts, ts1)

	// The inline JSON alone lacks the external field...
	var ts2 testStruct
	err = json.Unmarshal(data, &ts2)
	require.NoError(t, err)
	require.NotEqual(t, *ts, ts2)
	// ...which is restored by reading from the recorded ExternalPath.
	err = ts2.ReadJSONFromExternalStorage(ctx, store, &ts2)
	require.NoError(t, err)
	require.Equal(t, *ts, ts2)
}
// TestExternalMetaPath pins the object paths produced by PlanMetaPath and
// SubtaskMetaPath.
func TestExternalMetaPath(t *testing.T) {
	for _, c := range []struct{ want, got string }{
		{"1/plan/merge-sort/1/meta.json", PlanMetaPath(1, "merge-sort", 1)},
		{"2/plan/ingest/3/meta.json", PlanMetaPath(2, "ingest", 3)},
		{"1/1/meta.json", SubtaskMetaPath(1, 1)},
		{"2/3/meta.json", SubtaskMetaPath(2, 3)},
	} {
		require.Equal(t, c.want, c.got)
	}
}
// TestRemoveDuplicates checks removeDuplicates on sorted inputs. Per the cases
// below, every element whose key occurs more than once is dropped (all copies,
// not just the extras). When recording is enabled the dropped elements are
// returned in dups; otherwise only their count is reported.
func TestRemoveDuplicates(t *testing.T) {
	valGetter := func(e *int) []byte {
		return []byte{byte(*e)}
	}
	cases := []struct {
		in   []int
		out  []int
		dups []int
	}{
		// no duplicates
		{in: []int{}, out: []int{}, dups: []int{}},
		{in: []int{1}, out: []int{1}, dups: []int{}},
		{in: []int{1, 2}, out: []int{1, 2}, dups: []int{}},
		{in: []int{1, 2, 3}, out: []int{1, 2, 3}, dups: []int{}},
		{in: []int{1, 2, 3, 4, 5}, out: []int{1, 2, 3, 4, 5}, dups: []int{}},
		// duplicates at beginning
		{in: []int{1, 1}, out: []int{}, dups: []int{1, 1}},
		{in: []int{1, 1, 1}, out: []int{}, dups: []int{1, 1, 1}},
		{in: []int{1, 1, 2, 3}, out: []int{2, 3}, dups: []int{1, 1}},
		{in: []int{1, 1, 1, 2, 3}, out: []int{2, 3}, dups: []int{1, 1, 1}},
		// duplicates in middle
		{in: []int{1, 2, 2, 3}, out: []int{1, 3}, dups: []int{2, 2}},
		{in: []int{1, 2, 2, 2, 3}, out: []int{1, 3}, dups: []int{2, 2, 2}},
		{in: []int{1, 2, 2, 2, 3, 3, 4}, out: []int{1, 4}, dups: []int{2, 2, 2, 3, 3}},
		{in: []int{1, 2, 2, 2, 3, 3, 4, 4, 5}, out: []int{1, 5}, dups: []int{2, 2, 2, 3, 3, 4, 4}},
		{in: []int{1, 2, 2, 2, 3, 4, 4, 5}, out: []int{1, 3, 5}, dups: []int{2, 2, 2, 4, 4}},
		{in: []int{1, 2, 2, 2, 3, 4, 4, 5, 5, 6, 7, 8, 8, 9}, out: []int{1, 3, 6, 7, 9}, dups: []int{2, 2, 2, 4, 4, 5, 5, 8, 8}},
		// duplicates at end
		{in: []int{1, 2, 3, 3}, out: []int{1, 2}, dups: []int{3, 3}},
		{in: []int{1, 2, 3, 3, 3}, out: []int{1, 2}, dups: []int{3, 3, 3}},
		// mixing
		{in: []int{1, 1, 2, 3, 3, 4}, out: []int{2, 4}, dups: []int{1, 1, 3, 3}},
		{in: []int{1, 2, 3, 3, 4, 4}, out: []int{1, 2}, dups: []int{3, 3, 4, 4}},
		{in: []int{1, 1, 2, 3, 4, 4}, out: []int{2, 3}, dups: []int{1, 1, 4, 4}},
		{in: []int{1, 1, 2, 2, 3, 3}, out: []int{}, dups: []int{1, 1, 2, 2, 3, 3}},
		{in: []int{1, 1, 2, 2, 2, 3, 3}, out: []int{}, dups: []int{1, 1, 2, 2, 2, 3, 3}},
		{in: []int{1, 1, 2, 2, 2, 3, 3, 4, 4}, out: []int{}, dups: []int{1, 1, 2, 2, 2, 3, 3, 4, 4}},
		{in: []int{1, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5}, out: []int{}, dups: []int{1, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5}},
		{in: []int{1, 1, 2, 2, 2, 3, 4, 4, 5, 5}, out: []int{3}, dups: []int{1, 1, 2, 2, 2, 4, 4, 5, 5}},
		{in: []int{1, 1, 2, 2, 2, 3, 4, 4, 5, 5, 6, 7, 8, 8, 9, 9}, out: []int{3, 6, 7}, dups: []int{1, 1, 2, 2, 2, 4, 4, 5, 5, 8, 8, 9, 9}},
	}
	for i, c := range cases {
		t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) {
			// Sanity-check the table itself.
			require.True(t, slices.IsSorted(c.in))
			require.True(t, slices.IsSorted(c.out))
			require.True(t, slices.IsSorted(c.dups))
			require.Equal(t, len(c.in)-len(c.out), len(c.dups))

			// removeDuplicates may reuse the input's backing array, so each call
			// gets its own copy and c.in stays intact for the next call.
			freshInput := func() []int {
				in := make([]int, len(c.in))
				copy(in, c.in)
				return in
			}

			out, dups, dupCnt := removeDuplicates(freshInput(), valGetter, true)
			require.EqualValues(t, c.out, out)
			require.EqualValues(t, c.dups, dups)
			require.Equal(t, len(dups), dupCnt)

			// With recording disabled, no elements are returned, only the count.
			out, dups, dupCnt = removeDuplicates(freshInput(), valGetter, false)
			require.EqualValues(t, c.out, out)
			require.Empty(t, dups)
			require.Equal(t, len(c.dups), dupCnt)
		})
	}
}
// TestRemoveDuplicatesMoreThan2 checks removeDuplicatesMoreThanTwo on sorted
// inputs. Per the cases below, the first two copies of every key are kept, any
// further copies are returned in dups, and total counts every element whose
// key occurs at least twice.
func TestRemoveDuplicatesMoreThan2(t *testing.T) {
	keyOf := func(e *int) []byte {
		return []byte{byte(*e)}
	}
	cases := []struct {
		in    []int
		out   []int
		dups  []int
		total int
	}{
		// no duplicates
		{in: []int{}, out: []int{}, dups: []int{}, total: 0},
		{in: []int{1}, out: []int{1}, dups: []int{}, total: 0},
		{in: []int{1, 2}, out: []int{1, 2}, dups: []int{}, total: 0},
		{in: []int{1, 2, 3}, out: []int{1, 2, 3}, dups: []int{}, total: 0},
		{in: []int{1, 2, 3, 4, 5}, out: []int{1, 2, 3, 4, 5}, dups: []int{}, total: 0},
		// duplicates at beginning
		{in: []int{1, 1}, out: []int{1, 1}, dups: []int{}, total: 2},
		{in: []int{1, 1, 1}, out: []int{1, 1}, dups: []int{1}, total: 3},
		{in: []int{1, 1, 1, 1}, out: []int{1, 1}, dups: []int{1, 1}, total: 4},
		{in: []int{1, 1, 1, 1, 1}, out: []int{1, 1}, dups: []int{1, 1, 1}, total: 5},
		{in: []int{1, 1, 2, 3}, out: []int{1, 1, 2, 3}, dups: []int{}, total: 2},
		{in: []int{1, 1, 1, 2, 3}, out: []int{1, 1, 2, 3}, dups: []int{1}, total: 3},
		{in: []int{1, 1, 1, 1, 2, 3}, out: []int{1, 1, 2, 3}, dups: []int{1, 1}, total: 4},
		// duplicates in middle
		{in: []int{1, 2, 2, 3}, out: []int{1, 2, 2, 3}, dups: []int{}, total: 2},
		{in: []int{1, 2, 2, 2, 3}, out: []int{1, 2, 2, 3}, dups: []int{2}, total: 3},
		{in: []int{1, 2, 2, 2, 2, 3}, out: []int{1, 2, 2, 3}, dups: []int{2, 2}, total: 4},
		{in: []int{1, 2, 2, 2, 2, 2, 3}, out: []int{1, 2, 2, 3}, dups: []int{2, 2, 2}, total: 5},
		{in: []int{1, 2, 2, 2, 3, 3, 4}, out: []int{1, 2, 2, 3, 3, 4}, dups: []int{2}, total: 5},
		{in: []int{1, 2, 2, 2, 3, 3, 4, 4, 5}, out: []int{1, 2, 2, 3, 3, 4, 4, 5}, dups: []int{2}, total: 7},
		{in: []int{1, 2, 2, 2, 3, 4, 4, 5}, out: []int{1, 2, 2, 3, 4, 4, 5}, dups: []int{2}, total: 5},
		{in: []int{1, 2, 2, 2, 3, 4, 4, 5, 5, 5, 6, 7, 8, 8, 9}, out: []int{1, 2, 2, 3, 4, 4, 5, 5, 6, 7, 8, 8, 9}, dups: []int{2, 5}, total: 10},
		// duplicates at end
		{in: []int{1, 2, 3, 3}, out: []int{1, 2, 3, 3}, dups: []int{}, total: 2},
		{in: []int{1, 2, 3, 3, 3}, out: []int{1, 2, 3, 3}, dups: []int{3}, total: 3},
		{in: []int{1, 2, 3, 3, 3, 3}, out: []int{1, 2, 3, 3}, dups: []int{3, 3}, total: 4},
		{in: []int{1, 2, 3, 3, 3, 3, 3}, out: []int{1, 2, 3, 3}, dups: []int{3, 3, 3}, total: 5},
		// mixing
		{in: []int{1, 1, 1, 1, 1, 2, 3, 3, 3, 4}, out: []int{1, 1, 2, 3, 3, 4}, dups: []int{1, 1, 1, 3}, total: 8},
		{in: []int{1, 2, 3, 3, 3, 4, 4, 4}, out: []int{1, 2, 3, 3, 4, 4}, dups: []int{3, 4}, total: 6},
		{in: []int{1, 1, 1, 2, 3, 4, 4, 4}, out: []int{1, 1, 2, 3, 4, 4}, dups: []int{1, 4}, total: 6},
		{in: []int{1, 1, 1, 2, 2, 2, 3, 3, 3}, out: []int{1, 1, 2, 2, 3, 3}, dups: []int{1, 2, 3}, total: 9},
		{in: []int{1, 1, 2, 2, 2, 3, 3}, out: []int{1, 1, 2, 2, 3, 3}, dups: []int{2}, total: 7},
		{in: []int{1, 1, 2, 2, 2, 3, 3, 4, 4, 4}, out: []int{1, 1, 2, 2, 3, 3, 4, 4}, dups: []int{2, 4}, total: 10},
		{in: []int{1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 5, 5}, out: []int{1, 1, 2, 2, 3, 3, 4, 4, 5, 5}, dups: []int{2, 4}, total: 12},
		{in: []int{1, 1, 2, 2, 2, 3, 4, 4, 4, 5, 5, 5}, out: []int{1, 1, 2, 2, 3, 4, 4, 5, 5}, dups: []int{2, 4, 5}, total: 11},
		{in: []int{1, 1, 2, 2, 2, 3, 4, 4, 5, 5, 5, 6, 7, 8, 8, 9, 9}, out: []int{1, 1, 2, 2, 3, 4, 4, 5, 5, 6, 7, 8, 8, 9, 9}, dups: []int{2, 5}, total: 14},
	}
	for idx, tc := range cases {
		t.Run(fmt.Sprintf("case-%d", idx), func(t *testing.T) {
			// The table must be sorted and consistent before it is trusted.
			for _, s := range [][]int{tc.in, tc.out, tc.dups} {
				require.True(t, slices.IsSorted(s))
			}
			require.Equal(t, len(tc.dups), len(tc.in)-len(tc.out))

			// Work on a private copy so the table entry is never modified.
			input := append(make([]int, 0, len(tc.in)), tc.in...)
			gotOut, gotDups, gotTotal := removeDuplicatesMoreThanTwo(input, keyOf)
			require.EqualValues(t, tc.out, gotOut)
			require.EqualValues(t, tc.dups, gotDups)
			require.Equal(t, tc.total, gotTotal)
		})
	}
}
// TestDivideMergeSortDataFilesBasic pins the group sizes DivideMergeSortDataFiles
// produces for various file/node counts with a concurrency of 16. Per the
// cases below, groups are capped at 4000 files and the remainder is spread as
// evenly as possible.
func TestDivideMergeSortDataFilesBasic(t *testing.T) {
	cases := []struct {
		fileCnt       int
		nodeCnt       int
		expectedSizes []int
	}{
		{31, 3, []int{31}},
		{64, 2, []int{32, 32}},
		{64, 3, []int{32, 32}},
		{127, 3, []int{43, 42, 42}},
		{128, 3, []int{43, 43, 42}},
		{4000, 6, []int{667, 667, 667, 667, 666, 666}},
		{4000, 7, []int{572, 572, 572, 571, 571, 571, 571}},
		{40000, 7, []int{4000, 4000, 4000, 4000, 4000, 4000, 4000, 1715, 1715, 1714, 1714, 1714, 1714, 1714}},
		{31000, 7, []int{4000, 4000, 4000, 4000, 4000, 4000, 4000, 429, 429, 429, 429, 428, 428, 428}},
		{28100, 7, []int{4000, 4000, 4000, 4000, 4000, 4000, 4000, 34, 33, 33}},
		{28031, 7, []int{4000, 4000, 4000, 4000, 4000, 4000, 4000, 31}},
	}
	for _, tc := range cases {
		t.Run(fmt.Sprintf("distribute %d files to %d nodes", tc.fileCnt, tc.nodeCnt), func(t *testing.T) {
			groups, err := DivideMergeSortDataFiles(make([]string, tc.fileCnt), tc.nodeCnt, 16)
			require.NoError(t, err)
			sizes := make([]int, 0, len(groups))
			for _, group := range groups {
				sizes = append(sizes, len(group))
			}
			require.EqualValues(t, tc.expectedSizes, sizes)
		})
	}
}
// TestDivideMergeSortDataFilesSubtaskCount checks, over a grid of file and node
// counts, two limits. DivideMergeSortDataFiles must never produce more than 250
// groups (subtasks). Splitting every group with splitDataFiles at the merge
// concurrency must yield at most 4000 target files in total.
func TestDivideMergeSortDataFilesSubtaskCount(t *testing.T) {
	const concurrency = 16
	for _, fileCount := range []int{3000, 4000, 40000, 400000, 712345, 1000000} {
		// Only the number of names matters (they are all empty), so allocate
		// the slice once per file count instead of once per node count.
		dataFiles := make([]string, fileCount)
		for _, nodeCount := range []int{1, 3, 7, 16, 30, 60, 97} {
			groups, err := DivideMergeSortDataFiles(dataFiles, nodeCount, concurrency)
			require.NoError(t, err)
			var totalTargetFileCount int
			for _, group := range groups {
				totalTargetFileCount += len(splitDataFiles(group, concurrency))
			}
			t.Logf("nodeCount: %d, fileCount: %d, subtaskCount:%d, totalTargetFileCount: %d",
				nodeCount, fileCount, len(groups), totalTargetFileCount)
			require.LessOrEqual(t, len(groups), 250)
			require.LessOrEqual(t, totalTargetFileCount, 4000)
		}
	}
}