coprocessor: Update KvProto (#20966)

Signed-off-by: Breezewish <me@breeswish.org>
This commit is contained in:
Wenxuan
2020-11-25 11:06:55 +08:00
committed by GitHub
parent 72794461eb
commit 07ff41d0c4
5 changed files with 58 additions and 40 deletions

2
go.mod
View File

@ -45,7 +45,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20201113092725-08f2872278eb
github.com/pingcap/kvproto v0.0.0-20201120081251-756b1447ba12
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8
github.com/pingcap/parser v0.0.0-20201123080035-8f4c6ab94e11
github.com/pingcap/sysutil v0.0.0-20201021075216-f93ced2829e2

5
go.sum
View File

@ -614,6 +614,7 @@ github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt
github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/otiai10/copy v1.2.0 h1:HvG945u96iNadPoG2/Ja2+AUJeW5YuFQMixq9yirC+k=
github.com/otiai10/copy v1.2.0/go.mod h1:rrF5dJ5F0t/EWSYODDu4j9/vEeYHMkc8jt0zJChqQWw=
github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJG+0mI8eUu6xqkFDYS2kb2saOteoSB3cE=
github.com/otiai10/curr v1.0.0/go.mod h1:LskTG5wDwr8Rs+nNQ+1LlxRjAtTZZjtJW4rMXl6j4vs=
@ -680,6 +681,8 @@ github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22/go.mod h1:IOdRDPLy
github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20201113092725-08f2872278eb h1:K3r4KjVQeD4nLnfj44ibdLIXnUh58aQpkgVNWuBO9z0=
github.com/pingcap/kvproto v0.0.0-20201113092725-08f2872278eb/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20201120081251-756b1447ba12 h1:f33y/pngBI525jqytoSZevpmmq43XiIHoeElx3BppNQ=
github.com/pingcap/kvproto v0.0.0-20201120081251-756b1447ba12/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
@ -709,6 +712,7 @@ github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNx
github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20201026044621-45e60c77588f h1:J+0TAI+7Hvebz4bM4GnRCRT4MpjYnUxbyi9ky5ZQUsU=
github.com/pingcap/tipb v0.0.0-20201026044621-45e60c77588f/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tiup v1.2.3 h1:8OCQF7sHhT6VqE8pZU1JTSogPA90OFuWWM/B746x0YY=
github.com/pingcap/tiup v1.2.3/go.mod h1:q8WzflNHjE1U49k2qstTL0clx2pKh8pkOzUFV4RTvQo=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@ -752,6 +756,7 @@ github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLk
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/prom2json v1.3.0/go.mod h1:rMN7m0ApCowcoDlypBHlkNbp5eJQf/+1isKykIP5ZnM=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/r3labs/diff v0.0.0-20200627101315-aecd9dd05dd2 h1:786HUIrynbbk5PzUf9Rp3aAUkNRksUiiipSAlyJ68As=
github.com/r3labs/diff v0.0.0-20200627101315-aecd9dd05dd2/go.mod h1:7WjXasNzi0vJetRcB/RqNl5dlIsmXcTTLmF5IoH6Xig=
github.com/rakyll/statik v0.1.6/go.mod h1:OEi9wJV/fMUAGx1eNjq75DKDsJVuEv1U0oYdX6GX8Zs=
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=

View File

@ -77,11 +77,11 @@ func handleCopDAGRequest(dbReader *dbreader.DBReader, lockStore *lockstore.MemSt
resp.CanBeCached = true
resp.CacheLastVersion = uint64(cacheVersion.(int))
if resp.ExecDetails == nil {
resp.ExecDetails = &kvrpcpb.ExecDetails{HandleTime: &kvrpcpb.HandleTime{ProcessMs: 500}}
} else if resp.ExecDetails.HandleTime == nil {
resp.ExecDetails.HandleTime = &kvrpcpb.HandleTime{ProcessMs: 500}
resp.ExecDetails = &kvrpcpb.ExecDetails{TimeDetail: &kvrpcpb.TimeDetail{ProcessWallTimeMs: 500}}
} else if resp.ExecDetails.TimeDetail == nil {
resp.ExecDetails.TimeDetail = &kvrpcpb.TimeDetail{ProcessWallTimeMs: 500}
} else {
resp.ExecDetails.HandleTime.ProcessMs = 500
resp.ExecDetails.TimeDetail.ProcessWallTimeMs = 500
}
}()
}
@ -352,7 +352,10 @@ func buildResp(chunks []tipb.Chunk, closureExecutor *closureExecutor, dagReq *ti
}
}
resp.ExecDetails = &kvrpcpb.ExecDetails{
HandleTime: &kvrpcpb.HandleTime{ProcessMs: int64(dur / time.Millisecond)},
TimeDetail: &kvrpcpb.TimeDetail{ProcessWallTimeMs: int64(dur / time.Millisecond)},
}
resp.ExecDetailsV2 = &kvrpcpb.ExecDetailsV2{
TimeDetail: resp.ExecDetails.TimeDetail,
}
data, err := proto.Marshal(selResp)
if err != nil {

View File

@ -345,8 +345,8 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta
IsolationLevel: pbIsolationLevel(b.req.IsolationLevel),
Priority: kvPriorityToCommandPri(b.req.Priority),
NotFillCache: b.req.NotFillCache,
HandleTime: true,
ScanDetail: true,
RecordTimeStat: true,
RecordScanStat: true,
TaskId: b.req.TaskID,
})
req.StoreTp = kv.TiFlash

View File

@ -848,8 +848,8 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
IsolationLevel: pbIsolationLevel(worker.req.IsolationLevel),
Priority: kvPriorityToCommandPri(worker.req.Priority),
NotFillCache: worker.req.NotFillCache,
HandleTime: true,
ScanDetail: true,
RecordTimeStat: true,
RecordScanStat: true,
TaskId: worker.req.TaskID,
})
req.StoreTp = task.storeType
@ -974,14 +974,17 @@ func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *co
backoffTypes := strings.Replace(fmt.Sprintf("%v", bo.types), " ", ",", -1)
logStr += fmt.Sprintf(" backoff_ms:%d backoff_types:%s", bo.totalSleep, backoffTypes)
}
var detailV2 *kvrpcpb.ExecDetailsV2
var detail *kvrpcpb.ExecDetails
if resp.Resp != nil {
switch r := resp.Resp.(type) {
case *coprocessor.Response:
detailV2 = r.ExecDetailsV2
detail = r.ExecDetails
case *tikvrpc.CopStreamResponse:
// streaming request returns io.EOF, so the first CopStreamResponse.Response maybe nil.
if r.Response != nil {
detailV2 = r.Response.ExecDetailsV2
detail = r.Response.ExecDetails
}
default:
@ -989,33 +992,33 @@ func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *co
}
}
if detail != nil && detail.HandleTime != nil {
processMs := detail.HandleTime.ProcessMs
waitMs := detail.HandleTime.WaitMs
if processMs > minLogKVProcessTime {
logStr += fmt.Sprintf(" kv_process_ms:%d", processMs)
if detail.ScanDetail != nil {
logStr = appendScanDetail(logStr, "write", detail.ScanDetail.Write)
logStr = appendScanDetail(logStr, "data", detail.ScanDetail.Data)
logStr = appendScanDetail(logStr, "lock", detail.ScanDetail.Lock)
}
if detail.ScanDetailV2 != nil {
logStr += fmt.Sprintf(" processed versions: %d", detail.ScanDetailV2.ProcessedVersions)
logStr += fmt.Sprintf(" total versions: %d", detail.ScanDetailV2.TotalVersions)
logStr += fmt.Sprintf(" delete skipped count: %d", detail.ScanDetailV2.RocksdbDeleteSkippedCount)
logStr += fmt.Sprintf(" key skipped count: %d", detail.ScanDetailV2.RocksdbKeySkippedCount)
logStr += fmt.Sprintf(" cache hit count: %d", detail.ScanDetailV2.RocksdbBlockCacheHitCount)
logStr += fmt.Sprintf(" read count: %d", detail.ScanDetailV2.RocksdbBlockReadCount)
logStr += fmt.Sprintf(" read byte: %d", detail.ScanDetailV2.RocksdbBlockReadByte)
}
}
if waitMs > minLogKVWaitTime {
logStr += fmt.Sprintf(" kv_wait_ms:%d", waitMs)
if processMs <= minLogKVProcessTime {
logStr = strings.Replace(logStr, "TIME_COP_PROCESS", "TIME_COP_WAIT", 1)
}
var timeDetail *kvrpcpb.TimeDetail
if detailV2 != nil && detailV2.TimeDetail != nil {
timeDetail = detailV2.TimeDetail
} else if detail != nil && detail.TimeDetail != nil {
timeDetail = detail.TimeDetail
}
if timeDetail != nil {
logStr += fmt.Sprintf(" kv_process_ms:%d", timeDetail.ProcessWallTimeMs)
logStr += fmt.Sprintf(" kv_wait_ms:%d", timeDetail.WaitWallTimeMs)
if timeDetail.ProcessWallTimeMs <= minLogKVProcessTime {
logStr = strings.Replace(logStr, "TIME_COP_PROCESS", "TIME_COP_WAIT", 1)
}
}
if detailV2 != nil && detailV2.ScanDetailV2 != nil {
logStr += fmt.Sprintf(" processed_versions:%d", detailV2.ScanDetailV2.ProcessedVersions)
logStr += fmt.Sprintf(" total_versions:%d", detailV2.ScanDetailV2.TotalVersions)
logStr += fmt.Sprintf(" rocksdb_delete_skipped_count:%d", detailV2.ScanDetailV2.RocksdbDeleteSkippedCount)
logStr += fmt.Sprintf(" rocksdb_key_skipped_count:%d", detailV2.ScanDetailV2.RocksdbKeySkippedCount)
logStr += fmt.Sprintf(" rocksdb_cache_hit_count:%d", detailV2.ScanDetailV2.RocksdbBlockCacheHitCount)
logStr += fmt.Sprintf(" rocksdb_read_count:%d", detailV2.ScanDetailV2.RocksdbBlockReadCount)
logStr += fmt.Sprintf(" rocksdb_read_byte:%d", detailV2.ScanDetailV2.RocksdbBlockReadByte)
} else if detail != nil && detail.ScanDetail != nil {
logStr = appendScanDetail(logStr, "write", detail.ScanDetail.Write)
logStr = appendScanDetail(logStr, "data", detail.ScanDetail.Data)
logStr = appendScanDetail(logStr, "lock", detail.ScanDetail.Lock)
}
logutil.Logger(bo.ctx).Info(logStr)
}
@ -1130,10 +1133,11 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
resp.detail.CalleeAddress = rpcCtx.Addr
}
resp.respTime = costTime
if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil {
if handleTime := pbDetails.HandleTime; handleTime != nil {
resp.detail.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond
resp.detail.ProcessTime = time.Duration(handleTime.ProcessMs) * time.Millisecond
if pbDetails := resp.pbResp.ExecDetailsV2; pbDetails != nil {
// Take values in `ExecDetailsV2` first.
if timeDetail := pbDetails.TimeDetail; timeDetail != nil {
resp.detail.WaitTime = time.Duration(timeDetail.WaitWallTimeMs) * time.Millisecond
resp.detail.ProcessTime = time.Duration(timeDetail.ProcessWallTimeMs) * time.Millisecond
}
if scanDetailV2 := pbDetails.ScanDetailV2; scanDetailV2 != nil {
copDetail := &execdetails.CopDetails{
@ -1146,7 +1150,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
RocksdbBlockReadByte: scanDetailV2.RocksdbBlockReadByte,
}
resp.detail.CopDetail = copDetail
} else if scanDetail := pbDetails.ScanDetail; scanDetail != nil {
}
} else if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil {
if timeDetail := pbDetails.TimeDetail; timeDetail != nil {
resp.detail.WaitTime = time.Duration(timeDetail.WaitWallTimeMs) * time.Millisecond
resp.detail.ProcessTime = time.Duration(timeDetail.ProcessWallTimeMs) * time.Millisecond
}
if scanDetail := pbDetails.ScanDetail; scanDetail != nil {
if scanDetail.Write != nil {
resp.detail.CopDetail = &execdetails.CopDetails{
ProcessedKeys: scanDetail.Write.Processed,