Files
tidb/pkg/server/internal/resultset/resultset.go

165 lines
4.7 KiB
Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package resultset
import (
"context"
"sync"
"sync/atomic"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/server/internal/column"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlexec"
)
// ResultSet is the result set of an query.
type ResultSet interface {
Columns() []*column.Info
NewChunk(chunk.Allocator) *chunk.Chunk
Next(context.Context, *chunk.Chunk) error
Close()
// IsClosed checks whether the result set is closed.
IsClosed() bool
FieldTypes() []*types.FieldType
SetPreparedStmt(stmt *core.PlanCacheStmt)
Finish() error
TryDetach() (ResultSet, bool, error)
}
var _ ResultSet = &tidbResultSet{}
// New creates a new result set
func New(recordSet sqlexec.RecordSet, preparedStmt *core.PlanCacheStmt) ResultSet {
return &tidbResultSet{
recordSet: recordSet,
preparedStmt: preparedStmt,
}
}
type tidbResultSet struct {
recordSet sqlexec.RecordSet
preparedStmt *core.PlanCacheStmt
columns []*column.Info
closed int32
// finishLock is a mutex used to synchronize access to the `Next`,`Finish` and `Close` functions of the adapter.
// It ensures that only one goroutine can access the `Next`,`Finish` and `Close` functions at a time, preventing race conditions.
// When we terminate the current SQL externally (e.g., kill query), an additional goroutine would be used to call the `Finish` function.
finishLock sync.Mutex
}
func (trs *tidbResultSet) NewChunk(alloc chunk.Allocator) *chunk.Chunk {
return trs.recordSet.NewChunk(alloc)
}
func (trs *tidbResultSet) Next(ctx context.Context, req *chunk.Chunk) error {
trs.finishLock.Lock()
defer trs.finishLock.Unlock()
return trs.recordSet.Next(ctx, req)
}
func (trs *tidbResultSet) Finish() error {
if trs.finishLock.TryLock() {
defer trs.finishLock.Unlock()
if x, ok := trs.recordSet.(interface{ Finish() error }); ok {
return x.Finish()
}
}
return nil
}
func (trs *tidbResultSet) Close() {
trs.finishLock.Lock()
defer trs.finishLock.Unlock()
if !atomic.CompareAndSwapInt32(&trs.closed, 0, 1) {
return
}
terror.Call(trs.recordSet.Close)
trs.recordSet = nil
}
// IsClosed implements ResultSet.IsClosed interface.
func (trs *tidbResultSet) IsClosed() bool {
return atomic.LoadInt32(&trs.closed) == 1
}
// OnFetchReturned implements FetchNotifier#OnFetchReturned
func (trs *tidbResultSet) OnFetchReturned() {
if cl, ok := trs.recordSet.(FetchNotifier); ok {
cl.OnFetchReturned()
}
}
// Columns implements ResultSet.Columns interface.
func (trs *tidbResultSet) Columns() []*column.Info {
if trs.columns != nil {
return trs.columns
}
// for prepare statement, try to get cached columnInfo array
if trs.preparedStmt != nil {
ps := trs.preparedStmt
if colInfos, ok := ps.PointGet.ColumnInfos.([]*column.Info); ok {
trs.columns = colInfos
}
}
if trs.columns == nil {
fields := trs.recordSet.Fields()
for _, v := range fields {
trs.columns = append(trs.columns, column.ConvertColumnInfo(v))
}
if trs.preparedStmt != nil {
// if Info struct has allocated object,
// here maybe we need deep copy Info to do caching
trs.preparedStmt.PointGet.ColumnInfos = trs.columns
}
}
return trs.columns
}
// FieldTypes implements ResultSet.FieldTypes interface.
func (trs *tidbResultSet) FieldTypes() []*types.FieldType {
fts := make([]*types.FieldType, 0, len(trs.recordSet.Fields()))
for _, f := range trs.recordSet.Fields() {
fts = append(fts, &f.Column.FieldType)
}
return fts
}
// SetPreparedStmt implements ResultSet.SetPreparedStmt interface.
func (trs *tidbResultSet) SetPreparedStmt(stmt *core.PlanCacheStmt) {
trs.preparedStmt = stmt
}
// TryDetach creates a new `ResultSet` which doesn't depend on the current session context.
func (trs *tidbResultSet) TryDetach() (ResultSet, bool, error) {
detachableRecordSet, ok := trs.recordSet.(sqlexec.DetachableRecordSet)
if !ok {
return nil, false, nil
}
recordSet, detached, err := detachableRecordSet.TryDetach()
if !detached || err != nil {
return nil, detached, err
}
return &tidbResultSet{
recordSet: recordSet,
preparedStmt: trs.preparedStmt,
columns: trs.columns,
}, true, nil
}