*: enable linter for executor/aggregate.go (#37015)
This commit is contained in:
@ -154,6 +154,7 @@
|
||||
".*_generated\\.go$": "ignore generated code"
|
||||
},
|
||||
"only_files": {
|
||||
"executor/aggregate.go": "executor/aggregate.go",
|
||||
"types/json/binary_functions.go": "types/json/binary_functions.go",
|
||||
"types/json/binary_test.go": "types/json/binary_test.go",
|
||||
"ddl/backfilling.go": "ddl/backfilling.go",
|
||||
@ -289,6 +290,7 @@
|
||||
".*_generated\\.go$": "ignore generated code"
|
||||
},
|
||||
"only_files": {
|
||||
"executor/aggregate.go": "executor/aggregate.go",
|
||||
"types/json/binary_functions.go": "types/json/binary_functions.go",
|
||||
"types/json/binary_test.go": "types/json/binary_test.go",
|
||||
"ddl/backfilling.go": "ddl/backfilling.go",
|
||||
@ -649,6 +651,7 @@
|
||||
".*_generated\\.go$": "ignore generated code"
|
||||
},
|
||||
"only_files": {
|
||||
"executor/aggregate.go": "executor/aggregate.go",
|
||||
"types/json/binary_functions.go": "types/json/binary_functions.go",
|
||||
"types/json/binary_test.go": "types/json/binary_test.go",
|
||||
"ddl/": "enable to ddl",
|
||||
|
||||
@ -154,6 +154,7 @@ go_library(
|
||||
"//util/admin",
|
||||
"//util/bitmap",
|
||||
"//util/breakpoint",
|
||||
"//util/channel",
|
||||
"//util/chunk",
|
||||
"//util/codec",
|
||||
"//util/collate",
|
||||
|
||||
@ -33,6 +33,7 @@ import (
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/types/json"
|
||||
"github.com/pingcap/tidb/util/channel"
|
||||
"github.com/pingcap/tidb/util/chunk"
|
||||
"github.com/pingcap/tidb/util/codec"
|
||||
"github.com/pingcap/tidb/util/disk"
|
||||
@ -231,7 +232,7 @@ type HashAggIntermData struct {
|
||||
}
|
||||
|
||||
// getPartialResultBatch fetches a batch of partial results from HashAggIntermData.
|
||||
func (d *HashAggIntermData) getPartialResultBatch(sc *stmtctx.StatementContext, prs [][]aggfuncs.PartialResult, aggFuncs []aggfuncs.AggFunc, maxChunkSize int) (_ [][]aggfuncs.PartialResult, groupKeys []string, reachEnd bool) {
|
||||
func (d *HashAggIntermData) getPartialResultBatch(_ *stmtctx.StatementContext, prs [][]aggfuncs.PartialResult, _ []aggfuncs.AggFunc, maxChunkSize int) (_ [][]aggfuncs.PartialResult, groupKeys []string, reachEnd bool) {
|
||||
keyStart := d.cursor
|
||||
for ; d.cursor < len(d.groupKeys) && len(prs) < maxChunkSize; d.cursor++ {
|
||||
prs = append(prs, d.partialResultMap[d.groupKeys[d.cursor]])
|
||||
@ -275,15 +276,12 @@ func (e *HashAggExec) Close() error {
|
||||
}
|
||||
close(e.finishCh)
|
||||
for _, ch := range e.partialOutputChs {
|
||||
for range ch {
|
||||
}
|
||||
channel.Clear(ch)
|
||||
}
|
||||
for _, ch := range e.partialInputChs {
|
||||
for range ch {
|
||||
}
|
||||
}
|
||||
for range e.finalOutputCh {
|
||||
channel.Clear(ch)
|
||||
}
|
||||
channel.Clear(e.finalOutputCh)
|
||||
e.executed = false
|
||||
if e.memTracker != nil {
|
||||
e.memTracker.ReplaceBytesUsed(0)
|
||||
@ -295,7 +293,7 @@ func (e *HashAggExec) Close() error {
|
||||
// Open implements the Executor Open interface.
|
||||
func (e *HashAggExec) Open(ctx context.Context) error {
|
||||
failpoint.Inject("mockHashAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
|
||||
if val.(bool) {
|
||||
if val, _ := val.(bool); val {
|
||||
failpoint.Return(errors.New("mock HashAggExec.baseExecutor.Open returned error"))
|
||||
}
|
||||
})
|
||||
@ -352,7 +350,7 @@ func closeBaseExecutor(b *baseExecutor) {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
|
||||
func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) {
|
||||
sessionVars := e.ctx.GetSessionVars()
|
||||
finalConcurrency := sessionVars.HashAggFinalConcurrency()
|
||||
partialConcurrency := sessionVars.HashAggPartialConcurrency()
|
||||
@ -486,7 +484,7 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG
|
||||
}
|
||||
if w.stats != nil {
|
||||
w.stats.ExecTime += int64(time.Since(execStart))
|
||||
w.stats.TaskNum += 1
|
||||
w.stats.TaskNum++
|
||||
}
|
||||
// The intermData can be promised to be not empty if reaching here,
|
||||
// so we set needShuffle to be true.
|
||||
@ -503,7 +501,7 @@ func getGroupKeyMemUsage(groupKey [][]byte) int64 {
|
||||
return mem
|
||||
}
|
||||
|
||||
func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, finalConcurrency int) (err error) {
|
||||
func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, _ int) (err error) {
|
||||
memSize := getGroupKeyMemUsage(w.groupKey)
|
||||
w.groupKey, err = getGroupKey(w.ctx, chk, w.groupKey, w.groupByItems)
|
||||
failpoint.Inject("ConsumeRandomPanic", nil)
|
||||
@ -532,7 +530,7 @@ func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *s
|
||||
|
||||
// shuffleIntermData shuffles the intermediate data of partial workers to corresponded final workers.
|
||||
// We only support parallel execution for single-machine, so process of encode and decode can be skipped.
|
||||
func (w *HashAggPartialWorker) shuffleIntermData(sc *stmtctx.StatementContext, finalConcurrency int) {
|
||||
func (w *HashAggPartialWorker) shuffleIntermData(_ *stmtctx.StatementContext, finalConcurrency int) {
|
||||
groupKeysSlice := make([][]string, finalConcurrency)
|
||||
for groupKey := range w.partialResultsMap {
|
||||
finalWorkerIdx := int(murmur3.Sum32([]byte(groupKey))) % finalConcurrency
|
||||
@ -605,7 +603,7 @@ func getGroupKey(ctx sessionctx.Context, input *chunk.Chunk, groupKey [][]byte,
|
||||
return groupKey, nil
|
||||
}
|
||||
|
||||
func (w *baseHashAggWorker) getPartialResult(sc *stmtctx.StatementContext, groupKey [][]byte, mapper aggPartialResultMapper) [][]aggfuncs.PartialResult {
|
||||
func (w *baseHashAggWorker) getPartialResult(_ *stmtctx.StatementContext, groupKey [][]byte, mapper aggPartialResultMapper) [][]aggfuncs.PartialResult {
|
||||
n := len(groupKey)
|
||||
partialResults := make([][]aggfuncs.PartialResult, n)
|
||||
allMemDelta := int64(0)
|
||||
@ -706,7 +704,7 @@ func (w *HashAggFinalWorker) consumeIntermData(sctx sessionctx.Context) (err err
|
||||
}
|
||||
if w.stats != nil {
|
||||
w.stats.ExecTime += int64(time.Since(execStart))
|
||||
w.stats.TaskNum += 1
|
||||
w.stats.TaskNum++
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -906,7 +904,7 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error
|
||||
}
|
||||
|
||||
failpoint.Inject("parallelHashAggError", func(val failpoint.Value) {
|
||||
if val.(bool) {
|
||||
if val, _ := val.(bool); val {
|
||||
failpoint.Return(errors.New("HashAggExec.parallelExec error"))
|
||||
}
|
||||
})
|
||||
@ -1011,7 +1009,7 @@ func (e *HashAggExec) execute(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
failpoint.Inject("unparallelHashAggError", func(val failpoint.Value) {
|
||||
if val.(bool) {
|
||||
if val, _ := val.(bool); val {
|
||||
failpoint.Return(errors.New("HashAggExec.unparallelExec error"))
|
||||
}
|
||||
})
|
||||
@ -1170,7 +1168,7 @@ func (w *AggWorkerStat) Clone() *AggWorkerStat {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *HashAggRuntimeStats) workerString(buf *bytes.Buffer, prefix string, concurrency int, wallTime int64, workerStats []*AggWorkerStat) {
|
||||
func (*HashAggRuntimeStats) workerString(buf *bytes.Buffer, prefix string, concurrency int, wallTime int64, workerStats []*AggWorkerStat) {
|
||||
var totalTime, totalWait, totalExec, totalTaskNum int64
|
||||
for _, w := range workerStats {
|
||||
totalTime += w.WorkerTime
|
||||
@ -1231,7 +1229,7 @@ func (e *HashAggRuntimeStats) Merge(other execdetails.RuntimeStats) {
|
||||
}
|
||||
|
||||
// Tp implements the RuntimeStats interface.
|
||||
func (e *HashAggRuntimeStats) Tp() int {
|
||||
func (*HashAggRuntimeStats) Tp() int {
|
||||
return execdetails.TpHashAggRuntimeStat
|
||||
}
|
||||
|
||||
@ -1263,7 +1261,7 @@ type StreamAggExec struct {
|
||||
// Open implements the Executor Open interface.
|
||||
func (e *StreamAggExec) Open(ctx context.Context) error {
|
||||
failpoint.Inject("mockStreamAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
|
||||
if val.(bool) {
|
||||
if val, _ := val.(bool); val {
|
||||
failpoint.Return(errors.New("mock StreamAggExec.baseExecutor.Open returned error"))
|
||||
}
|
||||
})
|
||||
@ -1950,9 +1948,9 @@ func (a *AggSpillDiskAction) Action(t *memory.Tracker) {
|
||||
}
|
||||
|
||||
// GetPriority get the priority of the Action
|
||||
func (a *AggSpillDiskAction) GetPriority() int64 {
|
||||
func (*AggSpillDiskAction) GetPriority() int64 {
|
||||
return memory.DefSpillPriority
|
||||
}
|
||||
|
||||
// SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface.
|
||||
func (a *AggSpillDiskAction) SetLogHook(hook func(uint64)) {}
|
||||
func (*AggSpillDiskAction) SetLogHook(_ func(uint64)) {}
|
||||
|
||||
8
util/channel/BUILD.bazel
Normal file
8
util/channel/BUILD.bazel
Normal file
@ -0,0 +1,8 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "channel",
|
||||
srcs = ["channel.go"],
|
||||
importpath = "github.com/pingcap/tidb/util/channel",
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
||||
22
util/channel/channel.go
Normal file
22
util/channel/channel.go
Normal file
@ -0,0 +1,22 @@
|
||||
// Copyright 2022 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package channel
|
||||
|
||||
// Clear is to clear the channel
|
||||
func Clear[T any](ch chan T) {
|
||||
//nolint:revive,all_revive
|
||||
for range ch {
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user