executor: Support read operation for local temporary table (#26353)
This commit is contained in:
@ -322,8 +322,22 @@ func iterTxnMemBuffer(ctx sessionctx.Context, kvRanges []kv.KeyRange, fn process
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tempTableData := ctx.GetSessionVars().TemporaryTableData
|
||||
for _, rg := range kvRanges {
|
||||
iter := txn.GetMemBuffer().SnapshotIter(rg.StartKey, rg.EndKey)
|
||||
if tempTableData != nil {
|
||||
snapIter, err := tempTableData.Iter(rg.StartKey, rg.EndKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
iter, err = NewUnionIter(iter, snapIter, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for ; iter.Valid(); err = iter.Next() {
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
159
executor/union_iter.go
Normal file
159
executor/union_iter.go
Normal file
@ -0,0 +1,159 @@
|
||||
// Copyright 2021 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package executor
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/pingcap/tidb/kv"
|
||||
)
|
||||
|
||||
// UnionIter implements kv.Iterator
|
||||
type UnionIter struct {
|
||||
dirtyIt kv.Iterator
|
||||
snapshotIt kv.Iterator
|
||||
|
||||
dirtyValid bool
|
||||
snapshotValid bool
|
||||
|
||||
curIsDirty bool
|
||||
isValid bool
|
||||
reverse bool
|
||||
}
|
||||
|
||||
// NewUnionIter returns a union iterator for BufferStore.
|
||||
func NewUnionIter(dirtyIt kv.Iterator, snapshotIt kv.Iterator, reverse bool) (*UnionIter, error) {
|
||||
it := &UnionIter{
|
||||
dirtyIt: dirtyIt,
|
||||
snapshotIt: snapshotIt,
|
||||
dirtyValid: dirtyIt.Valid(),
|
||||
snapshotValid: snapshotIt.Valid(),
|
||||
reverse: reverse,
|
||||
}
|
||||
err := it.updateCur()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return it, nil
|
||||
}
|
||||
|
||||
// dirtyNext makes iter.dirtyIt go and update valid status.
|
||||
func (iter *UnionIter) dirtyNext() error {
|
||||
err := iter.dirtyIt.Next()
|
||||
iter.dirtyValid = iter.dirtyIt.Valid()
|
||||
return err
|
||||
}
|
||||
|
||||
// snapshotNext makes iter.snapshotIt go and update valid status.
|
||||
func (iter *UnionIter) snapshotNext() error {
|
||||
err := iter.snapshotIt.Next()
|
||||
iter.snapshotValid = iter.snapshotIt.Valid()
|
||||
return err
|
||||
}
|
||||
|
||||
func (iter *UnionIter) updateCur() error {
|
||||
iter.isValid = true
|
||||
for {
|
||||
if !iter.dirtyValid && !iter.snapshotValid {
|
||||
iter.isValid = false
|
||||
break
|
||||
}
|
||||
|
||||
if !iter.dirtyValid {
|
||||
iter.curIsDirty = false
|
||||
break
|
||||
}
|
||||
|
||||
if !iter.snapshotValid {
|
||||
iter.curIsDirty = true
|
||||
break
|
||||
}
|
||||
|
||||
// both valid
|
||||
if iter.snapshotValid && iter.dirtyValid {
|
||||
snapshotKey := iter.snapshotIt.Key()
|
||||
dirtyKey := iter.dirtyIt.Key()
|
||||
cmp := bytes.Compare(dirtyKey, snapshotKey)
|
||||
if iter.reverse {
|
||||
cmp = -cmp
|
||||
}
|
||||
// if equal, means both have value
|
||||
if cmp == 0 {
|
||||
if err := iter.snapshotNext(); err != nil {
|
||||
return err
|
||||
}
|
||||
iter.curIsDirty = true
|
||||
break
|
||||
} else if cmp > 0 {
|
||||
// record from snapshot comes first
|
||||
iter.curIsDirty = false
|
||||
break
|
||||
} else {
|
||||
// record from dirty comes first
|
||||
iter.curIsDirty = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Next implements the Iterator Next interface.
|
||||
func (iter *UnionIter) Next() error {
|
||||
var err error
|
||||
if !iter.curIsDirty {
|
||||
err = iter.snapshotNext()
|
||||
} else {
|
||||
err = iter.dirtyNext()
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = iter.updateCur()
|
||||
return err
|
||||
}
|
||||
|
||||
// Value implements the Iterator Value interface.
|
||||
// Multi columns
|
||||
func (iter *UnionIter) Value() []byte {
|
||||
if !iter.curIsDirty {
|
||||
return iter.snapshotIt.Value()
|
||||
}
|
||||
return iter.dirtyIt.Value()
|
||||
}
|
||||
|
||||
// Key implements the Iterator Key interface.
|
||||
func (iter *UnionIter) Key() kv.Key {
|
||||
if !iter.curIsDirty {
|
||||
return iter.snapshotIt.Key()
|
||||
}
|
||||
return iter.dirtyIt.Key()
|
||||
}
|
||||
|
||||
// Valid implements the Iterator Valid interface.
|
||||
func (iter *UnionIter) Valid() bool {
|
||||
return iter.isValid
|
||||
}
|
||||
|
||||
// Close implements the Iterator Close interface.
|
||||
func (iter *UnionIter) Close() {
|
||||
if iter.snapshotIt != nil {
|
||||
iter.snapshotIt.Close()
|
||||
iter.snapshotIt = nil
|
||||
}
|
||||
if iter.dirtyIt != nil {
|
||||
iter.dirtyIt.Close()
|
||||
iter.dirtyIt = nil
|
||||
}
|
||||
}
|
||||
167
executor/union_iter_test.go
Normal file
167
executor/union_iter_test.go
Normal file
@ -0,0 +1,167 @@
|
||||
// Copyright 2021 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package executor_test
|
||||
|
||||
import (
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/tidb/executor"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
)
|
||||
|
||||
var _ = Suite(&testUnionIterSuit{})
|
||||
|
||||
type testUnionIterSuit struct {
|
||||
}
|
||||
|
||||
func (s *testUnionIterSuit) TestUnionIter(c *C) {
|
||||
// test iter normal cases, snap iter become invalid before dirty iter
|
||||
snapRecords := []*mockRecord{
|
||||
r("k01", "v1"),
|
||||
r("k03", "v3"),
|
||||
r("k06", "v6"),
|
||||
r("k10", "v10"),
|
||||
r("k12", "v12"),
|
||||
r("k15", "v15"),
|
||||
r("k16", "v16"),
|
||||
}
|
||||
|
||||
dirtyRecords := []*mockRecord{
|
||||
r("k03", "x3"),
|
||||
r("k05", "x5"),
|
||||
r("k07", "x7"),
|
||||
r("k08", "x8"),
|
||||
}
|
||||
|
||||
assertUnionIter(c, dirtyRecords, snapRecords, []*mockRecord{
|
||||
r("k01", "v1"),
|
||||
r("k03", "x3"),
|
||||
r("k05", "x5"),
|
||||
r("k06", "v6"),
|
||||
r("k07", "x7"),
|
||||
r("k08", "x8"),
|
||||
r("k10", "v10"),
|
||||
r("k12", "v12"),
|
||||
r("k15", "v15"),
|
||||
r("k16", "v16"),
|
||||
})
|
||||
|
||||
// test iter normal cases, dirty iter become invalid before snap iter
|
||||
dirtyRecords = []*mockRecord{
|
||||
r("k03", "x3"),
|
||||
r("k05", "x5"),
|
||||
r("k07", "x7"),
|
||||
r("k08", "x8"),
|
||||
r("k17", "x17"),
|
||||
r("k18", "x18"),
|
||||
}
|
||||
|
||||
assertUnionIter(c, dirtyRecords, snapRecords, []*mockRecord{
|
||||
r("k01", "v1"),
|
||||
r("k03", "x3"),
|
||||
r("k05", "x5"),
|
||||
r("k06", "v6"),
|
||||
r("k07", "x7"),
|
||||
r("k08", "x8"),
|
||||
r("k10", "v10"),
|
||||
r("k12", "v12"),
|
||||
r("k15", "v15"),
|
||||
r("k16", "v16"),
|
||||
r("k17", "x17"),
|
||||
r("k18", "x18"),
|
||||
})
|
||||
}
|
||||
|
||||
func assertUnionIter(c *C, dirtyRecords, snapRecords, expected []*mockRecord) {
|
||||
iter, err := executor.NewUnionIter(newMockIter(dirtyRecords), newMockIter(snapRecords), false)
|
||||
c.Assert(err, IsNil)
|
||||
assertIter(c, iter, expected)
|
||||
|
||||
// assert reverse is true
|
||||
iter, err = executor.NewUnionIter(newMockIter(reverseRecords(dirtyRecords)), newMockIter(reverseRecords(snapRecords)), true)
|
||||
c.Assert(err, IsNil)
|
||||
assertIter(c, iter, reverseRecords(expected))
|
||||
}
|
||||
|
||||
func assertIter(c *C, iter kv.Iterator, expected []*mockRecord) {
|
||||
records := make([]*mockRecord, 0, len(expected))
|
||||
for iter.Valid() {
|
||||
records = append(records, &mockRecord{iter.Key(), iter.Value()})
|
||||
err := iter.Next()
|
||||
c.Assert(err, IsNil)
|
||||
}
|
||||
c.Assert(len(records), Equals, len(expected))
|
||||
for idx, record := range records {
|
||||
c.Assert(record.key, BytesEquals, expected[idx].key)
|
||||
c.Assert(record.value, BytesEquals, expected[idx].value)
|
||||
}
|
||||
}
|
||||
|
||||
func reverseRecords(records []*mockRecord) []*mockRecord {
|
||||
reversed := make([]*mockRecord, 0)
|
||||
for i := range records {
|
||||
reversed = append(reversed, records[len(records)-i-1])
|
||||
}
|
||||
return reversed
|
||||
}
|
||||
|
||||
type mockRecord struct {
|
||||
key []byte
|
||||
value []byte
|
||||
}
|
||||
|
||||
func r(key, value string) *mockRecord {
|
||||
bKey := []byte(key)
|
||||
bValue := []byte(value)
|
||||
if value == "nil" {
|
||||
bValue = nil
|
||||
}
|
||||
|
||||
return &mockRecord{bKey, bValue}
|
||||
}
|
||||
|
||||
type mockIter struct {
|
||||
data []*mockRecord
|
||||
cur int
|
||||
}
|
||||
|
||||
func newMockIter(records []*mockRecord) *mockIter {
|
||||
return &mockIter{
|
||||
records,
|
||||
0,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockIter) Valid() bool {
|
||||
return m.cur >= 0 && m.cur < len(m.data)
|
||||
}
|
||||
|
||||
func (m *mockIter) Key() kv.Key {
|
||||
return m.data[m.cur].key
|
||||
}
|
||||
|
||||
func (m *mockIter) Value() []byte {
|
||||
return m.data[m.cur].value
|
||||
}
|
||||
|
||||
func (m *mockIter) Next() error {
|
||||
if m.Valid() {
|
||||
m.cur += 1
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockIter) Close() {
|
||||
m.cur = -1
|
||||
}
|
||||
@ -4084,7 +4084,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
|
||||
|
||||
var result LogicalPlan = ds
|
||||
dirty := tableHasDirtyContent(b.ctx, tableInfo)
|
||||
if dirty {
|
||||
if dirty || tableInfo.TempTableType == model.TempTableLocal {
|
||||
us := LogicalUnionScan{handleCols: handleCols}.Init(b.ctx, b.getSelectOffset())
|
||||
us.SetChildren(ds)
|
||||
result = us
|
||||
|
||||
@ -460,7 +460,7 @@ type LogicalMemTable struct {
|
||||
QueryTimeRange QueryTimeRange
|
||||
}
|
||||
|
||||
// LogicalUnionScan is only used in non read-only txn.
|
||||
// LogicalUnionScan is used in non read-only txn or for scanning a local temporary table whose snapshot data is located in memory.
|
||||
type LogicalUnionScan struct {
|
||||
baseLogicalPlan
|
||||
|
||||
|
||||
@ -20,6 +20,7 @@ import (
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/parser/ast"
|
||||
"github.com/pingcap/parser/model"
|
||||
"github.com/pingcap/parser/mysql"
|
||||
"github.com/pingcap/tidb/expression"
|
||||
"github.com/pingcap/tidb/planner/property"
|
||||
@ -327,7 +328,7 @@ func (ds *DataSource) DeriveStats(childStats []*property.StatsInfo, selfSchema *
|
||||
}
|
||||
}
|
||||
}
|
||||
if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn {
|
||||
if isPossibleIdxMerge && sessionAndStmtPermission && needConsiderIndexMerge && isReadOnlyTxn && ds.tableInfo.TempTableType != model.TempTableLocal {
|
||||
err := ds.generateAndPruneIndexMergePath(ds.indexMergeHints != nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@ -5057,3 +5057,95 @@ func (s *testSessionSuite) TestLocalTemporaryTableBatchPointGet(c *C) {
|
||||
tk.MustQuery("select * from tmp1 where id in (1, 4)").Check(testkit.Rows("1 11 101"))
|
||||
tk.MustQuery("select * from tmp1 where u in (11, 14)").Check(testkit.Rows("1 11 101"))
|
||||
}
|
||||
|
||||
func (s *testSessionSuite) TestLocalTemporaryTableScan(c *C) {
|
||||
tk := testkit.NewTestKit(c, s.store)
|
||||
tk.MustExec("set @@tidb_enable_noop_functions=1")
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("create temporary table tmp1 (id int primary key auto_increment, u int unique, v int)")
|
||||
tk.MustExec("insert into tmp1 values" +
|
||||
"(1, 101, 1001), (3, 113, 1003), (5, 105, 1005), (7, 117, 1007), (9, 109, 1009)," +
|
||||
"(10, 110, 1010), (12, 112, 1012), (14, 114, 1014), (16, 116, 1016), (18, 118, 1018)",
|
||||
)
|
||||
|
||||
assertSelectAsUnModified := func() {
|
||||
// For TableReader
|
||||
tk.MustQuery("select * from tmp1 where id>3 order by id").Check(testkit.Rows(
|
||||
"5 105 1005", "7 117 1007", "9 109 1009",
|
||||
"10 110 1010", "12 112 1012", "14 114 1014", "16 116 1016", "18 118 1018",
|
||||
))
|
||||
|
||||
// For IndexLookUpReader
|
||||
tk.MustQuery("select /*+ use_index(tmp1, u) */ * from tmp1 where u>101 order by u").Check(testkit.Rows(
|
||||
"5 105 1005", "9 109 1009", "10 110 1010",
|
||||
"12 112 1012", "3 113 1003", "14 114 1014", "16 116 1016", "7 117 1007", "18 118 1018",
|
||||
))
|
||||
tk.MustQuery("show warnings").Check(testkit.Rows())
|
||||
|
||||
// For IndexReader
|
||||
tk.MustQuery("select /*+ use_index(tmp1, u) */ id,u from tmp1 where u>101 order by id").Check(testkit.Rows(
|
||||
"3 113", "5 105", "7 117", "9 109", "10 110",
|
||||
"12 112", "14 114", "16 116", "18 118",
|
||||
))
|
||||
tk.MustQuery("show warnings").Check(testkit.Rows())
|
||||
|
||||
// For IndexMerge, temporary table should not use index merge
|
||||
tk.MustQuery("select /*+ use_index_merge(tmp1, primary, u) */ * from tmp1 where id>5 or u>110 order by u").Check(testkit.Rows(
|
||||
"9 109 1009", "10 110 1010",
|
||||
"12 112 1012", "3 113 1003", "14 114 1014", "16 116 1016", "7 117 1007", "18 118 1018",
|
||||
))
|
||||
|
||||
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 IndexMerge is inapplicable or disabled"))
|
||||
}
|
||||
|
||||
doModify := func() {
|
||||
tk.MustExec("insert into tmp1 values(2, 100, 1002)")
|
||||
tk.MustExec("insert into tmp1 values(4, 104, 1004)")
|
||||
tk.MustExec("insert into tmp1 values(11, 111, 1011)")
|
||||
tk.MustExec("update tmp1 set v=9999 where id=7")
|
||||
tk.MustExec("update tmp1 set u=132 where id=12")
|
||||
tk.MustExec("delete from tmp1 where id=16")
|
||||
}
|
||||
|
||||
assertSelectAsModified := func() {
|
||||
// For TableReader
|
||||
tk.MustQuery("select * from tmp1 where id>3 order by id").Check(testkit.Rows(
|
||||
"4 104 1004", "5 105 1005", "7 117 9999", "9 109 1009",
|
||||
"10 110 1010", "11 111 1011", "12 132 1012", "14 114 1014", "18 118 1018",
|
||||
))
|
||||
|
||||
// For IndexLookUpReader
|
||||
tk.MustQuery("select /*+ use_index(tmp1, u) */ * from tmp1 where u>101 order by u").Check(testkit.Rows(
|
||||
"4 104 1004", "5 105 1005", "9 109 1009", "10 110 1010", "11 111 1011",
|
||||
"3 113 1003", "14 114 1014", "7 117 9999", "18 118 1018", "12 132 1012",
|
||||
))
|
||||
tk.MustQuery("show warnings").Check(testkit.Rows())
|
||||
|
||||
// For IndexReader
|
||||
tk.MustQuery("select /*+ use_index(tmp1, u) */ id,u from tmp1 where u>101 order by id").Check(testkit.Rows(
|
||||
"3 113", "4 104", "5 105", "7 117", "9 109",
|
||||
"10 110", "11 111", "12 132", "14 114", "18 118",
|
||||
))
|
||||
tk.MustQuery("show warnings").Check(testkit.Rows())
|
||||
|
||||
// For IndexMerge, temporary table should not use index merge
|
||||
tk.MustQuery("select /*+ use_index_merge(tmp1, primary, u) */ * from tmp1 where id>5 or u>110 order by u").Check(testkit.Rows(
|
||||
"9 109 1009", "10 110 1010", "11 111 1011",
|
||||
"3 113 1003", "14 114 1014", "7 117 9999", "18 118 1018", "12 132 1012",
|
||||
))
|
||||
|
||||
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 IndexMerge is inapplicable or disabled"))
|
||||
}
|
||||
|
||||
assertSelectAsUnModified()
|
||||
tk.MustExec("begin")
|
||||
assertSelectAsUnModified()
|
||||
doModify()
|
||||
tk.MustExec("rollback")
|
||||
assertSelectAsUnModified()
|
||||
tk.MustExec("begin")
|
||||
doModify()
|
||||
assertSelectAsModified()
|
||||
tk.MustExec("commit")
|
||||
assertSelectAsModified()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user