434 lines
13 KiB
Go
434 lines
13 KiB
Go
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
|
|
|
|
package logutil
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/pingcap/errors"
|
|
backuppb "github.com/pingcap/kvproto/pkg/brpb"
|
|
"github.com/pingcap/kvproto/pkg/import_sstpb"
|
|
"github.com/pingcap/kvproto/pkg/metapb"
|
|
"github.com/pingcap/log"
|
|
"github.com/pingcap/tidb/pkg/kv"
|
|
"github.com/pingcap/tidb/pkg/lightning/metric"
|
|
"github.com/pingcap/tidb/pkg/util/redact"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
)
|
|
|
|
// AbbreviatedArrayMarshaler abbreviates an array of elements.
|
|
type AbbreviatedArrayMarshaler []string
|
|
|
|
// MarshalLogArray implements zapcore.ArrayMarshaler.
|
|
func (abb AbbreviatedArrayMarshaler) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
|
|
if len(abb) <= 4 {
|
|
for _, e := range abb {
|
|
encoder.AppendString(e)
|
|
}
|
|
} else {
|
|
total := len(abb)
|
|
encoder.AppendString(abb[0])
|
|
encoder.AppendString(fmt.Sprintf("(skip %d)", total-2))
|
|
encoder.AppendString(abb[total-1])
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AbbreviatedArray constructs a field that abbreviates an array of elements.
|
|
func AbbreviatedArray(
|
|
key string, elements any, marshalFunc func(any) []string,
|
|
) zap.Field {
|
|
return zap.Array(key, AbbreviatedArrayMarshaler(marshalFunc(elements)))
|
|
}
|
|
|
|
type zapFileMarshaler struct{ *backuppb.File }
|
|
|
|
func (file zapFileMarshaler) MarshalLogObject(enc zapcore.ObjectEncoder) error {
|
|
enc.AddString("name", file.GetName())
|
|
enc.AddString("CF", file.GetCf())
|
|
enc.AddString("sha256", hex.EncodeToString(file.GetSha256()))
|
|
enc.AddString("startKey", redact.Key(file.GetStartKey()))
|
|
enc.AddString("endKey", redact.Key(file.GetEndKey()))
|
|
enc.AddUint64("startVersion", file.GetStartVersion())
|
|
enc.AddUint64("endVersion", file.GetEndVersion())
|
|
enc.AddUint64("totalKvs", file.GetTotalKvs())
|
|
enc.AddUint64("totalBytes", file.GetTotalBytes())
|
|
enc.AddUint64("CRC64Xor", file.GetCrc64Xor())
|
|
return nil
|
|
}
|
|
|
|
func AbbreviatedStringers[T fmt.Stringer](key string, stringers []T) zap.Field {
|
|
if len(stringers) < 4 {
|
|
return zap.Stringers(key, stringers)
|
|
}
|
|
return zap.Array(key, zapcore.ArrayMarshalerFunc(func(ae zapcore.ArrayEncoder) error {
|
|
ae.AppendString(stringers[0].String())
|
|
ae.AppendString(fmt.Sprintf("(skip %d)", len(stringers)-2))
|
|
ae.AppendString(stringers[len(stringers)-1].String())
|
|
return nil
|
|
}))
|
|
}
|
|
|
|
type zapFilesMarshaler []*backuppb.File
|
|
|
|
// MarshalLogObjectForFiles is an internal util function to zap something having `Files` field.
|
|
func MarshalLogObjectForFiles(files []*backuppb.File, encoder zapcore.ObjectEncoder) error {
|
|
return zapFilesMarshaler(files).MarshalLogObject(encoder)
|
|
}
|
|
|
|
func (fs zapFilesMarshaler) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
|
|
total := len(fs)
|
|
encoder.AddInt("total", total)
|
|
elements := make([]string, 0, total)
|
|
for _, f := range fs {
|
|
elements = append(elements, f.GetName())
|
|
}
|
|
_ = encoder.AddArray("files", AbbreviatedArrayMarshaler(elements))
|
|
|
|
totalKVs := uint64(0)
|
|
totalBytes := uint64(0)
|
|
totalSize := uint64(0)
|
|
for _, file := range fs {
|
|
totalKVs += file.GetTotalKvs()
|
|
totalBytes += file.GetTotalBytes()
|
|
totalSize += file.GetSize_()
|
|
}
|
|
encoder.AddUint64("totalKVs", totalKVs)
|
|
encoder.AddUint64("totalBytes", totalBytes)
|
|
encoder.AddUint64("totalSize", totalSize)
|
|
return nil
|
|
}
|
|
|
|
// File make the zap fields for a file.
|
|
func File(file *backuppb.File) zap.Field {
|
|
return zap.Object("file", zapFileMarshaler{file})
|
|
}
|
|
|
|
// Files make the zap field for a set of file.
|
|
func Files(fs []*backuppb.File) zap.Field {
|
|
return zap.Object("files", zapFilesMarshaler(fs))
|
|
}
|
|
|
|
type zapStreamBackupTaskInfo struct{ *backuppb.StreamBackupTaskInfo }
|
|
|
|
func (t zapStreamBackupTaskInfo) MarshalLogObject(enc zapcore.ObjectEncoder) error {
|
|
enc.AddString("taskName", t.Name)
|
|
enc.AddUint64("startTs", t.StartTs)
|
|
enc.AddUint64("endTS", t.EndTs)
|
|
enc.AddString("tableFilter", strings.Join(t.TableFilter, ","))
|
|
return nil
|
|
}
|
|
|
|
// StreamBackupTaskInfo makes the zap fields for a stream backup task info.
|
|
func StreamBackupTaskInfo(t *backuppb.StreamBackupTaskInfo) zap.Field {
|
|
return zap.Object("streamTaskInfo", zapStreamBackupTaskInfo{t})
|
|
}
|
|
|
|
type zapRewriteRuleMarshaler struct{ *import_sstpb.RewriteRule }
|
|
|
|
func (rewriteRule zapRewriteRuleMarshaler) MarshalLogObject(enc zapcore.ObjectEncoder) error {
|
|
enc.AddString("oldKeyPrefix", hex.EncodeToString(rewriteRule.GetOldKeyPrefix()))
|
|
enc.AddString("newKeyPrefix", hex.EncodeToString(rewriteRule.GetNewKeyPrefix()))
|
|
enc.AddUint64("newTimestamp", rewriteRule.GetNewTimestamp())
|
|
return nil
|
|
}
|
|
|
|
// RewriteRule make the zap fields for a rewrite rule.
|
|
func RewriteRule(rewriteRule *import_sstpb.RewriteRule) zap.Field {
|
|
return zap.Object("rewriteRule", zapRewriteRuleMarshaler{rewriteRule})
|
|
}
|
|
|
|
// RewriteRuleObject make zap object marshaler for a rewrite rule.
|
|
func RewriteRuleObject(rewriteRule *import_sstpb.RewriteRule) zapcore.ObjectMarshaler {
|
|
return zapRewriteRuleMarshaler{rewriteRule}
|
|
}
|
|
|
|
type zapMarshalRegionMarshaler struct{ *metapb.Region }
|
|
|
|
func (region zapMarshalRegionMarshaler) MarshalLogObject(enc zapcore.ObjectEncoder) error {
|
|
peers := make([]string, 0, len(region.GetPeers()))
|
|
for _, peer := range region.GetPeers() {
|
|
peers = append(peers, peer.String())
|
|
}
|
|
enc.AddUint64("ID", region.GetId())
|
|
enc.AddString("startKey", redact.Key(region.GetStartKey()))
|
|
enc.AddString("endKey", redact.Key(region.GetEndKey()))
|
|
enc.AddString("epoch", region.GetRegionEpoch().String())
|
|
enc.AddString("peers", strings.Join(peers, ","))
|
|
return nil
|
|
}
|
|
|
|
// Region make the zap fields for a region.
|
|
func Region(region *metapb.Region) zap.Field {
|
|
return zap.Object("region", zapMarshalRegionMarshaler{region})
|
|
}
|
|
|
|
// RegionBy make the zap fields for a region with name.
|
|
func RegionBy(key string, region *metapb.Region) zap.Field {
|
|
return zap.Object(key, zapMarshalRegionMarshaler{region})
|
|
}
|
|
|
|
// Leader make the zap fields for a peer as leader.
|
|
// nolint:interfacer
|
|
func Leader(peer *metapb.Peer) zap.Field {
|
|
return zap.String("leader", peer.String())
|
|
}
|
|
|
|
// Peer make the zap fields for a peer.
|
|
func Peer(peer *metapb.Peer) zap.Field {
|
|
return zap.String("peer", peer.String())
|
|
}
|
|
|
|
type zapSSTMetaMarshaler struct{ *import_sstpb.SSTMeta }
|
|
|
|
func (sstMeta zapSSTMetaMarshaler) MarshalLogObject(enc zapcore.ObjectEncoder) error {
|
|
enc.AddString("CF", sstMeta.GetCfName())
|
|
enc.AddBool("endKeyExclusive", sstMeta.EndKeyExclusive)
|
|
enc.AddUint32("CRC32", sstMeta.Crc32)
|
|
enc.AddUint64("length", sstMeta.Length)
|
|
enc.AddUint64("regionID", sstMeta.RegionId)
|
|
enc.AddString("regionEpoch", sstMeta.RegionEpoch.String())
|
|
enc.AddString("startKey", redact.Key(sstMeta.GetRange().GetStart()))
|
|
enc.AddString("endKey", redact.Key(sstMeta.GetRange().GetEnd()))
|
|
|
|
sstUUID, err := uuid.FromBytes(sstMeta.GetUuid())
|
|
if err != nil {
|
|
enc.AddString("UUID", fmt.Sprintf("invalid UUID %s", hex.EncodeToString(sstMeta.GetUuid())))
|
|
} else {
|
|
enc.AddString("UUID", sstUUID.String())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SSTMeta make the zap fields for a SST meta.
|
|
func SSTMeta(sstMeta *import_sstpb.SSTMeta) zap.Field {
|
|
return zap.Object("sstMeta", zapSSTMetaMarshaler{sstMeta})
|
|
}
|
|
|
|
type zapSSTMetasMarshaler []*import_sstpb.SSTMeta
|
|
|
|
func (m zapSSTMetasMarshaler) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
|
|
for _, meta := range m {
|
|
if err := encoder.AppendObject(zapSSTMetaMarshaler{meta}); err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Describes the overall range of the SST metas and their size.
|
|
func BriefSSTMetas(key string, sstMetas []*import_sstpb.SSTMeta) zap.Field {
|
|
var (
|
|
startKey, endKey []byte
|
|
total int
|
|
totalSize uint64
|
|
totalKv uint64
|
|
totalKvSize uint64
|
|
)
|
|
|
|
for _, meta := range sstMetas {
|
|
if total == 0 {
|
|
startKey = meta.GetRange().GetStart()
|
|
endKey = meta.GetRange().GetEnd()
|
|
}
|
|
if bytes.Compare(meta.GetRange().GetStart(), startKey) < 0 {
|
|
startKey = meta.GetRange().GetStart()
|
|
}
|
|
// NOTE: SST meta shouldn't has an empty end key?
|
|
if bytes.Compare(meta.GetRange().GetEnd(), endKey) > 0 {
|
|
endKey = meta.GetRange().GetEnd()
|
|
}
|
|
totalSize += meta.GetLength()
|
|
totalKv += meta.GetTotalKvs()
|
|
totalKvSize += meta.GetTotalBytes()
|
|
total++
|
|
}
|
|
return zap.Object(key, zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error {
|
|
enc.AddInt("total", total)
|
|
enc.AddString("startKey", redact.Key(startKey))
|
|
enc.AddString("endKey", redact.Key(endKey))
|
|
enc.AddUint64("totalSize", totalSize)
|
|
enc.AddUint64("totalKvs", totalKv)
|
|
enc.AddUint64("totalKvSize", totalKvSize)
|
|
return nil
|
|
}))
|
|
}
|
|
|
|
// SSTMetas make the zap fields for SST metas.
|
|
func SSTMetas(sstMetas []*import_sstpb.SSTMeta) zap.Field {
|
|
return zap.Array("sstMetas", zapSSTMetasMarshaler(sstMetas))
|
|
}
|
|
|
|
type zapKeysMarshaler [][]byte
|
|
|
|
func (keys zapKeysMarshaler) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
|
|
total := len(keys)
|
|
encoder.AddInt("total", total)
|
|
elements := make([]string, 0, total)
|
|
for _, k := range keys {
|
|
elements = append(elements, redact.Key(k))
|
|
}
|
|
_ = encoder.AddArray("keys", AbbreviatedArrayMarshaler(elements))
|
|
return nil
|
|
}
|
|
|
|
// Key constructs a field that carries upper hex format key.
|
|
func Key(fieldKey string, key []byte) zap.Field {
|
|
return zap.String(fieldKey, redact.Key(key))
|
|
}
|
|
|
|
// Keys constructs a field that carries upper hex format keys.
|
|
func Keys(keys [][]byte) zap.Field {
|
|
return zap.Object("keys", zapKeysMarshaler(keys))
|
|
}
|
|
|
|
// AShortError make the zap field with key to display error without verbose representation (e.g. the stack trace).
|
|
func AShortError(key string, err error) zap.Field {
|
|
if err == nil {
|
|
return zap.Skip()
|
|
}
|
|
return zap.String(key, err.Error())
|
|
}
|
|
|
|
// ShortError make the zap field to display error without verbose representation (e.g. the stack trace).
|
|
func ShortError(err error) zap.Field {
|
|
if err == nil {
|
|
return zap.Skip()
|
|
}
|
|
return zap.String("error", err.Error())
|
|
}
|
|
|
|
var loggerToTerm, _, _ = log.InitLogger(new(log.Config), zap.AddCallerSkip(1))
|
|
|
|
// WarnTerm put a log both to terminal and to the log file.
|
|
func WarnTerm(message string, fields ...zap.Field) {
|
|
log.Warn(message, fields...)
|
|
if loggerToTerm != nil {
|
|
loggerToTerm.Warn(message, fields...)
|
|
}
|
|
}
|
|
|
|
// RedactAny constructs a redacted field that carries an interface{}.
|
|
func RedactAny(fieldKey string, key any) zap.Field {
|
|
if redact.NeedRedact() {
|
|
return zap.String(fieldKey, "?")
|
|
}
|
|
return zap.Any(fieldKey, key)
|
|
}
|
|
|
|
// Redact replaces the zap field by a '?' if redaction is turned on.
|
|
func Redact(field zap.Field) zap.Field {
|
|
if redact.NeedRedact() {
|
|
return zap.String(field.Key, "?")
|
|
}
|
|
return field
|
|
}
|
|
|
|
func StringifyRangeOf(start, end []byte) StringifyRange {
|
|
return StringifyRange{StartKey: start, EndKey: end}
|
|
}
|
|
|
|
// StringifyKeys wraps the key range into a stringer.
|
|
type StringifyKeys []kv.KeyRange
|
|
|
|
func (kr StringifyKeys) String() string {
|
|
sb := new(strings.Builder)
|
|
sb.WriteString("{")
|
|
for i, rng := range kr {
|
|
if i > 0 {
|
|
sb.WriteString(", ")
|
|
}
|
|
sb.WriteString(StringifyRange(rng).String())
|
|
}
|
|
sb.WriteString("}")
|
|
return sb.String()
|
|
}
|
|
|
|
// StringifyRange is the wrapper for displaying a key range.
|
|
type StringifyRange kv.KeyRange
|
|
|
|
func (rng StringifyRange) String() string {
|
|
sb := new(strings.Builder)
|
|
sb.WriteString("[")
|
|
sb.WriteString(redact.Key(rng.StartKey))
|
|
sb.WriteString(", ")
|
|
var endKey string
|
|
if len(rng.EndKey) == 0 {
|
|
endKey = "inf"
|
|
} else {
|
|
endKey = redact.Key(rng.EndKey)
|
|
}
|
|
sb.WriteString(redact.Value(endKey))
|
|
sb.WriteString(")")
|
|
return sb.String()
|
|
}
|
|
|
|
// StringifyMany returns an array marshaler for a slice of stringers.
|
|
func StringifyMany[T fmt.Stringer](items []T) zapcore.ArrayMarshaler {
|
|
return zapcore.ArrayMarshalerFunc(func(ae zapcore.ArrayEncoder) error {
|
|
for _, item := range items {
|
|
ae.AppendString(item.String())
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// HexBytes is a wrapper which make a byte sequence printed by the hex format.
|
|
type HexBytes []byte
|
|
|
|
var (
|
|
_ fmt.Stringer = HexBytes{}
|
|
_ json.Marshaler = HexBytes{}
|
|
)
|
|
|
|
// String implements fmt.Stringer.
|
|
func (b HexBytes) String() string {
|
|
return hex.EncodeToString(b)
|
|
}
|
|
|
|
// MarshalJSON implements json.Marshaler.
|
|
func (b HexBytes) MarshalJSON() ([]byte, error) {
|
|
return json.Marshal(hex.EncodeToString(b))
|
|
}
|
|
|
|
func MarshalHistogram(m prometheus.Histogram) zapcore.ObjectMarshaler {
|
|
return zapcore.ObjectMarshalerFunc(func(mal zapcore.ObjectEncoder) error {
|
|
if m == nil {
|
|
return nil
|
|
}
|
|
|
|
met := metric.ReadHistogram(m)
|
|
if met == nil || met.Histogram == nil {
|
|
return nil
|
|
}
|
|
|
|
hist := met.Histogram
|
|
for _, b := range hist.GetBucket() {
|
|
key := fmt.Sprintf("lt_%f", b.GetUpperBound())
|
|
mal.AddUint64(key, b.GetCumulativeCount())
|
|
}
|
|
mal.AddUint64("count", hist.GetSampleCount())
|
|
mal.AddFloat64("total", hist.GetSampleSum())
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// OverrideLevel temporary sets level to the global logger.
|
|
//
|
|
// Don't use this in parallel test cases.
|
|
func OverrideLevelForTest(t *testing.T, lvl zapcore.Level) {
|
|
t.Helper()
|
|
|
|
oldLvl := log.GetLevel()
|
|
t.Cleanup(func() { log.SetLevel(oldLvl) })
|
|
log.SetLevel(lvl)
|
|
}
|