Files
tidb/pkg/planner/core/expression_codec_fn.go

469 lines
14 KiB
Go

// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package core
import (
"context"
"encoding/hex"
"encoding/json"
"strconv"
"strings"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/expropt"
"github.com/pingcap/tidb/pkg/infoschema"
infoschemactx "github.com/pingcap/tidb/pkg/infoschema/context"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
)
// tidbCodecFuncHelper contains some utililty functions for
// - tidb_decode_key(hex_string)
// - tidb_encode_record_key(database_name, table_name, handle/pk columns...)
// - tidb_encode_index_key(database_name, table_name, index_name, index columns..., handle/pk columns...)
//
// define an individual struct instead of a bunch of un-exported functions
// to avoid polluting the global scope of current package.
type tidbCodecFuncHelper struct{}
func (h tidbCodecFuncHelper) encodeHandleFromRow(
ctx expression.EvalContext,
checker expropt.PrivilegeChecker,
isVer infoschemactx.MetaOnlyInfoSchema,
args []expression.Expression,
row chunk.Row,
) ([]byte, bool, error) {
dbName, isNull, err := args[0].EvalString(ctx, row)
if err != nil || isNull {
return nil, isNull, err
}
tblName, isNull, err := args[1].EvalString(ctx, row)
if err != nil || isNull {
return nil, isNull, err
}
is := isVer.(infoschema.InfoSchema)
tbl, _, err := h.findCommonOrPartitionedTable(checker, is, dbName, tblName)
if err != nil {
return nil, false, err
}
recordID, err := h.buildHandle(ctx, tbl.Meta(), args[2:], row)
if err != nil {
return nil, false, err
}
key := tablecodec.EncodeRecordKey(tbl.RecordPrefix(), recordID)
return key, false, nil
}
func (h tidbCodecFuncHelper) findCommonOrPartitionedTable(
checker expropt.PrivilegeChecker,
is infoschema.InfoSchema,
dbName string,
tblName string,
) (table.Table, int64, error) {
tblName, partName := h.extractTablePartition(tblName)
tbl, err := is.TableByName(context.Background(), ast.NewCIStr(dbName), ast.NewCIStr(tblName))
if err != nil {
return nil, 0, err
}
if !checker.RequestVerification(dbName, tblName, "", mysql.AllPrivMask) {
// The arguments will be filled by caller.
return nil, 0, plannererrors.ErrSpecificAccessDenied
}
if len(partName) > 0 {
if part := tbl.GetPartitionedTable(); part != nil {
pid, err := tables.FindPartitionByName(tbl.Meta(), partName)
if err != nil {
return nil, 0, errors.Trace(err)
}
tbl = part.GetPartition(pid)
return tbl, pid, nil
}
return nil, 0, errors.New("not a partitioned table")
}
return tbl, tbl.Meta().ID, nil
}
func (tidbCodecFuncHelper) extractTablePartition(str string) (table, partition string) {
start := strings.IndexByte(str, '(')
if start == -1 {
return str, ""
}
end := strings.IndexByte(str, ')')
if end == -1 {
return str, ""
}
return str[:start], str[start+1 : end]
}
func (tidbCodecFuncHelper) buildHandle(
ctx expression.EvalContext,
tblInfo *model.TableInfo,
pkArgs []expression.Expression,
row chunk.Row,
) (kv.Handle, error) {
var recordID kv.Handle
if !tblInfo.IsCommonHandle {
h, _, err := pkArgs[0].EvalInt(ctx, row)
if err != nil {
return nil, err
}
recordID = kv.IntHandle(h)
} else {
pkIdx := tables.FindPrimaryIndex(tblInfo)
if len(pkIdx.Columns) != len(pkArgs) {
return nil, errors.Errorf("pk column count mismatch, expected %d, got %d", len(pkIdx.Columns), pkArgs)
}
pkDts := make([]types.Datum, 0, len(pkIdx.Columns))
for i, idxCol := range pkIdx.Columns {
dt, err := pkArgs[i].Eval(ctx, row)
if err != nil {
return nil, err
}
ft := tblInfo.Columns[idxCol.Offset].FieldType
pkDt, err := dt.ConvertTo(ctx.TypeCtx(), &ft)
if err != nil {
return nil, err
}
pkDts = append(pkDts, pkDt)
}
tablecodec.TruncateIndexValues(tblInfo, pkIdx, pkDts)
var handleBytes []byte
handleBytes, err := codec.EncodeKey(ctx.Location(), nil, pkDts...)
ec := ctx.ErrCtx()
err = ec.HandleError(err)
if err != nil {
return nil, err
}
recordID, err = kv.NewCommonHandle(handleBytes)
if err != nil {
return nil, err
}
}
return recordID, nil
}
func (h tidbCodecFuncHelper) encodeIndexKeyFromRow(
ctx expression.EvalContext,
checker expropt.PrivilegeChecker,
isVer infoschemactx.MetaOnlyInfoSchema,
args []expression.Expression,
row chunk.Row,
) ([]byte, bool, error) {
dbName, isNull, err := args[0].EvalString(ctx, row)
if err != nil || isNull {
return nil, isNull, err
}
tblName, isNull, err := args[1].EvalString(ctx, row)
if err != nil || isNull {
return nil, isNull, err
}
idxName, isNull, err := args[2].EvalString(ctx, row)
if err != nil || isNull {
return nil, isNull, err
}
is := isVer.(infoschema.InfoSchema)
tbl, physicalID, err := h.findCommonOrPartitionedTable(checker, is, dbName, tblName)
if err != nil {
return nil, false, err
}
tblInfo := tbl.Meta()
idxInfo := tblInfo.FindIndexByName(strings.ToLower(idxName))
if idxInfo == nil {
return nil, false, errors.New("index not found")
}
pkLen := 1
var pkIdx *model.IndexInfo
if tblInfo.IsCommonHandle {
pkIdx = tables.FindPrimaryIndex(tblInfo)
pkLen = len(pkIdx.Columns)
}
if len(idxInfo.Columns)+pkLen != len(args)-3 {
return nil, false, errors.Errorf(
"column count mismatch, expected %d (index length + pk/rowid length), got %d",
len(idxInfo.Columns)+pkLen, len(args)-3)
}
handle, err := h.buildHandle(ctx, tblInfo, args[3+len(idxInfo.Columns):], row)
if err != nil {
return nil, false, err
}
idxDts := make([]types.Datum, 0, len(idxInfo.Columns))
for i, idxCol := range idxInfo.Columns {
dt, err := args[i+3].Eval(ctx, row)
if err != nil {
return nil, false, err
}
ft := tblInfo.Columns[idxCol.Offset].FieldType
idxDt, err := dt.ConvertTo(ctx.TypeCtx(), &ft)
if err != nil {
return nil, false, err
}
idxDts = append(idxDts, idxDt)
}
tablecodec.TruncateIndexValues(tblInfo, idxInfo, idxDts)
// Use physicalID instead of tblInfo.ID here to handle the partition case.
idx, err := tables.NewIndex(physicalID, tblInfo, idxInfo)
if err != nil {
return nil, false, err
}
idxKey, _, err := idx.GenIndexKey(ctx.ErrCtx(), ctx.Location(), idxDts, handle, nil)
return idxKey, false, err
}
func (h tidbCodecFuncHelper) decodeKeyFromString(
tc types.Context, isVer infoschemactx.MetaOnlyInfoSchema, s string) string {
key, err := hex.DecodeString(s)
if err != nil {
tc.AppendWarning(errors.NewNoStackErrorf("invalid key: %X", key))
return s
}
// Auto decode byte if needed.
_, bs, err := codec.DecodeBytes(key, nil)
if err == nil {
key = bs
}
tableID := tablecodec.DecodeTableID(key)
if tableID <= 0 {
tc.AppendWarning(errors.NewNoStackErrorf("invalid key: %X", key))
return s
}
is, ok := isVer.(infoschema.InfoSchema)
if !ok {
tc.AppendWarning(errors.NewNoStackErrorf("infoschema not found when decoding key: %X", key))
return s
}
tbl, _ := infoschema.FindTableByTblOrPartID(is, tableID)
loc := tc.Location()
if tablecodec.IsRecordKey(key) {
ret, err := h.decodeRecordKey(key, tableID, tbl, loc)
if err != nil {
tc.AppendWarning(err)
return s
}
return ret
} else if tablecodec.IsIndexKey(key) {
ret, err := h.decodeIndexKey(key, tableID, tbl, loc)
if err != nil {
tc.AppendWarning(err)
return s
}
return ret
} else if tablecodec.IsTableKey(key) {
ret, err := h.decodeTableKey(key, tableID, tbl)
if err != nil {
tc.AppendWarning(err)
return s
}
return ret
}
tc.AppendWarning(errors.NewNoStackErrorf("invalid key: %X", key))
return s
}
func (h tidbCodecFuncHelper) decodeRecordKey(
key []byte, tableID int64, tbl table.Table, loc *time.Location) (string, error) {
_, handle, err := tablecodec.DecodeRecordKey(key)
if err != nil {
return "", errors.Trace(err)
}
if handle.IsInt() {
ret := make(map[string]any)
if tbl != nil && tbl.Meta().Partition != nil {
ret["partition_id"] = tableID
tableID = tbl.Meta().ID
}
ret["table_id"] = strconv.FormatInt(tableID, 10)
// When the clustered index is enabled, we should show the PK name.
if tbl != nil && tbl.Meta().HasClusteredIndex() {
ret[tbl.Meta().GetPkName().String()] = handle.IntValue()
} else {
ret["_tidb_rowid"] = handle.IntValue()
}
retStr, err := json.Marshal(ret)
if err != nil {
return "", errors.Trace(err)
}
return string(retStr), nil
}
if tbl != nil {
tblInfo := tbl.Meta()
idxInfo := tables.FindPrimaryIndex(tblInfo)
if idxInfo == nil {
return "", errors.Trace(errors.Errorf("primary key not found when decoding record key: %X", key))
}
cols := make(map[int64]*types.FieldType, len(tblInfo.Columns))
for _, col := range tblInfo.Columns {
cols[col.ID] = &(col.FieldType)
}
handleColIDs := make([]int64, 0, len(idxInfo.Columns))
for _, col := range idxInfo.Columns {
handleColIDs = append(handleColIDs, tblInfo.Columns[col.Offset].ID)
}
if len(handleColIDs) != handle.NumCols() {
return "", errors.Trace(errors.Errorf("primary key length not match handle columns number in key"))
}
datumMap, err := tablecodec.DecodeHandleToDatumMap(handle, handleColIDs, cols, loc, nil)
if err != nil {
return "", errors.Trace(err)
}
ret := make(map[string]any)
if tbl.Meta().Partition != nil {
ret["partition_id"] = tableID
tableID = tbl.Meta().ID
}
ret["table_id"] = tableID
handleRet := make(map[string]any)
for colID := range datumMap {
dt := datumMap[colID]
dtStr, err := h.datumToJSONObject(&dt)
if err != nil {
return "", errors.Trace(err)
}
found := false
for _, colInfo := range tblInfo.Columns {
if colInfo.ID == colID {
found = true
handleRet[colInfo.Name.L] = dtStr
break
}
}
if !found {
return "", errors.Trace(errors.Errorf("column not found when decoding record key: %X", key))
}
}
ret["handle"] = handleRet
retStr, err := json.Marshal(ret)
if err != nil {
return "", errors.Trace(err)
}
return string(retStr), nil
}
ret := make(map[string]any)
ret["table_id"] = tableID
ret["handle"] = handle.String()
retStr, err := json.Marshal(ret)
if err != nil {
return "", errors.Trace(err)
}
return string(retStr), nil
}
func (h tidbCodecFuncHelper) decodeIndexKey(
key []byte, tableID int64, tbl table.Table, loc *time.Location) (string, error) {
if tbl != nil {
_, indexID, _, err := tablecodec.DecodeKeyHead(key)
if err != nil {
return "", errors.Trace(errors.Errorf("invalid record/index key: %X", key))
}
tblInfo := tbl.Meta()
var targetIndex *model.IndexInfo
for _, idx := range tblInfo.Indices {
if idx.ID == indexID {
targetIndex = idx
break
}
}
if targetIndex == nil {
return "", errors.Trace(errors.Errorf("index not found when decoding index key: %X", key))
}
colInfos := tables.BuildRowcodecColInfoForIndexColumns(targetIndex, tblInfo)
tps := tables.BuildFieldTypesForIndexColumns(targetIndex, tblInfo)
values, err := tablecodec.DecodeIndexKV(key, []byte{0}, len(colInfos), tablecodec.HandleNotNeeded, colInfos)
if err != nil {
return "", errors.Trace(err)
}
ds := make([]types.Datum, 0, len(colInfos))
for i := range colInfos {
d, err := tablecodec.DecodeColumnValue(values[i], tps[i], loc)
if err != nil {
return "", errors.Trace(err)
}
ds = append(ds, d)
}
ret := make(map[string]any)
if tbl.Meta().Partition != nil {
ret["partition_id"] = tableID
tableID = tbl.Meta().ID
}
ret["table_id"] = tableID
ret["index_id"] = indexID
idxValMap := make(map[string]any, len(targetIndex.Columns))
for i := range targetIndex.Columns {
dtStr, err := h.datumToJSONObject(&ds[i])
if err != nil {
return "", errors.Trace(err)
}
idxValMap[targetIndex.Columns[i].Name.L] = dtStr
}
ret["index_vals"] = idxValMap
retStr, err := json.Marshal(ret)
if err != nil {
return "", errors.Trace(err)
}
return string(retStr), nil
}
_, indexID, indexValues, err := tablecodec.DecodeIndexKey(key)
if err != nil {
return "", errors.Trace(errors.Errorf("invalid index key: %X", key))
}
ret := make(map[string]any)
ret["table_id"] = tableID
ret["index_id"] = indexID
ret["index_vals"] = strings.Join(indexValues, ", ")
retStr, err := json.Marshal(ret)
if err != nil {
return "", errors.Trace(err)
}
return string(retStr), nil
}
func (tidbCodecFuncHelper) decodeTableKey(_ []byte, tableID int64, tbl table.Table) (string, error) {
ret := map[string]int64{}
if tbl != nil && tbl.Meta().GetPartitionInfo() != nil {
ret["partition_id"] = tableID
tableID = tbl.Meta().ID
}
ret["table_id"] = tableID
retStr, err := json.Marshal(ret)
if err != nil {
return "", errors.Trace(err)
}
return string(retStr), nil
}
func (tidbCodecFuncHelper) datumToJSONObject(d *types.Datum) (any, error) {
if d.IsNull() {
return nil, nil
}
return d.ToString()
}