From 897c45b0e0aed8dd28b7cf84f50892e182dfd5c1 Mon Sep 17 00:00:00 2001
From: YangKeao
Date: Wed, 17 Jan 2024 17:49:46 +0800
Subject: [PATCH] statistics/handle/usage: add a collector utility based on channel to collect stats from sessions (#50437)

close pingcap/tidb#50416
---
 .../handle/usage/collector/BUILD.bazel        |  18 ++
 .../handle/usage/collector/collector.go       | 186 ++++++++++++++++++
 .../handle/usage/collector/collector_test.go  |  99 ++++++++++
 3 files changed, 303 insertions(+)
 create mode 100644 pkg/statistics/handle/usage/collector/BUILD.bazel
 create mode 100644 pkg/statistics/handle/usage/collector/collector.go
 create mode 100644 pkg/statistics/handle/usage/collector/collector_test.go

diff --git a/pkg/statistics/handle/usage/collector/BUILD.bazel b/pkg/statistics/handle/usage/collector/BUILD.bazel
new file mode 100644
index 0000000000..d6cf063009
--- /dev/null
+++ b/pkg/statistics/handle/usage/collector/BUILD.bazel
@@ -0,0 +1,18 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "collector",
+    srcs = ["collector.go"],
+    importpath = "github.com/pingcap/tidb/pkg/statistics/handle/usage/collector",
+    visibility = ["//visibility:public"],
+)
+
+go_test(
+    name = "collector_test",
+    timeout = "short",
+    srcs = ["collector_test.go"],
+    embed = [":collector"],
+    flaky = True,
+    shard_count = 3,
+    deps = ["@com_github_stretchr_testify//require"],
+)
diff --git a/pkg/statistics/handle/usage/collector/collector.go b/pkg/statistics/handle/usage/collector/collector.go
new file mode 100644
index 0000000000..8fa98c284a
--- /dev/null
+++ b/pkg/statistics/handle/usage/collector/collector.go
@@ -0,0 +1,186 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package collector
+
+import (
+	"sync"
+	"time"
+)
+
+const (
+	defaultTimeout     = 5 * time.Minute
+	defaultChannelSize = 10
+)
+
+// GlobalCollector provides a utility to collect stats data from each session
+type GlobalCollector[T any] interface {
+	// SpawnSession creates a `SessionCollector` which sends its data to this global collector.
+	SpawnSession() SessionCollector[T]
+	// Close stops the background worker and waits until it exits.
+	Close()
+	// StartWorker spawns the background goroutine which merges the received data.
+	StartWorker()
+}
+
+var _ GlobalCollector[int] = &globalCollector[int]{}
+
+// globalCollector is an implementation of `GlobalCollector`. Sessions push data into `dataCh` or
+// `highPriorityDataCh`, and the background worker passes every received item to `mergeFn`.
+type globalCollector[T any] struct {
+	mergeFn            func(T)
+	dataCh             chan T
+	highPriorityDataCh chan T
+	closeCh            chan struct{}
+	wg                 sync.WaitGroup
+	timeout            time.Duration
+
+	closeOnce sync.Once
+}
+
+// NewGlobalCollector creates a new global collector. `mergeFn` is called by the background worker for every item
+// received from the sessions. The worker is not running until `StartWorker` is called.
+func NewGlobalCollector[T any](mergeFn func(T)) GlobalCollector[T] {
+	g := &globalCollector[T]{
+		mergeFn: mergeFn,
+		// For simplicity, the timeout and the channel size are not configurable for now.
+		// If there is a scenario in which tuning timeout and channel size is necessary, feel free to expand this
+		// constructor.
+		timeout:            defaultTimeout,
+		dataCh:             make(chan T, defaultChannelSize),
+		highPriorityDataCh: make(chan T, defaultChannelSize),
+		closeCh:            make(chan struct{}),
+	}
+	return g
+}
+
+// SpawnSession creates a related session collector from the global collector. The session shares the data channels
+// and the close signal of the global collector, so a `SendDeltaSync` blocked on a full channel returns once the
+// global collector is closed.
+func (g *globalCollector[T]) SpawnSession() SessionCollector[T] {
+	return &sessionCollector[T]{
+		timeout:            g.timeout,
+		dataCh:             g.dataCh,
+		highPriorityDataCh: g.highPriorityDataCh,
+		closeCh:            g.closeCh,
+		lastUpdate:         time.Now(),
+	}
+}
+
+// StartWorker spawns a goroutine to merge the data. The goroutine runs until `Close` is called, and drains the data
+// remaining in the channels before it exits.
+func (g *globalCollector[T]) StartWorker() {
+	g.wg.Add(1)
+	go func() {
+		defer g.wg.Done()
+
+	loop:
+		for {
+			// nested selection to make sure `highPriorityDataCh` is selected before the normal `dataCh`
+			select {
+			case data := <-g.highPriorityDataCh:
+				g.mergeFn(data)
+			case <-g.closeCh:
+				break loop
+			default:
+				// neither high priority data nor the close signal is ready, block until any channel is ready
+				select {
+				case data := <-g.dataCh:
+					g.mergeFn(data)
+				case data := <-g.highPriorityDataCh:
+					g.mergeFn(data)
+				case <-g.closeCh:
+					break loop
+				}
+			}
+		}
+
+		// drain out the data from channel
+		g.flush()
+	}()
+}
+
+// flush reads all data from the channel, until the channel is empty
+func (g *globalCollector[T]) flush() {
+	for {
+		select {
+		case data := <-g.highPriorityDataCh:
+			g.mergeFn(data)
+		case data := <-g.dataCh:
+			g.mergeFn(data)
+		default:
+			return
+		}
+	}
+}
+
+// Close closes the background worker of the global collector and waits until the worker exits. Only the first call
+// takes effect.
+func (g *globalCollector[T]) Close() {
+	g.closeOnce.Do(func() {
+		close(g.closeCh)
+		g.wg.Wait()
+	})
+}
+
+// SessionCollector is an interface to send stats data to the global collector
+type SessionCollector[T any] interface {
+	// SendDelta sends the data to the global collector. This function will not block, unless nothing has been sent
+	// successfully for longer than the `timeout`, in which case it behaves like `SendDeltaSync`. It returns a bool
+	// to represent whether the data has been sent successfully.
+	SendDelta(data T) bool
+	// SendDeltaSync sends the data to the global collector through the high priority channel. Unlike `SendDelta`,
+	// this function blocks until the data has been put into the channel or the global collector is closed. It
+	// returns false if it gave up because the global collector is closed.
+	SendDeltaSync(data T) bool
+}
+
+var _ SessionCollector[int] = &sessionCollector[int]{}
+
+// sessionCollector is the collector attached to each session to send the data to global collector. `lastUpdate` is
+// accessed without synchronization, so a session collector should not be shared between goroutines.
+type sessionCollector[T any] struct {
+	lastUpdate         time.Time
+	dataCh             chan<- T
+	highPriorityDataCh chan<- T
+	closeCh            <-chan struct{}
+	timeout            time.Duration
+}
+
+// SendDelta implements `SessionCollector[T]` interface
+func (s *sessionCollector[T]) SendDelta(data T) bool {
+	if time.Since(s.lastUpdate) > s.timeout {
+		return s.SendDeltaSync(data)
+	}
+
+	// don't block on the channel
+	select {
+	case s.dataCh <- data:
+		s.lastUpdate = time.Now()
+		return true
+	default:
+		return false
+	}
+}
+
+// SendDeltaSync implements `SessionCollector[T]` interface
+func (s *sessionCollector[T]) SendDeltaSync(data T) bool {
+	select {
+	case s.highPriorityDataCh <- data:
+		s.lastUpdate = time.Now()
+		return true
+	case <-s.closeCh:
+		return false
+	}
+}
diff --git a/pkg/statistics/handle/usage/collector/collector_test.go b/pkg/statistics/handle/usage/collector/collector_test.go
new file mode 100644
index 0000000000..e7118e326e
--- /dev/null
+++ b/pkg/statistics/handle/usage/collector/collector_test.go
@@ -0,0 +1,99 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+ +package collector + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSessionSendDelta(t *testing.T) { + num := 0 + mergeFn := func(delta int) { + num += delta + } + + g := NewGlobalCollector(mergeFn) + g.StartWorker() + s := g.SpawnSession() + expect := 0 + for i := 0; i < 256; i++ { + if s.SendDelta(1) { + expect += 1 + } + } + + g.Close() + require.Equal(t, expect, num) +} + +func TestSessionParallelSendDelta(t *testing.T) { + num := 0 + var expect atomic.Int64 + mergeFn := func(delta int) { + num += delta + } + + g := NewGlobalCollector(mergeFn) + g.StartWorker() + sessionCount := 256 + var wg sync.WaitGroup + for i := 0; i < sessionCount; i++ { + s := g.SpawnSession() + wg.Add(1) + go func() { + for i := 0; i < 256; i++ { + if s.SendDelta(1) { + expect.Add(1) + } + } + wg.Done() + }() + } + + wg.Wait() + g.Close() + require.Equal(t, expect.Load(), int64(num)) +} + +func TestSessionParallelSendDeltaSync(t *testing.T) { + num := 0 + mergeFn := func(delta int) { + num += delta + } + + g := NewGlobalCollector(mergeFn) + g.StartWorker() + sessionCount := 256 + var wg sync.WaitGroup + + for i := 0; i < sessionCount; i++ { + wg.Add(1) + s := g.SpawnSession() + go func() { + for i := 0; i < 256; i++ { + s.SendDeltaSync(1) + } + wg.Done() + }() + } + wg.Wait() + + g.Close() + require.Equal(t, sessionCount*256, num) +}