252 lines
6.6 KiB
Go
252 lines
6.6 KiB
Go
// Copyright 2021 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package cteutil
|
|
|
|
import (
|
|
"strconv"
|
|
"testing"
|
|
|
|
"github.com/pingcap/tidb/pkg/parser/mysql"
|
|
"github.com/pingcap/tidb/pkg/types"
|
|
"github.com/pingcap/tidb/pkg/util/chunk"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestStorageBasic(t *testing.T) {
|
|
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLong)}
|
|
chkSize := 1
|
|
storage := NewStorageRowContainer(fields, chkSize)
|
|
require.NotNil(t, storage)
|
|
|
|
// Close before open.
|
|
err := storage.DerefAndClose()
|
|
require.Error(t, err)
|
|
|
|
err = storage.OpenAndRef()
|
|
require.NoError(t, err)
|
|
|
|
err = storage.DerefAndClose()
|
|
require.NoError(t, err)
|
|
|
|
err = storage.DerefAndClose()
|
|
require.Error(t, err)
|
|
|
|
// Open twice.
|
|
err = storage.OpenAndRef()
|
|
require.NoError(t, err)
|
|
err = storage.OpenAndRef()
|
|
require.NoError(t, err)
|
|
err = storage.DerefAndClose()
|
|
require.NoError(t, err)
|
|
err = storage.DerefAndClose()
|
|
require.NoError(t, err)
|
|
err = storage.DerefAndClose()
|
|
require.Error(t, err)
|
|
}
|
|
|
|
func TestOpenAndClose(t *testing.T) {
|
|
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLong)}
|
|
chkSize := 1
|
|
storage := NewStorageRowContainer(fields, chkSize)
|
|
|
|
for range 10 {
|
|
err := storage.OpenAndRef()
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
for range 9 {
|
|
err := storage.DerefAndClose()
|
|
require.NoError(t, err)
|
|
}
|
|
err := storage.DerefAndClose()
|
|
require.NoError(t, err)
|
|
|
|
err = storage.DerefAndClose()
|
|
require.Error(t, err)
|
|
}
|
|
|
|
func TestAddAndGetChunk(t *testing.T) {
|
|
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLong)}
|
|
chkSize := 10
|
|
|
|
storage := NewStorageRowContainer(fields, chkSize)
|
|
|
|
inChk := chunk.NewChunkWithCapacity(fields, chkSize)
|
|
for i := range chkSize {
|
|
inChk.AppendInt64(0, int64(i))
|
|
}
|
|
|
|
err := storage.Add(inChk)
|
|
require.Error(t, err)
|
|
|
|
err = storage.OpenAndRef()
|
|
require.NoError(t, err)
|
|
|
|
err = storage.Add(inChk)
|
|
require.NoError(t, err)
|
|
|
|
outChk, err1 := storage.GetChunk(0)
|
|
require.NoError(t, err1)
|
|
|
|
in64s := inChk.Column(0).Int64s()
|
|
out64s := outChk.Column(0).Int64s()
|
|
require.Equal(t, in64s, out64s)
|
|
}
|
|
|
|
func TestSpillToDisk(t *testing.T) {
|
|
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLong)}
|
|
chkSize := 10
|
|
storage := NewStorageRowContainer(fields, chkSize)
|
|
var tmp any = storage
|
|
|
|
inChk := chunk.NewChunkWithCapacity(fields, chkSize)
|
|
for i := range chkSize {
|
|
inChk.AppendInt64(0, int64(i))
|
|
}
|
|
|
|
err := storage.OpenAndRef()
|
|
require.NoError(t, err)
|
|
|
|
memTracker := storage.GetMemTracker()
|
|
memTracker.SetBytesLimit(inChk.MemoryUsage() + 1)
|
|
action := tmp.(*StorageRC).ActionSpillForTest()
|
|
memTracker.FallbackOldAndSetNewAction(action)
|
|
diskTracker := storage.GetDiskTracker()
|
|
|
|
// All data is in memory.
|
|
err = storage.Add(inChk)
|
|
require.NoError(t, err)
|
|
outChk, err1 := storage.GetChunk(0)
|
|
require.NoError(t, err1)
|
|
|
|
in64s := inChk.Column(0).Int64s()
|
|
out64s := outChk.Column(0).Int64s()
|
|
require.Equal(t, in64s, out64s)
|
|
|
|
require.Greater(t, memTracker.BytesConsumed(), int64(0))
|
|
require.Greater(t, memTracker.MaxConsumed(), int64(0))
|
|
require.Equal(t, int64(0), diskTracker.BytesConsumed())
|
|
require.Equal(t, int64(0), diskTracker.MaxConsumed())
|
|
|
|
// Add again, and will trigger spill to disk.
|
|
err = storage.Add(inChk)
|
|
require.NoError(t, err)
|
|
action.WaitForTest()
|
|
require.Equal(t, int64(0), memTracker.BytesConsumed())
|
|
require.Greater(t, memTracker.MaxConsumed(), int64(0))
|
|
require.Greater(t, diskTracker.BytesConsumed(), int64(0))
|
|
require.Greater(t, diskTracker.MaxConsumed(), int64(0))
|
|
|
|
outChk, err = storage.GetChunk(0)
|
|
require.NoError(t, err)
|
|
out64s = outChk.Column(0).Int64s()
|
|
require.Equal(t, in64s, out64s)
|
|
|
|
outChk, err = storage.GetChunk(1)
|
|
require.NoError(t, err)
|
|
out64s = outChk.Column(0).Int64s()
|
|
require.Equal(t, in64s, out64s)
|
|
}
|
|
|
|
func TestReopen(t *testing.T) {
|
|
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLong)}
|
|
chkSize := 10
|
|
storage := NewStorageRowContainer(fields, chkSize)
|
|
err := storage.OpenAndRef()
|
|
require.NoError(t, err)
|
|
|
|
inChk := chunk.NewChunkWithCapacity(fields, chkSize)
|
|
for i := range chkSize {
|
|
inChk.AppendInt64(0, int64(i))
|
|
}
|
|
err = storage.Add(inChk)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, storage.NumChunks())
|
|
err = storage.Reopen()
|
|
require.NoError(t, err)
|
|
require.Equal(t, 0, storage.NumChunks())
|
|
|
|
err = storage.Add(inChk)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, storage.NumChunks())
|
|
|
|
outChk, err := storage.GetChunk(0)
|
|
require.NoError(t, err)
|
|
in64s := inChk.Column(0).Int64s()
|
|
out64s := outChk.Column(0).Int64s()
|
|
require.Equal(t, in64s, out64s)
|
|
// Reopen multiple times.
|
|
for range 100 {
|
|
err = storage.Reopen()
|
|
require.NoError(t, err)
|
|
}
|
|
err = storage.Add(inChk)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, storage.NumChunks())
|
|
|
|
outChk, err = storage.GetChunk(0)
|
|
require.NoError(t, err)
|
|
in64s = inChk.Column(0).Int64s()
|
|
out64s = outChk.Column(0).Int64s()
|
|
require.Equal(t, in64s, out64s)
|
|
}
|
|
|
|
func TestSwapData(t *testing.T) {
|
|
tp1 := []*types.FieldType{types.NewFieldType(mysql.TypeLong)}
|
|
chkSize := 10
|
|
storage1 := NewStorageRowContainer(tp1, chkSize)
|
|
err := storage1.OpenAndRef()
|
|
require.NoError(t, err)
|
|
inChk1 := chunk.NewChunkWithCapacity(tp1, chkSize)
|
|
for i := range chkSize {
|
|
inChk1.AppendInt64(0, int64(i))
|
|
}
|
|
in1 := inChk1.Column(0).Int64s()
|
|
err = storage1.Add(inChk1)
|
|
require.NoError(t, err)
|
|
|
|
tp2 := []*types.FieldType{types.NewFieldType(mysql.TypeVarString)}
|
|
storage2 := NewStorageRowContainer(tp2, chkSize)
|
|
err = storage2.OpenAndRef()
|
|
require.NoError(t, err)
|
|
|
|
inChk2 := chunk.NewChunkWithCapacity(tp2, chkSize)
|
|
for i := range chkSize {
|
|
inChk2.AppendString(0, strconv.FormatInt(int64(i), 10))
|
|
}
|
|
in2 := make([]string, 0, inChk2.NumRows())
|
|
for i := range inChk2.NumRows() {
|
|
in2 = append(in2, inChk2.Column(0).GetString(i))
|
|
}
|
|
err = storage2.Add(inChk2)
|
|
require.NoError(t, err)
|
|
|
|
err = storage1.SwapData(storage2)
|
|
require.NoError(t, err)
|
|
|
|
outChk1, err := storage1.GetChunk(0)
|
|
require.NoError(t, err)
|
|
outChk2, err := storage2.GetChunk(0)
|
|
require.NoError(t, err)
|
|
|
|
out1 := make([]string, 0, outChk1.NumRows())
|
|
for i := range outChk1.NumRows() {
|
|
out1 = append(out1, outChk1.Column(0).GetString(i))
|
|
}
|
|
out2 := outChk2.Column(0).Int64s()
|
|
require.Equal(t, in1, out2)
|
|
require.Equal(t, in2, out1)
|
|
}
|