ddl, planner: make partition table consider collation in create table and select sentence (#20937)

This commit is contained in:
xiongjiwei
2020-11-21 09:53:53 +08:00
committed by GitHub
parent 2c66371d8b
commit fbaab3ecb6
5 changed files with 88 additions and 13 deletions

View File

@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testkit"
)
@ -355,7 +356,9 @@ func (s *testIntegrationSuite2) TestCreateTableWithHashPartition(c *C) {
tk.MustExec("create table t2 (a date, b datetime) partition by hash (EXTRACT(YEAR_MONTH FROM a)) partitions 7")
}
func (s *testIntegrationSuite1) TestCreateTableWithRangeColumnPartition(c *C) {
func (s *testIntegrationSuite7) TestCreateTableWithRangeColumnPartition(c *C) {
collate.SetNewCollationEnabledForTest(true)
defer collate.SetNewCollationEnabledForTest(false)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists log_message_1;")
@ -476,6 +479,13 @@ create table log_message_1 (
"PARTITION p1 VALUES LESS THAN (20190906));",
ddl.ErrWrongTypeColumnValue,
},
{
"create table t(a char(10) collate utf8mb4_bin) " +
"partition by range columns (a) (" +
"partition p0 values less than ('a'), " +
"partition p1 values less than ('G'));",
ddl.ErrRangeNotIncreasing,
},
}
for i, t := range cases {
_, err := tk.Exec(t.sql)
@ -485,13 +495,25 @@ create table log_message_1 (
))
}
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1 (a int, b char(3)) partition by range columns (a, b) (" +
"partition p0 values less than (1, 'a')," +
"partition p1 values less than (2, maxvalue))")
tk.MustExec("drop table if exists t2;")
tk.MustExec("create table t2 (a int, b char(3)) partition by range columns (b) (" +
"partition p0 values less than ( 'a')," +
"partition p1 values less than (maxvalue))")
tk.MustExec("drop table if exists t;")
tk.MustExec(`create table t(a char(10) collate utf8mb4_unicode_ci) partition by range columns (a) (
partition p0 values less than ('a'),
partition p1 values less than ('G'));`)
tk.MustExec("drop table if exists t;")
tk.MustExec(`create table t(a int) partition by range columns (a) (
partition p0 values less than (10),
partition p1 values less than (20));`)
}
func (s *testIntegrationSuite1) TestCreateTableWithListPartition(c *C) {

View File

@ -2121,7 +2121,8 @@ func checkTwoRangeColumns(ctx sessionctx.Context, curr, prev *model.PartitionDef
// PARTITION p0 VALUES LESS THAN (5,10,'ggg')
// PARTITION p1 VALUES LESS THAN (10,20,'mmm')
// PARTITION p2 VALUES LESS THAN (15,30,'sss')
succ, err := parseAndEvalBoolExpr(ctx, fmt.Sprintf("(%s) > (%s)", curr.LessThan[i], prev.LessThan[i]), tbInfo)
colInfo := findColumnByName(pi.Columns[i].L, tbInfo)
succ, err := parseAndEvalBoolExpr(ctx, curr.LessThan[i], prev.LessThan[i], colInfo, tbInfo)
if err != nil {
return false, err
}
@ -2133,11 +2134,20 @@ func checkTwoRangeColumns(ctx sessionctx.Context, curr, prev *model.PartitionDef
return false, nil
}
func parseAndEvalBoolExpr(ctx sessionctx.Context, expr string, tbInfo *model.TableInfo) (bool, error) {
e, err := expression.ParseSimpleExprWithTableInfo(ctx, expr, tbInfo)
func parseAndEvalBoolExpr(ctx sessionctx.Context, l, r string, colInfo *model.ColumnInfo, tbInfo *model.TableInfo) (bool, error) {
lexpr, err := expression.ParseSimpleExprWithTableInfo(ctx, l, tbInfo)
if err != nil {
return false, err
}
rexpr, err := expression.ParseSimpleExprWithTableInfo(ctx, r, tbInfo)
if err != nil {
return false, err
}
e, err := expression.NewFunctionBase(ctx, ast.GT, types.NewFieldType(mysql.TypeLonglong), lexpr, rexpr)
if err != nil {
return false, err
}
e.SetCharsetAndCollation(colInfo.Charset, colInfo.Collate)
res, _, err1 := e.EvalInt(ctx, chunk.Row{})
if err1 != nil {
return false, err1

View File

@ -327,11 +327,6 @@ func buildTablePartitionInfo(ctx sessionctx.Context, s *ast.CreateTableStmt) (*m
}
pi.Expr = buf.String()
} else if s.Partition.ColumnNames != nil {
// TODO: Support multiple columns for 'PARTITION BY RANGE COLUMNS'.
if s.Partition.Tp == model.PartitionTypeRange && len(s.Partition.ColumnNames) != 1 {
pi.Enable = false
ctx.GetSessionVars().StmtCtx.AppendWarning(ErrUnsupportedPartitionByRangeColumns)
}
pi.Columns = make([]model.CIStr, 0, len(s.Partition.ColumnNames))
for _, cn := range s.Partition.ColumnNames {
pi.Columns = append(pi.Columns, cn.Name)

View File

@ -21,10 +21,11 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/mock"
)
var _ = Suite(&testPartitionPruningSuite{})
var _ = SerialSuites(&testPartitionPruningSuite{})
type testPartitionPruningSuite struct {
partitionProcessor
@ -315,6 +316,52 @@ func (s *testPartitionPruningSuite) TestPartitionRangePrunner2VarChar(c *C) {
}
}
func (s *testPartitionPruningSuite) TestPartitionRangePrunner2CharWithCollation(c *C) {
collate.SetNewCollationEnabledForTest(true)
defer collate.SetNewCollationEnabledForTest(false)
tc := prepareTestCtx(c,
"create table t (a char(32) collate utf8mb4_unicode_ci)",
"a",
)
lessThanDataInt := []string{"'c'", "'F'", "'h'", "'L'", "'t'"}
lessThan := make([]expression.Expression, len(lessThanDataInt)+1) // +1 for maxvalue
for i, str := range lessThanDataInt {
tmp, err := expression.ParseSimpleExprsWithNames(tc.sctx, str, tc.schema, tc.names)
c.Assert(err, IsNil)
lessThan[i] = tmp[0]
}
prunner := &rangeColumnsPruner{lessThan, tc.columns[0], true}
cases := []struct {
input string
result partitionRangeOR
}{
{"a > 'G'", partitionRangeOR{{2, 6}}},
{"a > 'g'", partitionRangeOR{{2, 6}}},
{"a < 'h'", partitionRangeOR{{0, 3}}},
{"a >= 'M'", partitionRangeOR{{4, 6}}},
{"a > 'm'", partitionRangeOR{{4, 6}}},
{"a < 'F'", partitionRangeOR{{0, 2}}},
{"a = 'C'", partitionRangeOR{{1, 2}}},
{"a > 't'", partitionRangeOR{{5, 6}}},
{"a > 'C' and a < 'q'", partitionRangeOR{{1, 5}}},
{"a > 'c' and a < 'Q'", partitionRangeOR{{1, 5}}},
{"a < 'l' or a >= 'W'", partitionRangeOR{{0, 4}, {5, 6}}},
{"a is null", partitionRangeOR{{0, 1}}},
{"'Mm' > a", partitionRangeOR{{0, 5}}},
{"'f' <= a", partitionRangeOR{{2, 6}}},
{"'f' >= a", partitionRangeOR{{0, 3}}},
}
for _, ca := range cases {
expr, err := expression.ParseSimpleExprsWithNames(tc.sctx, ca.input, tc.schema, tc.names)
c.Assert(err, IsNil)
result := fullRange(len(lessThan))
result = partitionRangeForExpr(tc.sctx, expr[0], prunner, result)
c.Assert(equalPartitionRangeOR(ca.result, result), IsTrue, Commentf("unexpected:", ca.input))
}
}
func (s *testPartitionPruningSuite) TestPartitionRangePrunner2Date(c *C) {
tc := prepareTestCtx(c,
"create table t (a date)",

View File

@ -1155,11 +1155,11 @@ func (p *rangeColumnsPruner) partitionRangeForExpr(sctx sessionctx.Context, expr
return 0, len(p.data), false
}
start, end := p.pruneUseBinarySearch(sctx, opName, con)
start, end := p.pruneUseBinarySearch(sctx, opName, con, op)
return start, end, true
}
func (p *rangeColumnsPruner) pruneUseBinarySearch(sctx sessionctx.Context, op string, data *expression.Constant) (start int, end int) {
func (p *rangeColumnsPruner) pruneUseBinarySearch(sctx sessionctx.Context, op string, data *expression.Constant, f *expression.ScalarFunction) (start int, end int) {
var err error
var isNull bool
compare := func(ith int, op string, v *expression.Constant) bool {
@ -1169,7 +1169,8 @@ func (p *rangeColumnsPruner) pruneUseBinarySearch(sctx sessionctx.Context, op st
}
}
var expr expression.Expression
expr, err = expression.NewFunction(sctx, op, types.NewFieldType(mysql.TypeLonglong), p.data[ith], v)
expr, err = expression.NewFunctionBase(sctx, op, types.NewFieldType(mysql.TypeLonglong), p.data[ith], v)
expr.SetCharsetAndCollation(f.CharsetAndCollation(sctx))
var val int64
val, isNull, err = expr.EvalInt(sctx, chunk.Row{})
return val > 0