*: add UTF8 check global variable. (#2899)
This commit is contained in:
@ -14,7 +14,10 @@
|
||||
package executor_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/tidb/table"
|
||||
"github.com/pingcap/tidb/terror"
|
||||
"github.com/pingcap/tidb/util/testkit"
|
||||
"github.com/pingcap/tidb/util/testleak"
|
||||
@ -66,15 +69,19 @@ func (s *testSuite) TestStatementContext(c *C) {
|
||||
tk.MustExec("delete from sc where a < '1x'")
|
||||
tk.MustQuery("select * from sc where a > '1x'").Check(testkit.Rows("4"))
|
||||
|
||||
// TODO: enable UTF8 check
|
||||
// Test invalid UTF8
|
||||
//tk.MustExec("create table sc2 (a varchar(255))")
|
||||
//// Insert an invalid UTF8
|
||||
//tk.MustExec("insert sc2 values (unhex('4040ffff'))")
|
||||
//c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Greater, uint16(0))
|
||||
//tk.MustQuery("select * from sc2").Check(testkit.Rows(fmt.Sprintf("%v", []byte("@@"))))
|
||||
//tk.MustExec(strictModeSQL)
|
||||
//_, err = tk.Exec("insert sc2 values (unhex('4040ffff'))")
|
||||
//c.Assert(err, NotNil)
|
||||
//c.Assert(terror.ErrorEqual(err, table.ErrTruncateWrongValue), IsTrue)
|
||||
tk.MustExec("create table sc2 (a varchar(255))")
|
||||
// Insert an invalid UTF8
|
||||
tk.MustExec("insert sc2 values (unhex('4040ffff'))")
|
||||
c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Greater, uint16(0))
|
||||
tk.MustQuery("select * from sc2").Check(testkit.Rows(fmt.Sprintf("%v", []byte("@@"))))
|
||||
tk.MustExec(strictModeSQL)
|
||||
_, err = tk.Exec("insert sc2 values (unhex('4040ffff'))")
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(terror.ErrorEqual(err, table.ErrTruncateWrongValue), IsTrue)
|
||||
|
||||
tk.MustExec("set @@tidb_skip_utf8_check = '1'")
|
||||
_, err = tk.Exec("insert sc2 values (unhex('4040ffff'))")
|
||||
c.Assert(err, IsNil)
|
||||
tk.MustQuery("select length(a) from sc2").Check(testkit.Rows("2", "4"))
|
||||
}
|
||||
|
||||
@ -37,8 +37,8 @@ var (
|
||||
_ Executor = &LoadData{}
|
||||
)
|
||||
|
||||
func updateRecord(ctx context.Context, h int64, oldData, newData []types.Datum, assignFlag []bool, t table.Table, offset int, onDuplicateUpdate bool) error {
|
||||
cols := t.Cols()
|
||||
func updateRecord(ctx context.Context, h int64, oldData, newData []types.Datum, assignFlag []bool, t table.Table, onDuplicateUpdate bool) error {
|
||||
cols := t.WritableCols()
|
||||
touched := make(map[int]bool, len(cols))
|
||||
assignExists := false
|
||||
sc := ctx.GetSessionVars().StmtCtx
|
||||
@ -50,13 +50,7 @@ func updateRecord(ctx context.Context, h int64, oldData, newData []types.Datum,
|
||||
}
|
||||
continue
|
||||
}
|
||||
if i < offset || i >= offset+len(cols) {
|
||||
// The assign expression is for another table, not this.
|
||||
continue
|
||||
}
|
||||
|
||||
colIndex := i - offset
|
||||
col := cols[colIndex]
|
||||
col := cols[i]
|
||||
if col.IsPKHandleColumn(t.Meta()) {
|
||||
newHandle = newData[i]
|
||||
}
|
||||
@ -70,8 +64,12 @@ func updateRecord(ctx context.Context, h int64, oldData, newData []types.Datum,
|
||||
}
|
||||
t.RebaseAutoID(val, true)
|
||||
}
|
||||
|
||||
touched[colIndex] = true
|
||||
casted, err := table.CastValue(ctx, newData[i], col.ToInfo())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
newData[i] = casted
|
||||
touched[i] = true
|
||||
assignExists = true
|
||||
}
|
||||
|
||||
@ -80,11 +78,6 @@ func updateRecord(ctx context.Context, h int64, oldData, newData []types.Datum,
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check whether new value is valid.
|
||||
if err := table.CastValues(ctx, newData, cols, false); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if err := table.CheckNotNull(cols, newData); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -949,7 +942,7 @@ func (e *InsertExec) onDuplicateUpdate(row []types.Datum, h int64, cols map[int]
|
||||
assignFlag[i] = false
|
||||
}
|
||||
}
|
||||
if err = updateRecord(e.ctx, h, data, newData, assignFlag, e.Table, 0, true); err != nil {
|
||||
if err = updateRecord(e.ctx, h, data, newData, assignFlag, e.Table, true); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
@ -1128,16 +1121,18 @@ func (e *UpdateExec) Next() (*Row, error) {
|
||||
e.updatedRowKeys[tbl] = make(map[int64]struct{})
|
||||
}
|
||||
offset := getTableOffset(e.SelectExec.Schema(), entry)
|
||||
end := offset + len(tbl.WritableCols())
|
||||
handle := entry.Handle
|
||||
oldData := row.Data[offset : offset+len(tbl.WritableCols())]
|
||||
newTableData := newData[offset : offset+len(tbl.WritableCols())]
|
||||
oldData := row.Data[offset:end]
|
||||
newTableData := newData[offset:end]
|
||||
flags := assignFlag[offset:end]
|
||||
_, ok := e.updatedRowKeys[tbl][handle]
|
||||
if ok {
|
||||
// Each matched row is updated once, even if it matches the conditions multiple times.
|
||||
continue
|
||||
}
|
||||
// Update row
|
||||
err1 := updateRecord(e.ctx, handle, oldData, newTableData, assignFlag, tbl, offset, false)
|
||||
err1 := updateRecord(e.ctx, handle, oldData, newTableData, flags, tbl, false)
|
||||
if err1 != nil {
|
||||
return nil, errors.Trace(err1)
|
||||
}
|
||||
|
||||
10
session.go
10
session.go
@ -479,6 +479,11 @@ func (s *session) GetGlobalSysVar(name string) (string, error) {
|
||||
sysVar, err := s.getExecRet(s, sql)
|
||||
if err != nil {
|
||||
if executor.ErrResultIsEmpty.Equal(err) {
|
||||
sv, ok := variable.SysVars[name]
|
||||
isUninitializedGlobalVariable := ok && sv.Scope|variable.ScopeGlobal > 0
|
||||
if isUninitializedGlobalVariable {
|
||||
return sv.Value, nil
|
||||
}
|
||||
return "", variable.UnknownSystemVar.GenByArgs(name)
|
||||
}
|
||||
return "", errors.Trace(err)
|
||||
@ -488,8 +493,8 @@ func (s *session) GetGlobalSysVar(name string) (string, error) {
|
||||
|
||||
// SetGlobalSysVar implements GlobalVarAccessor.SetGlobalSysVar interface.
|
||||
func (s *session) SetGlobalSysVar(name string, value string) error {
|
||||
sql := fmt.Sprintf(`UPDATE %s.%s SET VARIABLE_VALUE="%s" WHERE VARIABLE_NAME="%s";`,
|
||||
mysql.SystemDB, mysql.GlobalVariablesTable, value, strings.ToLower(name))
|
||||
sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`,
|
||||
mysql.SystemDB, mysql.GlobalVariablesTable, strings.ToLower(name), value)
|
||||
_, _, err := s.ExecRestrictedSQL(s, sql)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -929,6 +934,7 @@ const loadCommonGlobalVarsSQL = "select * from mysql.global_variables where vari
|
||||
variable.SQLModeVar + "', '" +
|
||||
variable.DistSQLJoinConcurrencyVar + "', '" +
|
||||
variable.MaxAllowedPacket + "', '" +
|
||||
variable.TiDBSkipUTF8Check + "', '" +
|
||||
variable.DistSQLScanConcurrencyVar + "')"
|
||||
|
||||
// LoadCommonGlobalVariableIfNeeded loads and applies commonly used global variables for the session.
|
||||
|
||||
@ -137,6 +137,9 @@ type SessionVars struct {
|
||||
// SkipConstraintCheck is true when importing data.
|
||||
SkipConstraintCheck bool
|
||||
|
||||
// SkipUTF8 check on input value.
|
||||
SkipUTF8Check bool
|
||||
|
||||
// SkipDDLWait can be set to true to skip 2 lease wait after create/drop/truncate table, create/drop database.
|
||||
// Then if there are multiple TiDB servers, the new table may not be available for other TiDB servers.
|
||||
SkipDDLWait bool
|
||||
|
||||
@ -599,6 +599,7 @@ var defaultSysVars = []*SysVar{
|
||||
{ScopeGlobal | ScopeSession, DistSQLJoinConcurrencyVar, "5"},
|
||||
{ScopeSession, TiDBSkipConstraintCheck, "0"},
|
||||
{ScopeSession, TiDBSkipDDLWait, "0"},
|
||||
{ScopeGlobal | ScopeSession, TiDBSkipUTF8Check, "0"},
|
||||
{ScopeSession, TiDBOptAggPushDown, "ON"},
|
||||
{ScopeSession, TiDBOptInSubqUnFolding, "OFF"},
|
||||
{ScopeSession, BuildStatsConcurrencyVar, "4"},
|
||||
@ -611,6 +612,7 @@ const (
|
||||
DistSQLJoinConcurrencyVar = "tidb_distsql_join_concurrency"
|
||||
TiDBSkipConstraintCheck = "tidb_skip_constraint_check"
|
||||
TiDBSkipDDLWait = "tidb_skip_ddl_wait"
|
||||
TiDBSkipUTF8Check = "tidb_skip_utf8_check"
|
||||
TiDBOptAggPushDown = "tidb_opt_agg_push_down"
|
||||
TiDBOptInSubqUnFolding = "tidb_opt_insubquery_unfold"
|
||||
BuildStatsConcurrencyVar = "tidb_build_stats_concurrency"
|
||||
|
||||
@ -115,6 +115,8 @@ func SetSessionSystemVar(vars *variable.SessionVars, name string, value types.Da
|
||||
}
|
||||
case variable.TiDBSkipConstraintCheck:
|
||||
vars.SkipConstraintCheck = tidbOptOn(sVal)
|
||||
case variable.TiDBSkipUTF8Check:
|
||||
vars.SkipUTF8Check = tidbOptOn(sVal)
|
||||
case variable.TiDBSkipDDLWait:
|
||||
vars.SkipDDLWait = tidbOptOn(sVal)
|
||||
case variable.TiDBOptAggPushDown:
|
||||
|
||||
@ -121,23 +121,22 @@ func CastValue(ctx context.Context, val types.Datum, col *model.ColumnInfo) (cas
|
||||
if err != nil {
|
||||
return casted, errors.Trace(err)
|
||||
}
|
||||
if ctx.GetSessionVars().SkipUTF8Check {
|
||||
return casted, nil
|
||||
}
|
||||
if !mysql.IsUTF8Charset(col.Charset) {
|
||||
return casted, nil
|
||||
}
|
||||
str := casted.GetString()
|
||||
for _, r := range str {
|
||||
for i, r := range str {
|
||||
if r == utf8.RuneError {
|
||||
// Truncate to valid utf8 string.
|
||||
// casted = types.NewStringDatum(str[:i])
|
||||
casted = types.NewStringDatum(str[:i])
|
||||
err = sc.HandleTruncate(ErrTruncateWrongValue)
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
// TODO: enable it when find a better way to handle old incorrect data.
|
||||
log.Debugf("invalid UTF8 value %v for column %s", str, col.Name.O)
|
||||
}
|
||||
return casted, nil
|
||||
return casted, errors.Trace(err)
|
||||
}
|
||||
|
||||
// ColDesc describes column information like MySQL desc and show columns do.
|
||||
|
||||
Reference in New Issue
Block a user