From 07ff41d0c43ec18fc4bbc16b59f7473fedbf701d Mon Sep 17 00:00:00 2001 From: Wenxuan Date: Wed, 25 Nov 2020 11:06:55 +0800 Subject: [PATCH] coprocessor: Update KvProto (#20966) Signed-off-by: Breezewish --- go.mod | 2 +- go.sum | 5 ++ .../unistore/cophandler/cop_handler.go | 13 ++-- store/tikv/batch_coprocessor.go | 4 +- store/tikv/coprocessor.go | 74 +++++++++++-------- 5 files changed, 58 insertions(+), 40 deletions(-) diff --git a/go.mod b/go.mod index 02ad52373a..1f165f9e24 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20201113092725-08f2872278eb + github.com/pingcap/kvproto v0.0.0-20201120081251-756b1447ba12 github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 github.com/pingcap/parser v0.0.0-20201123080035-8f4c6ab94e11 github.com/pingcap/sysutil v0.0.0-20201021075216-f93ced2829e2 diff --git a/go.sum b/go.sum index 3df09bc124..6f1b49086e 100644 --- a/go.sum +++ b/go.sum @@ -614,6 +614,7 @@ github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/otiai10/copy v1.2.0 h1:HvG945u96iNadPoG2/Ja2+AUJeW5YuFQMixq9yirC+k= github.com/otiai10/copy v1.2.0/go.mod h1:rrF5dJ5F0t/EWSYODDu4j9/vEeYHMkc8jt0zJChqQWw= github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJG+0mI8eUu6xqkFDYS2kb2saOteoSB3cE= github.com/otiai10/curr v1.0.0/go.mod h1:LskTG5wDwr8Rs+nNQ+1LlxRjAtTZZjtJW4rMXl6j4vs= @@ -680,6 +681,8 @@ github.com/pingcap/kvproto v0.0.0-20200420075417-e0c6e8842f22/go.mod h1:IOdRDPLy github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20201113092725-08f2872278eb h1:K3r4KjVQeD4nLnfj44ibdLIXnUh58aQpkgVNWuBO9z0= github.com/pingcap/kvproto v0.0.0-20201113092725-08f2872278eb/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20201120081251-756b1447ba12 h1:f33y/pngBI525jqytoSZevpmmq43XiIHoeElx3BppNQ= +github.com/pingcap/kvproto v0.0.0-20201120081251-756b1447ba12/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= @@ -709,6 +712,7 @@ github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNx github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20201026044621-45e60c77588f h1:J+0TAI+7Hvebz4bM4GnRCRT4MpjYnUxbyi9ky5ZQUsU= github.com/pingcap/tipb v0.0.0-20201026044621-45e60c77588f/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +github.com/pingcap/tiup v1.2.3 h1:8OCQF7sHhT6VqE8pZU1JTSogPA90OFuWWM/B746x0YY= github.com/pingcap/tiup v1.2.3/go.mod h1:q8WzflNHjE1U49k2qstTL0clx2pKh8pkOzUFV4RTvQo= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -752,6 +756,7 @@ github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLk github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/prom2json v1.3.0/go.mod h1:rMN7m0ApCowcoDlypBHlkNbp5eJQf/+1isKykIP5ZnM= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/r3labs/diff v0.0.0-20200627101315-aecd9dd05dd2 h1:786HUIrynbbk5PzUf9Rp3aAUkNRksUiiipSAlyJ68As= github.com/r3labs/diff v0.0.0-20200627101315-aecd9dd05dd2/go.mod h1:7WjXasNzi0vJetRcB/RqNl5dlIsmXcTTLmF5IoH6Xig= github.com/rakyll/statik v0.1.6/go.mod h1:OEi9wJV/fMUAGx1eNjq75DKDsJVuEv1U0oYdX6GX8Zs= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= diff --git a/store/mockstore/unistore/cophandler/cop_handler.go b/store/mockstore/unistore/cophandler/cop_handler.go index d34ba9f35e..7f63a8734d 100644 --- a/store/mockstore/unistore/cophandler/cop_handler.go +++ b/store/mockstore/unistore/cophandler/cop_handler.go @@ -77,11 +77,11 @@ func handleCopDAGRequest(dbReader *dbreader.DBReader, lockStore *lockstore.MemSt resp.CanBeCached = true resp.CacheLastVersion = uint64(cacheVersion.(int)) if resp.ExecDetails == nil { - resp.ExecDetails = &kvrpcpb.ExecDetails{HandleTime: &kvrpcpb.HandleTime{ProcessMs: 500}} - } else if resp.ExecDetails.HandleTime == nil { - resp.ExecDetails.HandleTime = &kvrpcpb.HandleTime{ProcessMs: 500} + resp.ExecDetails = &kvrpcpb.ExecDetails{TimeDetail: &kvrpcpb.TimeDetail{ProcessWallTimeMs: 500}} + } else if resp.ExecDetails.TimeDetail == nil { + resp.ExecDetails.TimeDetail = &kvrpcpb.TimeDetail{ProcessWallTimeMs: 500} } else { - resp.ExecDetails.HandleTime.ProcessMs = 500 + resp.ExecDetails.TimeDetail.ProcessWallTimeMs = 500 } }() } @@ -352,7 +352,10 @@ func buildResp(chunks []tipb.Chunk, closureExecutor *closureExecutor, dagReq *ti } } resp.ExecDetails = &kvrpcpb.ExecDetails{ - HandleTime: &kvrpcpb.HandleTime{ProcessMs: int64(dur / time.Millisecond)}, + TimeDetail: &kvrpcpb.TimeDetail{ProcessWallTimeMs: int64(dur / time.Millisecond)}, + } + resp.ExecDetailsV2 = &kvrpcpb.ExecDetailsV2{ + TimeDetail: resp.ExecDetails.TimeDetail, } data, err := proto.Marshal(selResp) if err != nil { diff --git a/store/tikv/batch_coprocessor.go b/store/tikv/batch_coprocessor.go index a716ae3816..c3dfe72ccd 100644 --- a/store/tikv/batch_coprocessor.go +++ b/store/tikv/batch_coprocessor.go @@ -345,8 +345,8 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta IsolationLevel: pbIsolationLevel(b.req.IsolationLevel), Priority: kvPriorityToCommandPri(b.req.Priority), NotFillCache: b.req.NotFillCache, - HandleTime: true, - ScanDetail: true, + RecordTimeStat: true, + RecordScanStat: true, TaskId: b.req.TaskID, }) req.StoreTp = kv.TiFlash diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index fcb0f85c64..702f4198e8 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -848,8 +848,8 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch IsolationLevel: pbIsolationLevel(worker.req.IsolationLevel), Priority: kvPriorityToCommandPri(worker.req.Priority), NotFillCache: worker.req.NotFillCache, - HandleTime: true, - ScanDetail: true, + RecordTimeStat: true, + RecordScanStat: true, TaskId: worker.req.TaskID, }) req.StoreTp = task.storeType @@ -974,14 +974,17 @@ func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *co backoffTypes := strings.Replace(fmt.Sprintf("%v", bo.types), " ", ",", -1) logStr += fmt.Sprintf(" backoff_ms:%d backoff_types:%s", bo.totalSleep, backoffTypes) } + var detailV2 *kvrpcpb.ExecDetailsV2 var detail *kvrpcpb.ExecDetails if resp.Resp != nil { switch r := resp.Resp.(type) { case *coprocessor.Response: + detailV2 = r.ExecDetailsV2 detail = r.ExecDetails case *tikvrpc.CopStreamResponse: // streaming request returns io.EOF, so the first CopStreamResponse.Response maybe nil. if r.Response != nil { + detailV2 = r.Response.ExecDetailsV2 detail = r.Response.ExecDetails } default: @@ -989,33 +992,33 @@ func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *co } } - if detail != nil && detail.HandleTime != nil { - processMs := detail.HandleTime.ProcessMs - waitMs := detail.HandleTime.WaitMs - if processMs > minLogKVProcessTime { - logStr += fmt.Sprintf(" kv_process_ms:%d", processMs) - if detail.ScanDetail != nil { - logStr = appendScanDetail(logStr, "write", detail.ScanDetail.Write) - logStr = appendScanDetail(logStr, "data", detail.ScanDetail.Data) - logStr = appendScanDetail(logStr, "lock", detail.ScanDetail.Lock) - } - if detail.ScanDetailV2 != nil { - logStr += fmt.Sprintf(" processed versions: %d", detail.ScanDetailV2.ProcessedVersions) - logStr += fmt.Sprintf(" total versions: %d", detail.ScanDetailV2.TotalVersions) - logStr += fmt.Sprintf(" delete skipped count: %d", detail.ScanDetailV2.RocksdbDeleteSkippedCount) - logStr += fmt.Sprintf(" key skipped count: %d", detail.ScanDetailV2.RocksdbKeySkippedCount) - logStr += fmt.Sprintf(" cache hit count: %d", detail.ScanDetailV2.RocksdbBlockCacheHitCount) - logStr += fmt.Sprintf(" read count: %d", detail.ScanDetailV2.RocksdbBlockReadCount) - logStr += fmt.Sprintf(" read byte: %d", detail.ScanDetailV2.RocksdbBlockReadByte) - } - } - if waitMs > minLogKVWaitTime { - logStr += fmt.Sprintf(" kv_wait_ms:%d", waitMs) - if processMs <= minLogKVProcessTime { - logStr = strings.Replace(logStr, "TIME_COP_PROCESS", "TIME_COP_WAIT", 1) - } + var timeDetail *kvrpcpb.TimeDetail + if detailV2 != nil && detailV2.TimeDetail != nil { + timeDetail = detailV2.TimeDetail + } else if detail != nil && detail.TimeDetail != nil { + timeDetail = detail.TimeDetail + } + if timeDetail != nil { + logStr += fmt.Sprintf(" kv_process_ms:%d", timeDetail.ProcessWallTimeMs) + logStr += fmt.Sprintf(" kv_wait_ms:%d", timeDetail.WaitWallTimeMs) + if timeDetail.ProcessWallTimeMs <= minLogKVProcessTime { + logStr = strings.Replace(logStr, "TIME_COP_PROCESS", "TIME_COP_WAIT", 1) } } + + if detailV2 != nil && detailV2.ScanDetailV2 != nil { + logStr += fmt.Sprintf(" processed_versions:%d", detailV2.ScanDetailV2.ProcessedVersions) + logStr += fmt.Sprintf(" total_versions:%d", detailV2.ScanDetailV2.TotalVersions) + logStr += fmt.Sprintf(" rocksdb_delete_skipped_count:%d", detailV2.ScanDetailV2.RocksdbDeleteSkippedCount) + logStr += fmt.Sprintf(" rocksdb_key_skipped_count:%d", detailV2.ScanDetailV2.RocksdbKeySkippedCount) + logStr += fmt.Sprintf(" rocksdb_cache_hit_count:%d", detailV2.ScanDetailV2.RocksdbBlockCacheHitCount) + logStr += fmt.Sprintf(" rocksdb_read_count:%d", detailV2.ScanDetailV2.RocksdbBlockReadCount) + logStr += fmt.Sprintf(" rocksdb_read_byte:%d", detailV2.ScanDetailV2.RocksdbBlockReadByte) + } else if detail != nil && detail.ScanDetail != nil { + logStr = appendScanDetail(logStr, "write", detail.ScanDetail.Write) + logStr = appendScanDetail(logStr, "data", detail.ScanDetail.Data) + logStr = appendScanDetail(logStr, "lock", detail.ScanDetail.Lock) + } logutil.Logger(bo.ctx).Info(logStr) } @@ -1130,10 +1133,11 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon resp.detail.CalleeAddress = rpcCtx.Addr } resp.respTime = costTime - if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil { - if handleTime := pbDetails.HandleTime; handleTime != nil { - resp.detail.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond - resp.detail.ProcessTime = time.Duration(handleTime.ProcessMs) * time.Millisecond + if pbDetails := resp.pbResp.ExecDetailsV2; pbDetails != nil { + // Take values in `ExecDetailsV2` first. + if timeDetail := pbDetails.TimeDetail; timeDetail != nil { + resp.detail.WaitTime = time.Duration(timeDetail.WaitWallTimeMs) * time.Millisecond + resp.detail.ProcessTime = time.Duration(timeDetail.ProcessWallTimeMs) * time.Millisecond } if scanDetailV2 := pbDetails.ScanDetailV2; scanDetailV2 != nil { copDetail := &execdetails.CopDetails{ @@ -1146,7 +1150,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon RocksdbBlockReadByte: scanDetailV2.RocksdbBlockReadByte, } resp.detail.CopDetail = copDetail - } else if scanDetail := pbDetails.ScanDetail; scanDetail != nil { + } + } else if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil { + if timeDetail := pbDetails.TimeDetail; timeDetail != nil { + resp.detail.WaitTime = time.Duration(timeDetail.WaitWallTimeMs) * time.Millisecond + resp.detail.ProcessTime = time.Duration(timeDetail.ProcessWallTimeMs) * time.Millisecond + } + if scanDetail := pbDetails.ScanDetail; scanDetail != nil { if scanDetail.Write != nil { resp.detail.CopDetail = &execdetails.CopDetails{ ProcessedKeys: scanDetail.Write.Processed,