// Copyright 2025 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package join import ( "bytes" "fmt" "strconv" "sync/atomic" "time" "github.com/pingcap/tidb/pkg/util/execdetails" ) type hashJoinRuntimeStats struct { fetchAndBuildHashTable time.Duration hashStat hashStatistic fetchAndProbe int64 probe int64 concurrent int maxFetchAndProbe int64 } // Tp implements the RuntimeStats interface. func (*hashJoinRuntimeStats) Tp() int { return execdetails.TpHashJoinRuntimeStats } func (e *hashJoinRuntimeStats) String() string { buf := bytes.NewBuffer(make([]byte, 0, 128)) if e.fetchAndBuildHashTable > 0 { buf.WriteString("build_hash_table:{total:") buf.WriteString(execdetails.FormatDuration(e.fetchAndBuildHashTable)) buf.WriteString(", fetch:") buf.WriteString(execdetails.FormatDuration(e.fetchAndBuildHashTable - e.hashStat.buildTableElapse)) buf.WriteString(", build:") buf.WriteString(execdetails.FormatDuration(e.hashStat.buildTableElapse)) buf.WriteString("}") } if e.probe > 0 { buf.WriteString(", probe:{concurrency:") buf.WriteString(strconv.Itoa(e.concurrent)) buf.WriteString(", total:") buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndProbe))) buf.WriteString(", max:") buf.WriteString(execdetails.FormatDuration(time.Duration(atomic.LoadInt64(&e.maxFetchAndProbe)))) buf.WriteString(", probe:") buf.WriteString(execdetails.FormatDuration(time.Duration(e.probe))) // fetch time is the time wait fetch result from its child executor, // wait time is the time wait its parent executor to fetch the joined result buf.WriteString(", fetch and wait:") buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndProbe - e.probe))) if e.hashStat.probeCollision > 0 { buf.WriteString(", probe_collision:") buf.WriteString(strconv.FormatInt(e.hashStat.probeCollision, 10)) } buf.WriteString("}") } return buf.String() } func (e *hashJoinRuntimeStats) Clone() execdetails.RuntimeStats { return &hashJoinRuntimeStats{ fetchAndBuildHashTable: e.fetchAndBuildHashTable, hashStat: e.hashStat, fetchAndProbe: e.fetchAndProbe, probe: e.probe, concurrent: e.concurrent, maxFetchAndProbe: e.maxFetchAndProbe, } } func (e *hashJoinRuntimeStats) Merge(rs execdetails.RuntimeStats) { tmp, ok := rs.(*hashJoinRuntimeStats) if !ok { return } e.fetchAndBuildHashTable += tmp.fetchAndBuildHashTable e.hashStat.buildTableElapse += tmp.hashStat.buildTableElapse e.hashStat.probeCollision += tmp.hashStat.probeCollision e.fetchAndProbe += tmp.fetchAndProbe e.probe += tmp.probe if e.maxFetchAndProbe < tmp.maxFetchAndProbe { e.maxFetchAndProbe = tmp.maxFetchAndProbe } } type hashStatistic struct { // NOTE: probeCollision may be accessed from multiple goroutines concurrently. probeCollision int64 buildTableElapse time.Duration } func (s *hashStatistic) String() string { return fmt.Sprintf("probe_collision:%v, build:%v", s.probeCollision, execdetails.FormatDuration(s.buildTableElapse)) } type hashJoinRuntimeStatsV2 struct { concurrent int probeCollision int64 fetchAndBuildHashTable int64 partitionData int64 buildHashTable int64 probe int64 fetchAndProbe int64 maxPartitionData int64 maxBuildHashTable int64 maxProbe int64 maxFetchAndProbe int64 maxPartitionDataForCurrentRound int64 maxBuildHashTableForCurrentRound int64 maxProbeForCurrentRound int64 maxFetchAndProbeForCurrentRound int64 } func setMaxValue(addr *int64, currentValue int64) { for { value := atomic.LoadInt64(addr) if currentValue <= value { return } if atomic.CompareAndSwapInt64(addr, value, currentValue) { return } } } func (e *hashJoinRuntimeStatsV2) reset() { e.probeCollision = 0 e.fetchAndBuildHashTable = 0 e.partitionData = 0 e.buildHashTable = 0 e.probe = 0 e.fetchAndProbe = 0 e.maxPartitionData = 0 e.maxBuildHashTable = 0 e.maxProbe = 0 e.maxFetchAndProbe = 0 e.maxPartitionDataForCurrentRound = 0 e.maxBuildHashTableForCurrentRound = 0 e.maxProbeForCurrentRound = 0 e.maxFetchAndProbeForCurrentRound = 0 } func (e *hashJoinRuntimeStatsV2) resetCurrentRound() { e.maxPartitionData += e.maxPartitionDataForCurrentRound e.maxBuildHashTable += e.maxBuildHashTableForCurrentRound e.maxProbe += e.maxProbeForCurrentRound e.maxFetchAndProbe += e.maxFetchAndProbeForCurrentRound e.maxPartitionDataForCurrentRound = 0 e.maxBuildHashTableForCurrentRound = 0 e.maxProbeForCurrentRound = 0 e.maxFetchAndProbeForCurrentRound = 0 } // Tp implements the RuntimeStats interface. func (*hashJoinRuntimeStatsV2) Tp() int { return execdetails.TpHashJoinRuntimeStatsV2 } func (e *hashJoinRuntimeStatsV2) String() string { buf := bytes.NewBuffer(make([]byte, 0, 128)) if e.fetchAndBuildHashTable > 0 { buf.WriteString("build_hash_table:{total:") buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndBuildHashTable))) buf.WriteString(", fetch:") buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndBuildHashTable - e.maxBuildHashTable - e.maxPartitionData))) buf.WriteString(", build:") buf.WriteString(execdetails.FormatDuration(time.Duration(e.maxBuildHashTable + e.maxPartitionData))) buf.WriteString("}") } if e.probe > 0 { buf.WriteString(", probe:{concurrency:") buf.WriteString(strconv.Itoa(e.concurrent)) buf.WriteString(", total:") buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndProbe))) buf.WriteString(", max:") buf.WriteString(execdetails.FormatDuration(time.Duration(atomic.LoadInt64(&e.maxFetchAndProbe)))) buf.WriteString(", probe:") buf.WriteString(execdetails.FormatDuration(time.Duration(e.maxProbe))) buf.WriteString(", fetch and wait:") buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndProbe - e.maxProbe))) if e.probeCollision > 0 { buf.WriteString(", probe_collision:") buf.WriteString(strconv.FormatInt(e.probeCollision, 10)) } buf.WriteString("}") } return buf.String() } func (e *hashJoinRuntimeStatsV2) Clone() execdetails.RuntimeStats { return &hashJoinRuntimeStatsV2{ concurrent: e.concurrent, probeCollision: e.probeCollision, fetchAndBuildHashTable: e.fetchAndBuildHashTable, partitionData: e.partitionData, buildHashTable: e.buildHashTable, probe: e.probe, fetchAndProbe: e.fetchAndProbe, maxPartitionData: e.maxPartitionData, maxBuildHashTable: e.maxBuildHashTable, maxProbe: e.maxProbe, maxFetchAndProbe: e.maxFetchAndProbe, maxPartitionDataForCurrentRound: e.maxPartitionDataForCurrentRound, maxBuildHashTableForCurrentRound: e.maxBuildHashTableForCurrentRound, maxProbeForCurrentRound: e.maxProbeForCurrentRound, maxFetchAndProbeForCurrentRound: e.maxFetchAndProbeForCurrentRound, } } func (e *hashJoinRuntimeStatsV2) Merge(rs execdetails.RuntimeStats) { tmp, ok := rs.(*hashJoinRuntimeStatsV2) if !ok { return } e.fetchAndBuildHashTable += tmp.fetchAndBuildHashTable e.buildHashTable += tmp.buildHashTable if e.maxBuildHashTable < tmp.maxBuildHashTable { e.maxBuildHashTable = tmp.maxBuildHashTable } e.partitionData += tmp.partitionData if e.maxPartitionData < tmp.maxPartitionData { e.maxPartitionData = tmp.maxPartitionData } e.probeCollision += tmp.probeCollision e.fetchAndProbe += tmp.fetchAndProbe e.probe += tmp.probe if e.maxFetchAndProbe < tmp.maxFetchAndProbe { e.maxFetchAndProbe = tmp.maxFetchAndProbe } }