distsql: fix wrong schema version when snapshot has been set (#15258)
This commit is contained in:
committed by
GitHub
parent
9fdc5531ce
commit
1637c42d45
@ -17,6 +17,7 @@ import (
|
||||
"math"
|
||||
|
||||
"github.com/pingcap/parser/mysql"
|
||||
"github.com/pingcap/tidb/infoschema"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/sessionctx/stmtctx"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
@ -165,7 +166,11 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
|
||||
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
|
||||
builder.Request.Priority = builder.getKVPriority(sv)
|
||||
builder.Request.ReplicaRead = sv.GetReplicaRead()
|
||||
builder.Request.SchemaVar = sv.TxnCtx.SchemaVersion
|
||||
if sv.SnapshotInfoschema != nil {
|
||||
builder.Request.SchemaVar = infoschema.GetInfoSchemaBySessionVars(sv).SchemaMetaVersion()
|
||||
} else {
|
||||
builder.Request.SchemaVar = sv.TxnCtx.SchemaVersion
|
||||
}
|
||||
return builder
|
||||
}
|
||||
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/parser/mysql"
|
||||
"github.com/pingcap/tidb/infoschema"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/sessionctx/stmtctx"
|
||||
@ -599,6 +600,27 @@ func (s *testSuite) TestRequestBuilder7(c *C) {
|
||||
c.Assert(actual, DeepEquals, expect)
|
||||
}
|
||||
|
||||
func (s *testSuite) TestRequestBuilder8(c *C) {
|
||||
sv := variable.NewSessionVars()
|
||||
sv.SnapshotInfoschema = infoschema.MockInfoSchemaWithSchemaVer(nil, 10000)
|
||||
actual, err := (&RequestBuilder{}).
|
||||
SetFromSessionVars(sv).
|
||||
Build()
|
||||
c.Assert(err, IsNil)
|
||||
expect := &kv.Request{
|
||||
Tp: 0,
|
||||
StartTs: 0x0,
|
||||
Data: []uint8(nil),
|
||||
Concurrency: 15,
|
||||
IsolationLevel: 0,
|
||||
Priority: 0,
|
||||
MemTracker: (*memory.Tracker)(nil),
|
||||
ReplicaRead: 0x1,
|
||||
SchemaVar: 10000,
|
||||
}
|
||||
c.Assert(actual, DeepEquals, expect)
|
||||
}
|
||||
|
||||
func (s *testSuite) TestTableRangesToKVRangesWithFbs(c *C) {
|
||||
ranges := []*ranger.Range{
|
||||
{
|
||||
|
||||
@ -23,6 +23,7 @@ import (
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/meta/autoid"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/table"
|
||||
"github.com/pingcap/tidb/util"
|
||||
"github.com/pingcap/tidb/util/logutil"
|
||||
@ -116,6 +117,30 @@ func MockInfoSchema(tbList []*model.TableInfo) InfoSchema {
|
||||
return result
|
||||
}
|
||||
|
||||
// MockInfoSchemaWithSchemaVer only serves for test.
|
||||
func MockInfoSchemaWithSchemaVer(tbList []*model.TableInfo, schemaVer int64) InfoSchema {
|
||||
result := &infoSchema{}
|
||||
result.schemaMap = make(map[string]*schemaTables)
|
||||
result.sortedTablesBuckets = make([]sortedTables, bucketCount)
|
||||
dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList}
|
||||
tableNames := &schemaTables{
|
||||
dbInfo: dbInfo,
|
||||
tables: make(map[string]table.Table),
|
||||
}
|
||||
result.schemaMap["test"] = tableNames
|
||||
for _, tb := range tbList {
|
||||
tbl := table.MockTableFromMeta(tb)
|
||||
tableNames.tables[tb.Name.L] = tbl
|
||||
bucketIdx := tableBucketIdx(tb.ID)
|
||||
result.sortedTablesBuckets[bucketIdx] = append(result.sortedTablesBuckets[bucketIdx], tbl)
|
||||
}
|
||||
for i := range result.sortedTablesBuckets {
|
||||
sort.Sort(result.sortedTablesBuckets[i])
|
||||
}
|
||||
result.schemaMetaVersion = schemaVer
|
||||
return result
|
||||
}
|
||||
|
||||
var _ InfoSchema = (*infoSchema)(nil)
|
||||
|
||||
func (is *infoSchema) SchemaByName(schema model.CIStr) (val *model.DBInfo, ok bool) {
|
||||
@ -339,7 +364,12 @@ func HasAutoIncrementColumn(tbInfo *model.TableInfo) (bool, string) {
|
||||
// GetInfoSchema gets TxnCtx InfoSchema if snapshot schema is not set,
|
||||
// Otherwise, snapshot schema is returned.
|
||||
func GetInfoSchema(ctx sessionctx.Context) InfoSchema {
|
||||
sessVar := ctx.GetSessionVars()
|
||||
return GetInfoSchemaBySessionVars(ctx.GetSessionVars())
|
||||
}
|
||||
|
||||
// GetInfoSchemaBySessionVars gets TxnCtx InfoSchema if snapshot schema is not set,
|
||||
// Otherwise, snapshot schema is returned.
|
||||
func GetInfoSchemaBySessionVars(sessVar *variable.SessionVars) InfoSchema {
|
||||
var is InfoSchema
|
||||
if snap := sessVar.SnapshotInfoschema; snap != nil {
|
||||
is = snap.(InfoSchema)
|
||||
|
||||
@ -28,7 +28,6 @@ import (
|
||||
"github.com/pingcap/parser/model"
|
||||
"github.com/pingcap/parser/mysql"
|
||||
"github.com/pingcap/parser/terror"
|
||||
"github.com/pingcap/tidb/distsql"
|
||||
"github.com/pingcap/tidb/expression"
|
||||
"github.com/pingcap/tidb/expression/aggregation"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
@ -628,9 +627,6 @@ func (h *rpcHandler) fillUpData4SelectResponse(selResp *tipb.SelectResponse, dag
|
||||
case tipb.EncodeType_TypeDefault:
|
||||
h.encodeDefault(selResp, rows, dagReq.OutputOffsets)
|
||||
case tipb.EncodeType_TypeChunk:
|
||||
if dagReq.GetChunkMemoryLayout().GetEndian() != distsql.GetSystemEndian() {
|
||||
return errors.Errorf("Mocktikv endian must be the same as TiDB system endian.")
|
||||
}
|
||||
colTypes := h.constructRespSchema(dagCtx)
|
||||
loc := dagCtx.evalCtx.sc.TimeZone
|
||||
err := h.encodeChunk(selResp, rows, colTypes, dagReq.OutputOffsets, loc)
|
||||
|
||||
Reference in New Issue
Block a user