diff --git a/distsql/distsql.go b/distsql/distsql.go index 36dabda427..e69ed5f85e 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -85,11 +85,10 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer) (SelectResult, error) { sr, err := Select(ctx, sctx, kvReq, fieldTypes, fb) - if err != nil { - return sr, err - } - if selectResult, ok := sr.(*selectResult); ok { - selectResult.copPlanIDs = copPlanIDs + if err == nil { + if selectResult, ok := sr.(*selectResult); ok { + selectResult.copPlanIDs = copPlanIDs + } } return sr, err } diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 478881e477..500bacd69a 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -206,6 +206,14 @@ func (s *testSuite) TestSelectStreaming(c *C) { c.Assert(err, IsNil) } +func (s *testSuite) TestSelectStreamingWithNextRaw(c *C) { + response, _ := s.createSelectStreaming(1, 2, c) + response.Fetch(context.TODO()) + data, err := response.NextRaw(context.TODO()) + c.Assert(err, IsNil) + c.Assert(len(data), Equals, 16) +} + func (s *testSuite) TestSelectStreamingChunkSize(c *C) { response, colTypes := s.createSelectStreaming(100, 1000000, c) response.Fetch(context.TODO()) @@ -285,6 +293,30 @@ func (s *testSuite) TestAnalyze(c *C) { c.Assert(err, IsNil) } +func (s *testSuite) TestChecksum(c *C) { + request, err := (&RequestBuilder{}).SetKeyRanges(nil). + SetChecksumRequest(&tipb.ChecksumRequest{}). + Build() + c.Assert(err, IsNil) + + response, err := Checksum(context.TODO(), s.sctx.GetClient(), request, kv.DefaultVars) + c.Assert(err, IsNil) + + result, ok := response.(*selectResult) + c.Assert(ok, IsTrue) + c.Assert(result.label, Equals, "checksum") + c.Assert(result.sqlType, Equals, "general") + + response.Fetch(context.TODO()) + + bytes, err := response.NextRaw(context.TODO()) + c.Assert(err, IsNil) + c.Assert(len(bytes), Equals, 16) + + err = response.Close() + c.Assert(err, IsNil) +} + // mockResponse implements kv.Response interface. // Used only for test. type mockResponse struct { diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 92532de0b7..9e1ef6c624 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -54,20 +54,18 @@ func (builder *RequestBuilder) SetMemTracker(sctx sessionctx.Context, label fmt. // SetTableRanges sets "KeyRanges" for "kv.Request" by converting "tableRanges" // to "KeyRanges" firstly. func (builder *RequestBuilder) SetTableRanges(tid int64, tableRanges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder { - if builder.err != nil { - return builder + if builder.err == nil { + builder.Request.KeyRanges = TableRangesToKVRanges(tid, tableRanges, fb) } - builder.Request.KeyRanges = TableRangesToKVRanges(tid, tableRanges, fb) return builder } // SetIndexRanges sets "KeyRanges" for "kv.Request" by converting index range // "ranges" to "KeyRanges" firstly. func (builder *RequestBuilder) SetIndexRanges(sc *stmtctx.StatementContext, tid, idxID int64, ranges []*ranger.Range) *RequestBuilder { - if builder.err != nil { - return builder + if builder.err == nil { + builder.Request.KeyRanges, builder.err = IndexRangesToKVRanges(sc, tid, idxID, ranges, nil) } - builder.Request.KeyRanges, builder.err = IndexRangesToKVRanges(sc, tid, idxID, ranges, nil) return builder } @@ -80,41 +78,38 @@ func (builder *RequestBuilder) SetTableHandles(tid int64, handles []int64) *Requ // SetDAGRequest sets the request type to "ReqTypeDAG" and construct request data. func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuilder { - if builder.err != nil { - return builder + if builder.err == nil { + builder.Request.Tp = kv.ReqTypeDAG + builder.Request.StartTs = dag.StartTs + builder.Request.Data, builder.err = dag.Marshal() } - builder.Request.Tp = kv.ReqTypeDAG - builder.Request.StartTs = dag.StartTs - builder.Request.Data, builder.err = dag.Marshal() return builder } // SetAnalyzeRequest sets the request type to "ReqTypeAnalyze" and cosntruct request data. func (builder *RequestBuilder) SetAnalyzeRequest(ana *tipb.AnalyzeReq) *RequestBuilder { - if builder.err != nil { - return builder + if builder.err == nil { + builder.Request.Tp = kv.ReqTypeAnalyze + builder.Request.StartTs = ana.StartTs + builder.Request.Data, builder.err = ana.Marshal() + builder.Request.NotFillCache = true + builder.Request.IsolationLevel = kv.RC + builder.Request.Priority = kv.PriorityLow } - builder.Request.Tp = kv.ReqTypeAnalyze - builder.Request.StartTs = ana.StartTs - builder.Request.Data, builder.err = ana.Marshal() - builder.Request.NotFillCache = true - builder.Request.IsolationLevel = kv.RC - builder.Request.Priority = kv.PriorityLow return builder } // SetChecksumRequest sets the request type to "ReqTypeChecksum" and construct request data. func (builder *RequestBuilder) SetChecksumRequest(checksum *tipb.ChecksumRequest) *RequestBuilder { - if builder.err != nil { - return builder + if builder.err == nil { + builder.Request.Tp = kv.ReqTypeChecksum + builder.Request.StartTs = checksum.StartTs + builder.Request.Data, builder.err = checksum.Marshal() + builder.Request.NotFillCache = true } - builder.Request.Tp = kv.ReqTypeChecksum - builder.Request.StartTs = checksum.StartTs - builder.Request.Data, builder.err = checksum.Marshal() - builder.Request.NotFillCache = true return builder } diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index b64dd63218..8485e49de8 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -18,12 +18,15 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mock" @@ -549,3 +552,64 @@ func (s *testSuite) TestRequestBuilder6(c *C) { c.Assert(actual, DeepEquals, expect) } + +func (s *testSuite) TestTableRangesToKVRangesWithFbs(c *C) { + ranges := []*ranger.Range{ + { + LowVal: []types.Datum{types.NewIntDatum(1)}, + HighVal: []types.Datum{types.NewIntDatum(4)}, + }, + } + hist := statistics.NewHistogram(1, 30, 30, 0, types.NewFieldType(mysql.TypeLonglong), chunk.InitialCapacity, 0) + for i := 0; i < 10; i++ { + hist.Bounds.AppendInt64(0, int64(i)) + hist.Bounds.AppendInt64(0, int64(i+2)) + hist.Buckets = append(hist.Buckets, statistics.Bucket{Repeat: 10, Count: int64(i + 30)}) + } + fb := statistics.NewQueryFeedback(0, hist, 0, false) + lower, upper := types.NewIntDatum(2), types.NewIntDatum(3) + fb.Feedback = []statistics.Feedback{ + {Lower: &lower, Upper: &upper, Count: 1, Repeat: 1}, + } + actual := TableRangesToKVRanges(0, ranges, fb) + expect := []kv.KeyRange{ + { + StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, + EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5}, + }, + } + for i := 0; i < len(actual); i++ { + c.Assert(actual[i], DeepEquals, expect[i]) + } +} + +func (s *testSuite) TestIndexRangesToKVRangesWithFbs(c *C) { + ranges := []*ranger.Range{ + { + LowVal: []types.Datum{types.NewIntDatum(1)}, + HighVal: []types.Datum{types.NewIntDatum(4)}, + }, + } + hist := statistics.NewHistogram(1, 30, 30, 0, types.NewFieldType(mysql.TypeLonglong), chunk.InitialCapacity, 0) + for i := 0; i < 10; i++ { + hist.Bounds.AppendInt64(0, int64(i)) + hist.Bounds.AppendInt64(0, int64(i+2)) + hist.Buckets = append(hist.Buckets, statistics.Bucket{Repeat: 10, Count: int64(i + 30)}) + } + fb := statistics.NewQueryFeedback(0, hist, 0, false) + lower, upper := types.NewIntDatum(2), types.NewIntDatum(3) + fb.Feedback = []statistics.Feedback{ + {Lower: &lower, Upper: &upper, Count: 1, Repeat: 1}, + } + actual, err := IndexRangesToKVRanges(new(stmtctx.StatementContext), 0, 0, ranges, fb) + c.Assert(err, IsNil) + expect := []kv.KeyRange{ + { + StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, + EndKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5}, + }, + } + for i := 0; i < len(actual); i++ { + c.Assert(actual[i], DeepEquals, expect[i]) + } +} diff --git a/distsql/select_result.go b/distsql/select_result.go index 72a283d1b2..9af38c8706 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -123,14 +123,14 @@ func (r *selectResult) fetch(ctx context.Context) { } // NextRaw returns the next raw partial result. -func (r *selectResult) NextRaw(ctx context.Context) ([]byte, error) { +func (r *selectResult) NextRaw(ctx context.Context) (data []byte, err error) { re := <-r.results r.partialCount++ r.feedback.Invalidate() - if re.result == nil || re.err != nil { - return nil, errors.Trace(re.err) + if re.result != nil && re.err == nil { + data = re.result.GetData() } - return re.result.GetData(), nil + return data, re.err } // Next reads data to the chunk. diff --git a/distsql/select_result_test.go b/distsql/select_result_test.go new file mode 100644 index 0000000000..c543df0507 --- /dev/null +++ b/distsql/select_result_test.go @@ -0,0 +1,55 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package distsql + +import ( + "fmt" + + . "github.com/pingcap/check" + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/util/execdetails" + "github.com/pingcap/tidb/util/mock" + "github.com/pingcap/tipb/go-tipb" +) + +func (s *testSuite) TestUpdateCopRuntimeStats(c *C) { + ctx := mock.NewContext() + ctx.GetSessionVars().StmtCtx = new(stmtctx.StatementContext) + sr := selectResult{ctx: ctx} + c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil) + sr.updateCopRuntimeStats("a") + + ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl() + t := uint64(1) + sr.selectResp = &tipb.SelectResponse{ + ExecutionSummaries: []*tipb.ExecutorExecutionSummary{ + {TimeProcessedNs: &t, NumProducedRows: &t, NumIterations: &t}, + }, + } + c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue) + sr.updateCopRuntimeStats("callee") + c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats("callee"), IsFalse) + + sr.copPlanIDs = []fmt.Stringer{copPlan{}} + c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil) + c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs)) + sr.updateCopRuntimeStats("callee") + c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats("callee").String(), Equals, "time:1ns, loops:1, rows:1") +} + +type copPlan struct{} + +func (p copPlan) String() string { + return "callee" +} diff --git a/distsql/stream.go b/distsql/stream.go index 17e6df70cc..ac83afc914 100644 --- a/distsql/stream.go +++ b/distsql/stream.go @@ -123,10 +123,9 @@ func (r *streamResult) flushToChunk(chk *chunk.Chunk) (err error) { } } } + r.curr.RowsData = remainRowsData if len(remainRowsData) == 0 { r.curr = nil // Current chunk is finished. - } else { - r.curr.RowsData = remainRowsData } return nil }