*: Retry when placement PutBundles failed (#30590)

@@ -130,7 +130,7 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
 		return ver, errors.Trace(err)
 	}
 
-	if err = infosync.PutRuleBundles(context.TODO(), bundles); err != nil {
+	if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
 		job.State = model.JobStateCancelled
 		return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
 	}
@@ -1040,7 +1040,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
 	if job.Type == model.ActionAddTablePartition {
 		// It is rollbacked from adding table partition, just remove addingDefinitions from tableInfo.
 		physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
-		err = infosync.PutRuleBundles(context.TODO(), rollbackBundles)
+		err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
 		if err != nil {
 			job.State = model.JobStateCancelled
 			return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
@@ -1208,7 +1208,7 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
 		return ver, errors.Trace(err)
 	}
 
-	err = infosync.PutRuleBundles(context.TODO(), bundles)
+	err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
 	if err != nil {
 		job.State = model.JobStateCancelled
 		return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
@@ -1412,7 +1412,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
 		return ver, errors.Trace(err)
 	}
 
-	if err = infosync.PutRuleBundles(context.TODO(), bundles); err != nil {
+	if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
 		job.State = model.JobStateCancelled
 		return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
 	}

@@ -262,7 +262,7 @@ func onAlterPlacementPolicy(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
 		cp := bundle.Clone()
 		bundles = append(bundles, cp.Reset(placement.RuleIndexPartition, []int64{id}))
 	}
-	err = infosync.PutRuleBundles(context.TODO(), bundles)
+	err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
 	if err != nil {
 		job.State = model.JobStateCancelled
 		return ver, errors.Wrapf(err, "failed to notify PD the placement rules")

@@ -101,7 +101,7 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
 	}
 
 	// Send the placement bundle to PD.
-	err = infosync.PutRuleBundles(context.TODO(), bundles)
+	err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
 	if err != nil {
 		job.State = model.JobStateCancelled
 		return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
@@ -580,7 +580,7 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
 		return ver, errors.Trace(err)
 	}
 
-	err = infosync.PutRuleBundles(context.TODO(), bundles)
+	err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
 	if err != nil {
 		job.State = model.JobStateCancelled
 		return 0, errors.Wrapf(err, "failed to notify PD the placement rules")
@@ -1302,7 +1302,7 @@ func onAlterTablePartitionOptions(d *ddlCtx, t *meta.Meta, job *model.Job) (ver
 
 	// Send the placement bundle to PD.
 	if bundle != nil {
-		err = infosync.PutRuleBundles(context.TODO(), []*placement.Bundle{bundle})
+		err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), []*placement.Bundle{bundle})
 	}
 
 	if err != nil {
@@ -1353,7 +1353,7 @@ func onAlterTablePlacement(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
 
 	// Send the placement bundle to PD.
 	if bundle != nil {
-		err = infosync.PutRuleBundles(context.TODO(), []*placement.Bundle{bundle})
+		err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), []*placement.Bundle{bundle})
 	}
 
 	if err != nil {

domain/infosync/error.go (new file, 28 lines)
@@ -0,0 +1,28 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package infosync
+
+import (
+	"github.com/pingcap/tidb/errno"
+	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/util/dbterror"
+)
+
+var (
+	// ErrHTTPServiceError means we got a http response with a status code which is not '2xx'
+	ErrHTTPServiceError = dbterror.ClassDomain.NewStdErr(
+		errno.ErrHTTPServiceError, mysql.Message("HTTP request failed with status %s", nil),
+	)
+)

@@ -80,6 +80,10 @@
 	TopologyPrometheus = "/topology/prometheus"
 	// TablePrometheusCacheExpiry is the expiry time for prometheus address cache.
 	TablePrometheusCacheExpiry = 10 * time.Second
+	// RequestRetryInterval is the sleep time before next retry for http request
+	RequestRetryInterval = 200 * time.Millisecond
+	// SyncBundlesMaxRetry is the max retry times for sync placement bundles
+	SyncBundlesMaxRetry = 3
 )
 
 // ErrPrometheusAddrIsNotSet is the error that Prometheus address is not set in PD and etcd
@@ -353,7 +357,7 @@ func doRequest(ctx context.Context, addrs []string, route, method string, body i
 		return nil, err
 	}
 	if res.StatusCode != http.StatusOK {
-		err = errors.Errorf("%s", bodyBytes)
+		err = ErrHTTPServiceError.FastGen("%s", bodyBytes)
 		if res.StatusCode == http.StatusNotFound || res.StatusCode == http.StatusPreconditionFailed {
 			err = nil
 			bodyBytes = nil
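
Why the switch from errors.Errorf to ErrHTTPServiceError.FastGen matters: an error minted through the terror class carries the [domain:8243] code, so the retry helper added below can recognize it via ErrHTTPServiceError.Equal and stop retrying, since PD has already given a definitive answer. A minimal sketch of that distinction, written as a standalone program that only touches exported identifiers from this commit:

package main

import (
	"fmt"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/domain/infosync"
)

func main() {
	// FastGen attaches the error class and code, so Equal matches it later.
	serviceErr := infosync.ErrHTTPServiceError.FastGen("%s", "503 from PD")
	fmt.Println(infosync.ErrHTTPServiceError.Equal(serviceErr)) // true

	// A plain formatted error carries no class information, so the
	// retry loop treats it as transient and retries it.
	plainErr := errors.Errorf("%s", "connection refused")
	fmt.Println(infosync.ErrHTTPServiceError.Equal(plainErr)) // false
}
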
@@ -427,6 +431,16 @@ func GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error)
 
 // PutRuleBundles is used to post specific rule bundles to PD.
 func PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error {
+	failpoint.Inject("putRuleBundlesError", func(isServiceError failpoint.Value) {
+		var err error
+		if isServiceError.(bool) {
+			err = ErrHTTPServiceError.FastGen("mock service error")
+		} else {
+			err = errors.New("mock other error")
+		}
+		failpoint.Return(err)
+	})
+
 	is, err := getGlobalInfoSyncer()
 	if err != nil {
 		return err
@@ -435,6 +449,31 @@ func PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error {
 	return is.placementManager.PutRuleBundles(ctx, bundles)
 }
 
+// PutRuleBundlesWithRetry will retry for specified times when PutRuleBundles failed
+func PutRuleBundlesWithRetry(ctx context.Context, bundles []*placement.Bundle, maxRetry int, interval time.Duration) (err error) {
+	if maxRetry < 0 {
+		maxRetry = 0
+	}
+
+	for i := 0; i <= maxRetry; i++ {
+		if err = PutRuleBundles(ctx, bundles); err == nil || ErrHTTPServiceError.Equal(err) {
+			return err
+		}
+
+		if i != maxRetry {
+			logutil.BgLogger().Warn("Error occurs when PutRuleBundles, retry", zap.Error(err))
+			time.Sleep(interval)
+		}
+	}
+
+	return
+}
+
+// PutRuleBundlesWithDefaultRetry will retry for default times
+func PutRuleBundlesWithDefaultRetry(ctx context.Context, bundles []*placement.Bundle) (err error) {
+	return PutRuleBundlesWithRetry(ctx, bundles, SyncBundlesMaxRetry, RequestRetryInterval)
+}
+
 func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
 	allInfo := make(map[string]*ServerInfo)
 	if is.etcdCli == nil {
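
The retry policy in a nutshell: up to maxRetry additional attempts with a fixed sleep in between, returning early on success or on a service error (PD answered, so repeating the request cannot help), and surfacing the last error otherwise. With the defaults above (SyncBundlesMaxRetry = 3, RequestRetryInterval = 200ms) a persistently failing PutRuleBundles costs at most three retries and roughly 600ms of sleep before the DDL job is cancelled. A self-contained sketch of the same loop, with hypothetical names retryTransient and errDefinite standing in for the real identifiers:

package main

import (
	"errors"
	"fmt"
	"time"
)

// errDefinite stands in for ErrHTTPServiceError: the server answered,
// so repeating the request cannot help. (Hypothetical name.)
var errDefinite = errors.New("definite server rejection")

// retryTransient mirrors the loop in PutRuleBundlesWithRetry: at most
// maxRetry+1 attempts, sleeping between attempts, bailing out early on
// success or on a definite (non-retryable) error.
func retryTransient(fn func() error, maxRetry int, interval time.Duration) error {
	if maxRetry < 0 {
		maxRetry = 0
	}
	var err error
	for i := 0; i <= maxRetry; i++ {
		if err = fn(); err == nil || errors.Is(err, errDefinite) {
			return err
		}
		if i != maxRetry {
			time.Sleep(interval)
		}
	}
	return err
}

func main() {
	attempts := 0
	err := retryTransient(func() error {
		attempts++
		if attempts <= 3 {
			return errors.New("transient: connection refused")
		}
		return nil
	}, 3, time.Millisecond)
	// Three retries absorb exactly three transient failures:
	// four attempts in total, then success.
	fmt.Println(attempts, err) // 4 <nil>
}
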
@@ -26,8 +26,10 @@ import (
 	"time"
 
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/ddl/placement"
 	"github.com/pingcap/tidb/ddl/util"
 	"github.com/pingcap/tidb/owner"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/util/testbridge"
 	"github.com/stretchr/testify/require"
 	"go.etcd.io/etcd/integration"
@@ -145,3 +147,66 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
 	}
 	return len(resp.Kvs) == 1, nil
 }
+
+func TestPutBundlesRetry(t *testing.T) {
+	_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, false)
+	require.NoError(t, err)
+
+	bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
+	require.NoError(t, err)
+	bundle = bundle.Reset(placement.RuleIndexTable, []int64{1024})
+
+	t.Run("serviceErrorShouldNotRetry", func(t *testing.T) {
+		require.NoError(t, PutRuleBundles(context.TODO(), []*placement.Bundle{{ID: bundle.ID}}))
+		require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError", "1*return(true)"))
+		defer func() {
+			require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError"))
+		}()
+
+		err := PutRuleBundlesWithRetry(context.TODO(), []*placement.Bundle{bundle}, 3, time.Millisecond)
+		require.Error(t, err)
+		require.Equal(t, "[domain:8243]mock service error", err.Error())
+
+		got, err := GetRuleBundle(context.TODO(), bundle.ID)
+		require.NoError(t, err)
+		require.True(t, got.IsEmpty())
+	})
+
+	t.Run("nonServiceErrorShouldRetry", func(t *testing.T) {
+		require.NoError(t, PutRuleBundles(context.TODO(), []*placement.Bundle{{ID: bundle.ID}}))
+		require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError", "3*return(false)"))
+		defer func() {
+			require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError"))
+		}()
+
+		err := PutRuleBundlesWithRetry(context.TODO(), []*placement.Bundle{bundle}, 3, time.Millisecond)
+		require.NoError(t, err)
+
+		got, err := GetRuleBundle(context.TODO(), bundle.ID)
+		require.NoError(t, err)
+
+		gotJSON, err := json.Marshal(got)
+		require.NoError(t, err)
+
+		expectJSON, err := json.Marshal(bundle)
+		require.NoError(t, err)
+
+		require.Equal(t, expectJSON, gotJSON)
+	})
+
+	t.Run("nonServiceErrorRetryAndFail", func(t *testing.T) {
+		require.NoError(t, PutRuleBundles(context.TODO(), []*placement.Bundle{{ID: bundle.ID}}))
+		require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError", "4*return(false)"))
+		defer func() {
+			require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError"))
+		}()
+
+		err := PutRuleBundlesWithRetry(context.TODO(), []*placement.Bundle{bundle}, 3, time.Millisecond)
+		require.Error(t, err)
+		require.Equal(t, "mock other error", err.Error())
+
+		got, err := GetRuleBundle(context.TODO(), bundle.ID)
+		require.NoError(t, err)
+		require.True(t, got.IsEmpty())
+	})
+}
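
The failpoint terms are what shape the three subtests: "N*return(v)" arms the failpoint for exactly N evaluations and then stops firing. "3*return(false)" therefore injects three transient failures that the three retries can absorb, while "4*return(false)" outlasts the retry budget and the last mock error surfaces. A small sketch of that countdown, calling the runtime failpoint.Eval directly instead of the Inject marker (production use relies on failpoint-ctl rewriting); the failpoint name here is made up for the demo:

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	// Hypothetical failpoint name, used only for this demo.
	const name = "demo/putRuleBundlesError"
	if err := failpoint.Enable(name, "3*return(false)"); err != nil {
		panic(err)
	}
	defer func() {
		if err := failpoint.Disable(name); err != nil {
			panic(err)
		}
	}()

	for i := 1; i <= 4; i++ {
		val, err := failpoint.Eval(name)
		// Evaluations 1-3 fire (val=false, err=nil); the 4th no longer
		// triggers, so err is non-nil and val is nil.
		fmt.Printf("eval %d: val=%v fired=%v\n", i, val, err == nil)
	}
}
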
@@ -1055,6 +1055,7 @@
 	ErrPlacementPolicyWithDirectOption = 8240
 	ErrPlacementPolicyInUse            = 8241
 	ErrOptOnCacheTable                 = 8242
+	ErrHTTPServiceError                = 8243
 	// TiKV/PD/TiFlash errors.
 	ErrPDServerTimeout   = 9001
 	ErrTiKVServerTimeout = 9002

@@ -726,6 +726,11 @@ error = '''
 Information schema is changed during the execution of the statement(for example, table definition may be updated by other DDL ran in parallel). If you see this error often, try increasing `tidb_max_delta_schema_count`. [try again later]
 '''
 
+["domain:8243"]
+error = '''
+HTTP request failed with status %s
+'''
+
 ["domain:9009"]
 error = '''
 Prometheus address is not set in PD and etcd

@@ -1914,7 +1914,7 @@ func (w *GCWorker) doGCPlacementRules(dr util.DelRangeTask) (err error) {
 	for _, id := range physicalTableIDs {
 		bundles = append(bundles, placement.NewBundle(id))
 	}
-	return infosync.PutRuleBundles(context.TODO(), bundles)
+	return infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
 }
 
 func (w *GCWorker) doGCLabelRules(dr util.DelRangeTask) (err error) {