plans: add check ambiguous helper function

This commit is contained in:
siddontang
2015-10-16 11:29:29 +08:00
parent 5faf998be9
commit 239fcabdf7
2 changed files with 164 additions and 1 deletions

View File

@ -59,10 +59,13 @@ func (s *SelectList) updateFields(table string, resultFields []*field.ResultFiel
func createEmptyResultField(f *field.Field) *field.ResultField {
result := &field.ResultField{}
// Set origin name
result.ColumnInfo.Name = model.NewCIStr(f.Expr.String())
if len(f.AsName) > 0 {
result.Name = f.AsName
} else {
result.Name = f.Expr.String()
result.Name = result.ColumnInfo.Name.O
}
return result
}
@ -140,6 +143,55 @@ func (s *SelectList) CloneHiddenField(name string, tableFields []*field.ResultFi
return false
}
// CheckReferAmbiguous checks whether an identifier reference is ambiguous or not in select list.
// e,g, "select c1 as a, c2 as a from t group by a" is ambiguous,
// but "select c1 as a, c1 as a from t group by a" is not.
// For MySQL "select c1 as a, c2 + 1 as a from t group by a" is not ambiguous too,
// so we will only check identifier too.
// If no ambiguous, -1 means expr refers none in select list, else an index in select list returns.
func (s *SelectList) CheckReferAmbiguous(expr expression.Expression) (int, error) {
if _, ok := expr.(*expression.Ident); !ok {
return -1, nil
}
name := expr.String()
if strings.Contains(name, ".") {
// name is qualified, no need to check
return -1, nil
}
lastIndex := -1
// only check origin select list, no hidden field.
for i := 0; i < s.HiddenFieldOffset; i++ {
if s.ResultFields[i].Name != name {
continue
} else if _, ok := s.Fields[i].Expr.(*expression.Ident); !ok {
// not identfier, no check
continue
}
if lastIndex == -1 {
// first match, continue
lastIndex = i
continue
}
// check origin name, e,g. "select c1 as c2, c2 from t group by c2" is ambiguous.
if s.ResultFields[i].ColumnInfo.Name.O != s.ResultFields[lastIndex].ColumnInfo.Name.O {
return -1, errors.Errorf("refer %s is ambiguous", expr)
}
// check table name, e.g, "select t1.c1, c1 from t group by c1" is not ambiguous.
if s.ResultFields[i].TableName != s.ResultFields[lastIndex].TableName {
return -1, errors.Errorf("refer %s is ambiguous", expr)
}
// TODO: check database name if possible.
}
return lastIndex, nil
}
// ResolveSelectList gets fields and result fields from selectFields and srcFields,
// including field validity check and wildcard field processing.
func ResolveSelectList(selectFields []*field.Field, srcFields []*field.ResultField) (*SelectList, error) {

View File

@ -0,0 +1,111 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package plans_test
import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/field"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/plan/plans"
)
type testSelectListSuite struct{}
var _ = Suite(&testSelectListSuite{})
func (s *testSelectListSuite) createTestResultFields(colNames []string) []*field.ResultField {
fs := make([]*field.ResultField, 0, len(colNames))
for i := 0; i < len(colNames); i++ {
f := &field.ResultField{
TableName: "t",
Name: colNames[i],
}
f.ColumnInfo.Name = model.NewCIStr(colNames[i])
fs = append(fs, f)
}
return fs
}
func (s *testSelectListSuite) TestAmbiguous(c *C) {
type pair struct {
Name string
AsName string
}
createResultFields := func(names []string) []*field.ResultField {
fs := make([]*field.ResultField, 0, len(names))
for i := 0; i < len(names); i++ {
f := &field.ResultField{
TableName: "t",
Name: names[i],
}
f.ColumnInfo.Name = model.NewCIStr(names[i])
fs = append(fs, f)
}
return fs
}
createFields := func(ps []pair) []*field.Field {
fields := make([]*field.Field, len(ps))
for i, f := range ps {
fields[i] = &field.Field{
Expr: &expression.Ident{
CIStr: model.NewCIStr(f.Name),
},
AsName: f.AsName,
}
}
return fields
}
tbl := []struct {
Fields []pair
Name string
Err bool
Index int
}{
{[]pair{{"id", ""}}, "id", false, 0},
{[]pair{{"id", "a"}, {"name", "a"}}, "a", true, -1},
{[]pair{{"id", "a"}, {"name", "a"}}, "id", false, -1},
}
for _, t := range tbl {
rs := createResultFields([]string{"id", "name"})
fs := createFields(t.Fields)
sl, err := plans.ResolveSelectList(fs, rs)
c.Assert(err, IsNil)
index, err := sl.CheckReferAmbiguous(&expression.Ident{
CIStr: model.NewCIStr(t.Name),
})
if t.Err {
c.Assert(err, NotNil)
continue
}
c.Assert(err, IsNil)
c.Assert(t.Index, Equals, index)
}
}