From 449e83236d097ec0100700e10bbb07ff84756dd9 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Thu, 26 Aug 2021 17:18:04 +0800 Subject: [PATCH] infosync: support label rule manager (#27551) --- domain/infosync/info.go | 112 +++++++--------------- domain/infosync/label_manager.go | 155 +++++++++++++++++++++++++++++++ 2 files changed, 187 insertions(+), 80 deletions(-) create mode 100644 domain/infosync/label_manager.go diff --git a/domain/infosync/info.go b/domain/infosync/info.go index e885bbeb17..dbf62e276d 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -87,16 +87,17 @@ var ErrPrometheusAddrIsNotSet = dbterror.ClassDomain.NewStd(errno.ErrPrometheusA // InfoSyncer stores server info to etcd when the tidb-server starts and delete when tidb-server shuts down. type InfoSyncer struct { - etcdCli *clientv3.Client - info *ServerInfo - serverInfoPath string - minStartTS uint64 - minStartTSPath string - manager util2.SessionManager - session *concurrency.Session - topologySession *concurrency.Session - prometheusAddr string - modifyTime time.Time + etcdCli *clientv3.Client + info *ServerInfo + serverInfoPath string + minStartTS uint64 + minStartTSPath string + manager util2.SessionManager + session *concurrency.Session + topologySession *concurrency.Session + prometheusAddr string + modifyTime time.Time + labelRuleManager LabelRuleManager } // ServerInfo is server static information. @@ -175,6 +176,11 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() if err != nil { return nil, err } + if etcdCli != nil { + is.labelRuleManager = initLabelRuleManager(etcdCli.Endpoints()) + } else { + is.labelRuleManager = initLabelRuleManager([]string{}) + } setGlobalInfoSyncer(is) return is, nil } @@ -201,6 +207,13 @@ func (is *InfoSyncer) GetSessionManager() util2.SessionManager { return is.manager } +func initLabelRuleManager(addrs []string) LabelRuleManager { + if len(addrs) == 0 { + return &mockLabelManager{labelRules: map[string]*label.Rule{}} + } + return &PDLabelManager{addrs: addrs} +} + // GetServerInfo gets self server static information. func GetServerInfo() (*ServerInfo, error) { is, err := getGlobalInfoSyncer() @@ -817,24 +830,10 @@ func PutLabelRule(ctx context.Context, rule *label.Rule) error { if err != nil { return err } - - if is.etcdCli == nil { + if is.labelRuleManager == nil { return nil } - - addrs := is.etcdCli.Endpoints() - - if len(addrs) == 0 { - return errors.Errorf("pd unavailable") - } - - r, err := json.Marshal(rule) - if err != nil { - return err - } - - _, err = doRequest(ctx, addrs, path.Join(pdapi.Config, "region-label", "rule"), "POST", bytes.NewReader(r)) - return err + return is.labelRuleManager.PutLabelRule(ctx, rule) } // UpdateLabelRules synchronizes the label rule to PD. @@ -847,24 +846,10 @@ func UpdateLabelRules(ctx context.Context, patch *label.RulePatch) error { if err != nil { return err } - - if is.etcdCli == nil { + if is.labelRuleManager == nil { return nil } - - addrs := is.etcdCli.Endpoints() - - if len(addrs) == 0 { - return errors.Errorf("pd unavailable") - } - - r, err := json.Marshal(patch) - if err != nil { - return err - } - - _, err = doRequest(ctx, addrs, path.Join(pdapi.Config, "region-label", "rules"), "PATCH", bytes.NewReader(r)) - return err + return is.labelRuleManager.UpdateLabelRules(ctx, patch) } // GetAllLabelRules gets all label rules from PD. @@ -873,24 +858,10 @@ func GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) { if err != nil { return nil, err } - - if is.etcdCli == nil { - return nil, err + if is.labelRuleManager == nil { + return nil, nil } - - addrs := is.etcdCli.Endpoints() - - if len(addrs) == 0 { - return nil, errors.Errorf("pd unavailable") - } - - rules := []*label.Rule{} - res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "region-label", "rules"), "GET", nil) - - if err == nil && res != nil { - err = json.Unmarshal(res, &rules) - } - return rules, err + return is.labelRuleManager.GetAllLabelRules(ctx) } // GetLabelRules gets the label rules according to the given IDs from PD. @@ -903,27 +874,8 @@ func GetLabelRules(ctx context.Context, ruleIDs []string) ([]*label.Rule, error) if err != nil { return nil, err } - - if is.etcdCli == nil { + if is.labelRuleManager == nil { return nil, nil } - - addrs := is.etcdCli.Endpoints() - - if len(addrs) == 0 { - return nil, errors.Errorf("pd unavailable") - } - - ids, err := json.Marshal(ruleIDs) - if err != nil { - return nil, err - } - - rules := []*label.Rule{} - res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(ids)) - - if err == nil && res != nil { - err = json.Unmarshal(res, &rules) - } - return rules, err + return is.labelRuleManager.GetLabelRules(ctx, ruleIDs) } diff --git a/domain/infosync/label_manager.go b/domain/infosync/label_manager.go new file mode 100644 index 0000000000..bf6ba634fd --- /dev/null +++ b/domain/infosync/label_manager.go @@ -0,0 +1,155 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package infosync + +import ( + "bytes" + "context" + "encoding/json" + "path" + "sync" + + "github.com/pingcap/tidb/ddl/label" + "github.com/pingcap/tidb/util/pdapi" +) + +// LabelRuleManager manages label rules +type LabelRuleManager interface { + PutLabelRule(ctx context.Context, rule *label.Rule) error + UpdateLabelRules(ctx context.Context, patch *label.RulePatch) error + GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) + GetLabelRules(ctx context.Context, ruleIDs []string) ([]*label.Rule, error) +} + +// PDLabelManager manages rules with pd +type PDLabelManager struct { + addrs []string +} + +// PutLabelRule implements PutLabelRule +func (lm *PDLabelManager) PutLabelRule(ctx context.Context, rule *label.Rule) error { + r, err := json.Marshal(rule) + if err != nil { + return err + } + _, err = doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rule"), "POST", bytes.NewReader(r)) + return err +} + +// UpdateLabelRules implements UpdateLabelRules +func (lm *PDLabelManager) UpdateLabelRules(ctx context.Context, patch *label.RulePatch) error { + r, err := json.Marshal(patch) + if err != nil { + return err + } + + _, err = doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rules"), "PATCH", bytes.NewReader(r)) + return err +} + +// GetAllLabelRules implements GetAllLabelRules +func (lm *PDLabelManager) GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) { + var rules []*label.Rule + res, err := doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rules"), "GET", nil) + + if err == nil && res != nil { + err = json.Unmarshal(res, &rules) + } + return rules, err +} + +// GetLabelRules implements GetLabelRules +func (lm *PDLabelManager) GetLabelRules(ctx context.Context, ruleIDs []string) ([]*label.Rule, error) { + ids, err := json.Marshal(ruleIDs) + if err != nil { + return nil, err + } + + rules := []*label.Rule{} + res, err := doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(ids)) + + if err == nil && res != nil { + err = json.Unmarshal(res, &rules) + } + return rules, err +} + +type mockLabelManager struct { + sync.RWMutex + labelRules map[string]*label.Rule +} + +// PutLabelRule implements PutLabelRule +func (mm *mockLabelManager) PutLabelRule(ctx context.Context, rule *label.Rule) error { + mm.Lock() + defer mm.Unlock() + if rule == nil { + return nil + } + mm.labelRules[rule.ID] = rule + return nil +} + +// UpdateLabelRules implements UpdateLabelRules +func (mm *mockLabelManager) UpdateLabelRules(ctx context.Context, patch *label.RulePatch) error { + mm.Lock() + defer mm.Unlock() + if patch == nil { + return nil + } + for _, p := range patch.DeleteRules { + delete(mm.labelRules, p) + } + for _, p := range patch.SetRules { + if p == nil { + continue + } + mm.labelRules[p.ID] = p + } + return nil +} + +// mockLabelManager implements GetAllLabelRules +func (mm *mockLabelManager) GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) { + mm.RLock() + defer mm.RUnlock() + r := make([]*label.Rule, 0, len(mm.labelRules)) + for _, labelRule := range mm.labelRules { + if labelRule == nil { + continue + } + r = append(r, labelRule) + } + return r, nil +} + +// mockLabelManager implements GetLabelRules +func (mm *mockLabelManager) GetLabelRules(ctx context.Context, ruleIDs []string) ([]*label.Rule, error) { + mm.RLock() + defer mm.RUnlock() + r := make([]*label.Rule, 0, len(ruleIDs)) + for _, ruleID := range ruleIDs { + for _, labelRule := range mm.labelRules { + if labelRule.ID == ruleID { + if labelRule == nil { + continue + } + r = append(r, labelRule) + break + } + } + } + return r, nil +}