diff --git a/ddl/partition.go b/ddl/partition.go
index ef81b7da5b..b9d0d72d91 100644
--- a/ddl/partition.go
+++ b/ddl/partition.go
@@ -130,7 +130,7 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
 			return ver, errors.Trace(err)
 		}
 
-		if err = infosync.PutRuleBundles(context.TODO(), bundles); err != nil {
+		if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
 			job.State = model.JobStateCancelled
 			return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
 		}
@@ -1040,7 +1040,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
 	if job.Type == model.ActionAddTablePartition {
 		// It is rollbacked from adding table partition, just remove addingDefinitions from tableInfo.
 		physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
-		err = infosync.PutRuleBundles(context.TODO(), rollbackBundles)
+		err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
 		if err != nil {
 			job.State = model.JobStateCancelled
 			return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
@@ -1208,7 +1208,7 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
 		return ver, errors.Trace(err)
 	}
 
-	err = infosync.PutRuleBundles(context.TODO(), bundles)
+	err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
 	if err != nil {
 		job.State = model.JobStateCancelled
 		return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
@@ -1412,7 +1412,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
 		return ver, errors.Trace(err)
 	}
 
-	if err = infosync.PutRuleBundles(context.TODO(), bundles); err != nil {
+	if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
 		job.State = model.JobStateCancelled
 		return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
 	}
diff --git a/ddl/placement_policy.go b/ddl/placement_policy.go
index 0fcd17270c..65a445ca62 100644
--- a/ddl/placement_policy.go
+++ b/ddl/placement_policy.go
@@ -262,7 +262,7 @@ func onAlterPlacementPolicy(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
 			cp := bundle.Clone()
 			bundles = append(bundles, cp.Reset(placement.RuleIndexPartition, []int64{id}))
 		}
-		err = infosync.PutRuleBundles(context.TODO(), bundles)
+		err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
 		if err != nil {
 			job.State = model.JobStateCancelled
 			return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
diff --git a/ddl/table.go b/ddl/table.go
index c2f217fc6e..906db66f3a 100644
--- a/ddl/table.go
+++ b/ddl/table.go
@@ -101,7 +101,7 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
 		}
 
 		// Send the placement bundle to PD.
-		err = infosync.PutRuleBundles(context.TODO(), bundles)
+		err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
 		if err != nil {
 			job.State = model.JobStateCancelled
 			return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
@@ -580,7 +580,7 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
 		return ver, errors.Trace(err)
 	}
 
-	err = infosync.PutRuleBundles(context.TODO(), bundles)
+	err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
 	if err != nil {
 		job.State = model.JobStateCancelled
 		return 0, errors.Wrapf(err, "failed to notify PD the placement rules")
@@ -1302,7 +1302,7 @@ func onAlterTablePartitionOptions(d *ddlCtx, t *meta.Meta, job *model.Job) (ver
 
 	// Send the placement bundle to PD.
 	if bundle != nil {
-		err = infosync.PutRuleBundles(context.TODO(), []*placement.Bundle{bundle})
+		err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), []*placement.Bundle{bundle})
 	}
 
 	if err != nil {
@@ -1353,7 +1353,7 @@ func onAlterTablePlacement(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
 
 	// Send the placement bundle to PD.
 	if bundle != nil {
-		err = infosync.PutRuleBundles(context.TODO(), []*placement.Bundle{bundle})
+		err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), []*placement.Bundle{bundle})
 	}
 
 	if err != nil {
diff --git a/domain/infosync/error.go b/domain/infosync/error.go
new file mode 100644
index 0000000000..53c1fedcd9
--- /dev/null
+++ b/domain/infosync/error.go
@@ -0,0 +1,28 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package infosync
+
+import (
+	"github.com/pingcap/tidb/errno"
+	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/util/dbterror"
+)
+
+var (
+	// ErrHTTPServiceError means we got a http response with a status code which is not '2xx'
+	ErrHTTPServiceError = dbterror.ClassDomain.NewStdErr(
+		errno.ErrHTTPServiceError, mysql.Message("HTTP request failed with status %s", nil),
+	)
+)
diff --git a/domain/infosync/info.go b/domain/infosync/info.go
index 26bd49d57e..fc58783ff3 100644
--- a/domain/infosync/info.go
+++ b/domain/infosync/info.go
@@ -80,6 +80,10 @@ const (
 	TopologyPrometheus = "/topology/prometheus"
 	// TablePrometheusCacheExpiry is the expiry time for prometheus address cache.
 	TablePrometheusCacheExpiry = 10 * time.Second
+	// RequestRetryInterval is the sleep time before next retry for http request
+	RequestRetryInterval = 200 * time.Millisecond
+	// SyncBundlesMaxRetry is the max retry times for sync placement bundles
+	SyncBundlesMaxRetry = 3
 )
 
 // ErrPrometheusAddrIsNotSet is the error that Prometheus address is not set in PD and etcd
@@ -353,7 +357,7 @@ func doRequest(ctx context.Context, addrs []string, route, method string, body i
 				return nil, err
 			}
 			if res.StatusCode != http.StatusOK {
-				err = errors.Errorf("%s", bodyBytes)
+				err = ErrHTTPServiceError.FastGen("%s", bodyBytes)
 				if res.StatusCode == http.StatusNotFound || res.StatusCode == http.StatusPreconditionFailed {
 					err = nil
 					bodyBytes = nil
@@ -427,6 +431,16 @@ func GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error)
 
 // PutRuleBundles is used to post specific rule bundles to PD.
 func PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error {
+	failpoint.Inject("putRuleBundlesError", func(isServiceError failpoint.Value) {
+		var err error
+		if isServiceError.(bool) {
+			err = ErrHTTPServiceError.FastGen("mock service error")
+		} else {
+			err = errors.New("mock other error")
+		}
+		failpoint.Return(err)
+	})
+
 	is, err := getGlobalInfoSyncer()
 	if err != nil {
 		return err
@@ -435,6 +449,43 @@ func PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error {
 	return is.placementManager.PutRuleBundles(ctx, bundles)
 }
 
+// PutRuleBundlesWithRetry posts the rule bundles to PD through PutRuleBundles and
+// retries up to maxRetry more times, waiting interval between attempts, when the
+// call fails with an error other than ErrHTTPServiceError. A negative maxRetry is
+// treated as 0. It stops waiting and returns ctx.Err() once ctx is done.
+func PutRuleBundlesWithRetry(ctx context.Context, bundles []*placement.Bundle, maxRetry int, interval time.Duration) (err error) {
+	if maxRetry < 0 {
+		maxRetry = 0
+	}
+
+	for i := 0; i <= maxRetry; i++ {
+		// A nil error or an ErrHTTPServiceError is returned immediately without retry.
+		if err = PutRuleBundles(ctx, bundles); err == nil || ErrHTTPServiceError.Equal(err) {
+			return err
+		}
+
+		if i != maxRetry {
+			logutil.BgLogger().Warn("Error occurs when PutRuleBundles, retry", zap.Error(err))
+			// Wait for the next attempt, but give up as soon as ctx is done.
+			timer := time.NewTimer(interval)
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				return ctx.Err()
+			case <-timer.C:
+			}
+		}
+	}
+
+	return err
+}
+
+// PutRuleBundlesWithDefaultRetry calls PutRuleBundlesWithRetry with SyncBundlesMaxRetry
+// retries and RequestRetryInterval between them.
+func PutRuleBundlesWithDefaultRetry(ctx context.Context, bundles []*placement.Bundle) (err error) {
+	return PutRuleBundlesWithRetry(ctx, bundles, SyncBundlesMaxRetry, RequestRetryInterval)
+}
+
 func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
 	allInfo := make(map[string]*ServerInfo)
 	if is.etcdCli == nil {
diff --git a/domain/infosync/info_test.go b/domain/infosync/info_test.go
index e839b5daa8..001a106632 100644
--- a/domain/infosync/info_test.go
+++ b/domain/infosync/info_test.go
@@ -26,8 +26,10 @@ import (
 	"time"
 
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/ddl/placement"
 	"github.com/pingcap/tidb/ddl/util"
 	"github.com/pingcap/tidb/owner"
+	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/util/testbridge"
 	"github.com/stretchr/testify/require"
 	"go.etcd.io/etcd/integration"
@@ -145,3 +147,66 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
 	}
 	return len(resp.Kvs) == 1, nil
 }
+
+func TestPutBundlesRetry(t *testing.T) {
+	_, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, false)
+	require.NoError(t, err)
+
+	bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
+	require.NoError(t, err)
+	bundle = bundle.Reset(placement.RuleIndexTable, []int64{1024})
+
+	t.Run("serviceErrorShouldNotRetry", func(t *testing.T) {
+		require.NoError(t, PutRuleBundles(context.TODO(), []*placement.Bundle{{ID: bundle.ID}}))
+		require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError", "1*return(true)"))
+		defer func() {
+			require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError"))
+		}()
+
+		err := PutRuleBundlesWithRetry(context.TODO(), []*placement.Bundle{bundle}, 3, time.Millisecond)
+		require.Error(t, err)
+		require.Equal(t, "[domain:8243]mock service error", err.Error())
+
+		got, err := GetRuleBundle(context.TODO(), bundle.ID)
+		require.NoError(t, err)
+		require.True(t, got.IsEmpty())
+	})
+
+	t.Run("nonServiceErrorShouldRetry", func(t *testing.T) {
+		require.NoError(t, PutRuleBundles(context.TODO(), []*placement.Bundle{{ID: bundle.ID}}))
+		require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError", "3*return(false)"))
+		defer func() {
+			require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError"))
+		}()
+
+		err := PutRuleBundlesWithRetry(context.TODO(), []*placement.Bundle{bundle}, 3, time.Millisecond)
+		require.NoError(t, err)
+
+		got, err := GetRuleBundle(context.TODO(), bundle.ID)
+		require.NoError(t, err)
+
+		gotJSON, err := json.Marshal(got)
+		require.NoError(t, err)
+
+		expectJSON, err := json.Marshal(bundle)
+		require.NoError(t, err)
+
+		require.Equal(t, expectJSON, gotJSON)
+	})
+
+	t.Run("nonServiceErrorRetryAndFail", func(t *testing.T) {
+		require.NoError(t, PutRuleBundles(context.TODO(), []*placement.Bundle{{ID: bundle.ID}}))
+		require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError", "4*return(false)"))
+		defer func() {
+			require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/putRuleBundlesError"))
+		}()
+
+		err := PutRuleBundlesWithRetry(context.TODO(), []*placement.Bundle{bundle}, 3, time.Millisecond)
+		require.Error(t, err)
+		require.Equal(t, "mock other error", err.Error())
+
+		got, err := GetRuleBundle(context.TODO(), bundle.ID)
+		require.NoError(t, err)
+		require.True(t, got.IsEmpty())
+	})
+}
diff --git a/errno/errcode.go b/errno/errcode.go
index e0c5f7cccb..ea1fb86ea7 100644
--- a/errno/errcode.go
+++ b/errno/errcode.go
@@ -1055,6 +1055,7 @@ const (
 	ErrPlacementPolicyWithDirectOption = 8240
 	ErrPlacementPolicyInUse            = 8241
 	ErrOptOnCacheTable                 = 8242
+	ErrHTTPServiceError                = 8243
 	// TiKV/PD/TiFlash errors.
 	ErrPDServerTimeout   = 9001
 	ErrTiKVServerTimeout = 9002
diff --git a/errors.toml b/errors.toml
index 0f27a704f6..8709460393 100644
--- a/errors.toml
+++ b/errors.toml
@@ -726,6 +726,11 @@ error = '''
 Information schema is changed during the execution of the statement(for example, table definition may be updated by other DDL ran in parallel). If you see this error often, try increasing `tidb_max_delta_schema_count`. [try again later]
 '''
 
+["domain:8243"]
+error = '''
+HTTP request failed with status %s
+'''
+
 ["domain:9009"]
 error = '''
 Prometheus address is not set in PD and etcd
diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go
index e238c85ee2..854408f082 100644
--- a/store/gcworker/gc_worker.go
+++ b/store/gcworker/gc_worker.go
@@ -1914,7 +1914,7 @@ func (w *GCWorker) doGCPlacementRules(dr util.DelRangeTask) (err error) {
 	for _, id := range physicalTableIDs {
 		bundles = append(bundles, placement.NewBundle(id))
 	}
-	return infosync.PutRuleBundles(context.TODO(), bundles)
+	return infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
 }
 
 func (w *GCWorker) doGCLabelRules(dr util.DelRangeTask) (err error) {