@ -367,25 +367,29 @@ func (s *StmtSummary) rotateLoop() {
|
||||
case <-tick.C:
|
||||
now := timeNow()
|
||||
s.windowLock.Lock()
|
||||
w := s.window
|
||||
// The current window has expired and needs to be refreshed and persisted.
|
||||
if now.After(w.begin.Add(time.Duration(s.RefreshInterval()) * time.Second)) {
|
||||
s.window = newStmtWindow(now, uint(s.MaxStmtCount()))
|
||||
size := w.lru.Size()
|
||||
if size > 0 {
|
||||
// Persist window asynchronously.
|
||||
s.closeWg.Add(1)
|
||||
go func() {
|
||||
defer s.closeWg.Done()
|
||||
s.storage.persist(w, now)
|
||||
}()
|
||||
}
|
||||
if now.After(s.window.begin.Add(time.Duration(s.RefreshInterval()) * time.Second)) {
|
||||
s.rotate(now)
|
||||
}
|
||||
s.windowLock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StmtSummary) rotate(now time.Time) {
|
||||
w := s.window
|
||||
s.window = newStmtWindow(now, uint(s.MaxStmtCount()))
|
||||
size := w.lru.Size()
|
||||
if size > 0 {
|
||||
// Persist window asynchronously.
|
||||
s.closeWg.Add(1)
|
||||
go func() {
|
||||
defer s.closeWg.Done()
|
||||
s.storage.persist(w, now)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// stmtWindow represents a single statistical window, which has a begin
|
||||
// time and an end time. Data within a single window is eliminated
|
||||
// according to the LRU strategy. All evicted data will be aggregated
|
||||
|
||||
@ -15,9 +15,7 @@
|
||||
package stmtsummary
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -47,7 +45,6 @@ func TestStmtSummary(t *testing.T) {
|
||||
ss := NewStmtSummary4Test(3)
|
||||
defer ss.Close()
|
||||
|
||||
ss.storage = &waitableMockStmtStorage{mockStmtStorage: ss.storage.(*mockStmtStorage)}
|
||||
w := ss.window
|
||||
ss.Add(GenerateStmtExecInfo4Test("digest1"))
|
||||
ss.Add(GenerateStmtExecInfo4Test("digest2"))
|
||||
@ -57,14 +54,8 @@ func TestStmtSummary(t *testing.T) {
|
||||
require.Equal(t, 3, w.lru.Size())
|
||||
require.Equal(t, 2, w.evicted.count())
|
||||
|
||||
ss.storage.(*waitableMockStmtStorage).Add(1)
|
||||
newEnd := w.begin.Add(time.Duration(ss.RefreshInterval()+1) * time.Second)
|
||||
timeNow = func() time.Time {
|
||||
return newEnd
|
||||
}
|
||||
ss.storage.(*waitableMockStmtStorage).Wait()
|
||||
ss.rotate(timeNow())
|
||||
|
||||
timeNow = time.Now
|
||||
ss.Add(GenerateStmtExecInfo4Test("digest6"))
|
||||
ss.Add(GenerateStmtExecInfo4Test("digest7"))
|
||||
w = ss.window
|
||||
@ -74,13 +65,3 @@ func TestStmtSummary(t *testing.T) {
|
||||
ss.Clear()
|
||||
require.Equal(t, 0, w.lru.Size())
|
||||
}
|
||||
|
||||
type waitableMockStmtStorage struct {
|
||||
sync.WaitGroup
|
||||
*mockStmtStorage
|
||||
}
|
||||
|
||||
func (s *waitableMockStmtStorage) persist(w *stmtWindow, end time.Time) {
|
||||
defer s.Done()
|
||||
s.mockStmtStorage.persist(w, end)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user