264 lines
8.6 KiB
Go
264 lines
8.6 KiB
Go
// Copyright 2020 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package kv
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/pingcap/kvproto/pkg/mpp"
|
|
"github.com/pingcap/tidb/pkg/util/tiflash"
|
|
"github.com/pingcap/tidb/pkg/util/tiflashcompute"
|
|
"github.com/tikv/client-go/v2/tikv"
|
|
"github.com/tikv/client-go/v2/tikvrpc"
|
|
)
|
|
|
|
// MppVersion indicates the mpp-version used to build mpp plan
|
|
type MppVersion int64
|
|
|
|
const (
|
|
// MppVersionV0 supports TiFlash version [~, ~]
|
|
MppVersionV0 MppVersion = iota
|
|
|
|
// MppVersionV1 supports TiFlash version [v6.6.x, ~]
|
|
MppVersionV1
|
|
|
|
// MppVersionV2 supports TiFlash version [v7.3, ~], support ReportMPPTaskStatus service
|
|
MppVersionV2
|
|
|
|
// MppVersionV3 supports TiFlash version [v9.0, ~], support new serdes format of strings
|
|
MppVersionV3
|
|
|
|
mppVersionMax
|
|
|
|
newestMppVersion MppVersion = mppVersionMax - 1
|
|
|
|
// MppVersionUnspecified means the illegal or unspecified version, it only used in TiDB.
|
|
MppVersionUnspecified MppVersion = -1
|
|
|
|
// MppVersionUnspecifiedName denotes name of UNSPECIFIED mpp version
|
|
MppVersionUnspecifiedName string = "UNSPECIFIED"
|
|
)
|
|
|
|
// ToInt64 transforms MppVersion to int64
|
|
func (v MppVersion) ToInt64() int64 {
|
|
return int64(v)
|
|
}
|
|
|
|
// ToMppVersion transforms string to MppVersion
|
|
func ToMppVersion(name string) (MppVersion, bool) {
|
|
name = strings.ToUpper(name)
|
|
if name == MppVersionUnspecifiedName {
|
|
return MppVersionUnspecified, true
|
|
}
|
|
v, err := strconv.ParseInt(name, 10, 64)
|
|
if err != nil {
|
|
return MppVersionUnspecified, false
|
|
}
|
|
version := MppVersion(v)
|
|
if version >= MppVersionUnspecified && version <= newestMppVersion {
|
|
return version, true
|
|
}
|
|
return MppVersionUnspecified, false
|
|
}
|
|
|
|
// GetNewestMppVersion returns the mpp-version can be used in mpp plan
|
|
func GetNewestMppVersion() MppVersion {
|
|
return newestMppVersion
|
|
}
|
|
|
|
// MPPTaskMeta means the meta info such as location of a mpp task.
|
|
type MPPTaskMeta interface {
|
|
// GetAddress indicates which node this task should execute on.
|
|
GetAddress() string
|
|
}
|
|
|
|
// MPPQueryID means the global unique id of a mpp query.
|
|
type MPPQueryID struct {
|
|
QueryTs uint64 // timestamp of query execution, used for TiFlash minTSO schedule
|
|
LocalQueryID uint64 // unique mpp query id in local tidb memory.
|
|
ServerID uint64
|
|
}
|
|
|
|
// MPPTask means the minimum execution unit of a mpp computation job.
|
|
type MPPTask struct {
|
|
Meta MPPTaskMeta // on which store this task will execute
|
|
ID int64 // mppTaskID
|
|
StartTs uint64
|
|
GatherID uint64
|
|
MppQueryID MPPQueryID
|
|
TableID int64 // physical table id
|
|
MppVersion MppVersion // mpp version
|
|
SessionID uint64
|
|
SessionAlias string
|
|
|
|
PartitionTableIDs []int64
|
|
TiFlashStaticPrune bool
|
|
}
|
|
|
|
// ToPB generates the pb structure.
|
|
func (t *MPPTask) ToPB() *mpp.TaskMeta {
|
|
meta := &mpp.TaskMeta{
|
|
StartTs: t.StartTs,
|
|
GatherId: t.GatherID,
|
|
QueryTs: t.MppQueryID.QueryTs,
|
|
LocalQueryId: t.MppQueryID.LocalQueryID,
|
|
ServerId: t.MppQueryID.ServerID,
|
|
TaskId: t.ID,
|
|
MppVersion: t.MppVersion.ToInt64(),
|
|
ConnectionId: t.SessionID,
|
|
ConnectionAlias: t.SessionAlias,
|
|
}
|
|
if t.ID != -1 {
|
|
meta.Address = t.Meta.GetAddress()
|
|
}
|
|
return meta
|
|
}
|
|
|
|
// MppTaskStates denotes the state of mpp tasks
|
|
type MppTaskStates uint8
|
|
|
|
const (
|
|
// MppTaskReady means the task is ready
|
|
MppTaskReady MppTaskStates = iota
|
|
// MppTaskRunning means the task is running
|
|
MppTaskRunning
|
|
// MppTaskCancelled means the task is cancelled
|
|
MppTaskCancelled
|
|
// MppTaskDone means the task is done
|
|
MppTaskDone
|
|
)
|
|
|
|
// MPPDispatchRequest stands for a dispatching task.
|
|
type MPPDispatchRequest struct {
|
|
Data []byte // data encodes the dag coprocessor request.
|
|
Meta MPPTaskMeta // mpp store is the location of tiflash store.
|
|
IsRoot bool // root task returns data to tidb directly.
|
|
Timeout uint64 // If task is assigned but doesn't receive a connect request during timeout, the task should be destroyed.
|
|
// SchemaVer is for any schema-ful storage (like tiflash) to validate schema correctness if necessary.
|
|
SchemaVar int64
|
|
StartTs uint64
|
|
MppQueryID MPPQueryID
|
|
GatherID uint64
|
|
ID int64 // identify a single task
|
|
MppVersion MppVersion
|
|
CoordinatorAddress string
|
|
ReportExecutionSummary bool
|
|
State MppTaskStates
|
|
ResourceGroupName string
|
|
ConnectionID uint64
|
|
ConnectionAlias string
|
|
}
|
|
|
|
// CancelMPPTasksParam represents parameter for MPPClient's CancelMPPTasks
|
|
type CancelMPPTasksParam struct {
|
|
StoreAddr map[string]bool
|
|
Reqs []*MPPDispatchRequest
|
|
}
|
|
|
|
// EstablishMPPConnsParam represents parameter for MPPClient's EstablishMPPConns
|
|
type EstablishMPPConnsParam struct {
|
|
Ctx context.Context
|
|
Req *MPPDispatchRequest
|
|
TaskMeta *mpp.TaskMeta
|
|
Bo *tikv.Backoffer
|
|
}
|
|
|
|
// DispatchMPPTaskParam represents parameter for MPPClient's DispatchMPPTask
|
|
type DispatchMPPTaskParam struct {
|
|
Ctx context.Context
|
|
Req *MPPDispatchRequest
|
|
EnableCollectExecutionInfo bool
|
|
Bo *tikv.Backoffer
|
|
}
|
|
|
|
// MPPClient accepts and processes mpp requests.
|
|
type MPPClient interface {
|
|
// ConstructMPPTasks schedules task for a plan fragment.
|
|
// TODO:: This interface will be refined after we support more executors.
|
|
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration, tiflashcompute.DispatchPolicy, tiflash.ReplicaRead, func(error)) ([]MPPTaskMeta, error)
|
|
|
|
// DispatchMPPTask dispatch mpp task, and returns valid response when retry = false and err is nil.
|
|
DispatchMPPTask(DispatchMPPTaskParam) (resp *mpp.DispatchTaskResponse, retry bool, err error)
|
|
|
|
// EstablishMPPConns build a mpp connection to receive data, return valid response when err is nil.
|
|
EstablishMPPConns(EstablishMPPConnsParam) (resp *tikvrpc.MPPStreamResponse, retry bool, err error)
|
|
|
|
// CancelMPPTasks cancels mpp tasks.
|
|
CancelMPPTasks(CancelMPPTasksParam)
|
|
|
|
// CheckVisibility checks if it is safe to read using given ts.
|
|
CheckVisibility(startTime uint64) error
|
|
|
|
// GetMPPStoreCount returns number of TiFlash stores if there is no error, else return (0, error).
|
|
GetMPPStoreCount() (int, error)
|
|
}
|
|
|
|
// ReportStatusRequest wraps mpp ReportStatusRequest
|
|
type ReportStatusRequest struct {
|
|
Request *mpp.ReportTaskStatusRequest
|
|
}
|
|
|
|
// MppCoordinator describes the basic api for executing mpp physical plan.
|
|
type MppCoordinator interface {
|
|
// Execute generates and executes mpp tasks for mpp physical plan.
|
|
Execute(ctx context.Context) (Response, []KeyRange, error)
|
|
// Next returns next data
|
|
Next(ctx context.Context) (ResultSubset, error)
|
|
// ReportStatus report task execution info to coordinator
|
|
// It shouldn't change any state outside coordinator itself, since the query which generated the coordinator may not exist
|
|
ReportStatus(info ReportStatusRequest) error
|
|
// Close and release the used resources.
|
|
Close() error
|
|
// IsClosed returns whether mpp coordinator is closed or not
|
|
IsClosed() bool
|
|
// GetComputationCnt returns the number of node cnt that involved in the MPP computation.
|
|
GetNodeCnt() int
|
|
}
|
|
|
|
// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
|
|
// However, the request doesn't contain the particular plan, because only key ranges take effect on the location assignment.
|
|
type MPPBuildTasksRequest struct {
|
|
KeyRanges []KeyRange
|
|
StartTS uint64
|
|
|
|
PartitionIDAndRanges []PartitionIDAndRanges
|
|
}
|
|
|
|
// ToString returns a string representation of MPPBuildTasksRequest. Used for CacheKey.
|
|
func (req *MPPBuildTasksRequest) ToString() string {
|
|
sb := strings.Builder{}
|
|
if req.KeyRanges != nil { // Non-partiton
|
|
for i, keyRange := range req.KeyRanges {
|
|
sb.WriteString("range_id" + strconv.Itoa(i))
|
|
sb.WriteString(keyRange.StartKey.String())
|
|
sb.WriteString(keyRange.EndKey.String())
|
|
}
|
|
return sb.String()
|
|
}
|
|
// Partition
|
|
for _, partitionIDAndRange := range req.PartitionIDAndRanges {
|
|
sb.WriteString("partition_id" + strconv.Itoa(int(partitionIDAndRange.ID)))
|
|
for i, keyRange := range partitionIDAndRange.KeyRanges {
|
|
sb.WriteString("range_id" + strconv.Itoa(i))
|
|
sb.WriteString(keyRange.StartKey.String())
|
|
sb.WriteString(keyRange.EndKey.String())
|
|
}
|
|
}
|
|
return sb.String()
|
|
}
|