util: refine chunk.SwapColumn to rebuild the column reference (#7841)

This commit is contained in:
HuaiyuXu
2018-10-09 14:26:55 +08:00
committed by GitHub
parent 75d6a3ee9e
commit 7623899b62
2 changed files with 85 additions and 1 deletions

View File

@ -137,9 +137,47 @@ func (c *Chunk) MakeRef(srcColIdx, dstColIdx int) {
c.columns[dstColIdx] = c.columns[srcColIdx]
}
// SwapColumn swaps column "c.columns[colIdx]" with column "other.columns[otherIdx]".
// SwapColumn swaps column "c.columns[colIdx]" with column
// "other.columns[otherIdx]". If there exists columns refer to the column to be
// swapped, we need to re-build the reference.
func (c *Chunk) SwapColumn(colIdx int, other *Chunk, otherIdx int) {
// Find the leftmost column of the reference which is the actual column to
// be swapped.
for i := 0; i < colIdx; i++ {
if c.columns[i] == c.columns[colIdx] {
colIdx = i
}
}
for i := 0; i < otherIdx; i++ {
if other.columns[i] == other.columns[otherIdx] {
otherIdx = i
}
}
// Find the columns which refer to the actual column to be swapped.
refColsIdx := make([]int, 0, len(c.columns)-colIdx)
for i := colIdx; i < len(c.columns); i++ {
if c.columns[i] == c.columns[colIdx] {
refColsIdx = append(refColsIdx, i)
}
}
refColsIdx4Other := make([]int, 0, len(other.columns)-otherIdx)
for i := otherIdx; i < len(other.columns); i++ {
if other.columns[i] == other.columns[otherIdx] {
refColsIdx4Other = append(refColsIdx4Other, i)
}
}
// Swap columns from two chunks.
c.columns[colIdx], other.columns[otherIdx] = other.columns[otherIdx], c.columns[colIdx]
// Rebuild the reference.
for _, i := range refColsIdx {
c.MakeRef(colIdx, i)
}
for _, i := range refColsIdx4Other {
other.MakeRef(otherIdx, i)
}
}
// SwapColumns swaps columns with another Chunk.

View File

@ -471,6 +471,52 @@ func (s *testChunkSuite) TestChunkMemoryUsage(c *check.C) {
c.Assert(memUsage, check.Equals, int64(expectedUsage))
}
func (s *testChunkSuite) TestSwapColumn(c *check.C) {
fieldTypes := make([]*types.FieldType, 0, 2)
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
// chk1: column1 refers to column0
chk1 := NewChunkWithCapacity(fieldTypes, 1)
chk1.AppendFloat64(0, 1)
chk1.MakeRef(0, 1)
chk1.AppendFloat64(2, 3)
// chk2: column1 refers to column0
chk2 := NewChunkWithCapacity(fieldTypes, 1)
chk2.AppendFloat64(0, 1)
chk2.MakeRef(0, 1)
chk2.AppendFloat64(2, 3)
c.Assert(chk1.columns[0] == chk1.columns[1], check.IsTrue)
c.Assert(chk2.columns[0] == chk2.columns[1], check.IsTrue)
checkRef := func() {
c.Assert(chk1.columns[0] == chk1.columns[1], check.IsTrue)
c.Assert(chk1.columns[0] == chk2.columns[0], check.IsFalse)
c.Assert(chk2.columns[0] == chk2.columns[1], check.IsTrue)
}
chk1.SwapColumn(0, chk2, 0)
checkRef()
chk1.SwapColumn(0, chk2, 1)
checkRef()
chk2.SwapColumn(1, chk2, 0)
checkRef()
chk2.SwapColumn(1, chk2, 1)
checkRef()
chk2.SwapColumn(1, chk2, 2)
checkRef()
chk2.SwapColumn(2, chk2, 0)
checkRef()
}
func BenchmarkAppendInt(b *testing.B) {
b.ReportAllocs()
chk := newChunk(8)