executor: parallel cancel mpp query (#36161)
ref pingcap/tiflash#5095, close pingcap/tidb#36164
This commit is contained in:
@ -32,6 +32,7 @@ import (
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/store/driver/backoff"
|
||||
derr "github.com/pingcap/tidb/store/driver/error"
|
||||
"github.com/pingcap/tidb/util"
|
||||
"github.com/pingcap/tidb/util/logutil"
|
||||
"github.com/pingcap/tidb/util/mathutil"
|
||||
"github.com/tikv/client-go/v2/tikv"
|
||||
@ -341,13 +342,18 @@ func (m *mppIterator) cancelMppTasks() {
|
||||
}
|
||||
|
||||
// send cancel cmd to all stores where tasks run
|
||||
wg := util.WaitGroupWrapper{}
|
||||
for addr := range usedStoreAddrs {
|
||||
_, err := m.store.GetTiKVClient().SendRequest(context.Background(), addr, wrappedReq, tikv.ReadTimeoutShort)
|
||||
logutil.BgLogger().Debug("cancel task ", zap.Uint64("query id ", m.startTs), zap.String(" on addr ", addr))
|
||||
if err != nil {
|
||||
logutil.BgLogger().Error("cancel task error: ", zap.Error(err), zap.Uint64(" for query id ", m.startTs), zap.String(" on addr ", addr))
|
||||
}
|
||||
storeAddr := addr
|
||||
wg.Run(func() {
|
||||
_, err := m.store.GetTiKVClient().SendRequest(context.Background(), storeAddr, wrappedReq, tikv.ReadTimeoutShort)
|
||||
logutil.BgLogger().Debug("cancel task", zap.Uint64("query id ", m.startTs), zap.String("on addr", storeAddr))
|
||||
if err != nil {
|
||||
logutil.BgLogger().Error("cancel task error", zap.Error(err), zap.Uint64("query id", m.startTs), zap.String("on addr", storeAddr))
|
||||
}
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) {
|
||||
|
||||
Reference in New Issue
Block a user