From 2f0bafe98bed8fbbe0dbf8317f4e7efb0139fcb9 Mon Sep 17 00:00:00 2001 From: CbcWestwolf <1004626265@qq.com> Date: Tue, 5 Aug 2025 17:52:27 +0800 Subject: [PATCH] util/etcd: remove useless code (#62833) ref pingcap/tidb#59737 --- pkg/disttask/importinto/BUILD.bazel | 2 +- pkg/disttask/importinto/scheduler.go | 6 +- pkg/executor/importer/BUILD.bazel | 1 - pkg/executor/importer/precheck.go | 15 +- pkg/executor/importer/precheck_test.go | 5 +- pkg/util/etcd/BUILD.bazel | 2 - pkg/util/etcd/etcd.go | 317 --------------- pkg/util/etcd/etcd_test.go | 371 ------------------ .../importintotest/import_into_test.go | 4 +- .../importintotest/precheck_test.go | 10 +- 10 files changed, 23 insertions(+), 710 deletions(-) diff --git a/pkg/disttask/importinto/BUILD.bazel b/pkg/disttask/importinto/BUILD.bazel index 748f0ff275..6b07dee50d 100644 --- a/pkg/disttask/importinto/BUILD.bazel +++ b/pkg/disttask/importinto/BUILD.bazel @@ -61,7 +61,6 @@ go_library( "//pkg/util", "//pkg/util/backoff", "//pkg/util/disttask", - "//pkg/util/etcd", "//pkg/util/logutil", "//pkg/util/promutil", "@com_github_docker_go_units//:go-units", @@ -70,6 +69,7 @@ go_library( "@com_github_pingcap_failpoint//:failpoint", "@com_github_prometheus_client_golang//prometheus", "@com_github_tikv_client_go_v2//util", + "@io_etcd_go_etcd_client_v3//:client", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", "@org_uber_go_zap//zapcore", diff --git a/pkg/disttask/importinto/scheduler.go b/pkg/disttask/importinto/scheduler.go index 435ec10efa..449b5aedec 100644 --- a/pkg/disttask/importinto/scheduler.go +++ b/pkg/disttask/importinto/scheduler.go @@ -41,8 +41,8 @@ import ( "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/backoff" disttaskutil "github.com/pingcap/tidb/pkg/util/disttask" - "github.com/pingcap/tidb/pkg/util/etcd" "github.com/pingcap/tidb/pkg/util/logutil" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/zap" ) @@ -64,7 +64,7 @@ type taskInfo struct { lastRegisterTime time.Time // initialized lazily in register() - etcdClient *etcd.Client + etcdClient *clientv3.Client taskRegister utils.TaskRegister } @@ -84,7 +84,7 @@ func (t *taskInfo) register(ctx context.Context) { return } t.etcdClient = client - t.taskRegister = NewTaskRegisterWithTTL(client.GetClient(), registerTaskTTL, + t.taskRegister = NewTaskRegisterWithTTL(client, registerTaskTTL, utils.RegisterImportInto, strconv.FormatInt(t.taskID, 10)) } timeoutCtx, cancel := context.WithTimeout(ctx, registerTimeout) diff --git a/pkg/executor/importer/BUILD.bazel b/pkg/executor/importer/BUILD.bazel index 3313db4bff..c7f0a1a36b 100644 --- a/pkg/executor/importer/BUILD.bazel +++ b/pkg/executor/importer/BUILD.bazel @@ -153,7 +153,6 @@ go_test( "//pkg/util/chunk", "//pkg/util/dbterror/exeerrors", "//pkg/util/dbterror/plannererrors", - "//pkg/util/etcd", "//pkg/util/logutil", "//pkg/util/mock", "//pkg/util/promutil", diff --git a/pkg/executor/importer/precheck.go b/pkg/executor/importer/precheck.go index 0cc35e3b84..d2991e664c 100644 --- a/pkg/executor/importer/precheck.go +++ b/pkg/executor/importer/precheck.go @@ -28,9 +28,9 @@ import ( "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/cdcutil" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" - "github.com/pingcap/tidb/pkg/util/etcd" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/sqlexec" + clientv3 "go.etcd.io/etcd/client/v3" ) const ( @@ -112,7 +112,7 @@ func (*LoadDataController) checkCDCPiTRTasks(ctx context.Context) error { } defer terror.Call(cli.Close) - pitrCli := streamhelper.NewMetaDataClient(cli.GetClient()) + pitrCli := streamhelper.NewMetaDataClient(cli) tasks, err := pitrCli.GetAllTasks(ctx) if err != nil { return err @@ -125,7 +125,7 @@ func (*LoadDataController) checkCDCPiTRTasks(ctx context.Context) error { return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs(fmt.Sprintf("found PiTR log streaming task(s): %v,", names)) } - nameSet, err := cdcutil.GetRunningChangefeeds(ctx, cli.GetClient()) + nameSet, err := cdcutil.GetRunningChangefeeds(ctx, cli) if err != nil { return errors.Trace(err) } @@ -171,7 +171,7 @@ func (e *LoadDataController) checkGlobalSortStorePrivilege(ctx context.Context) return nil } -func getEtcdClient() (*etcd.Client, error) { +func getEtcdClient() (cli *clientv3.Client, err error) { tidbCfg := tidb.GetGlobalConfig() tls, err := util.NewTLSConfig( util.WithCAPath(tidbCfg.Security.ClusterSSLCA), @@ -184,5 +184,10 @@ func getEtcdClient() (*etcd.Client, error) { if err != nil { return nil, err } - return etcd.NewClientFromCfg(ectdEndpoints, etcdDialTimeout, "", tls) + return clientv3.New(clientv3.Config{ + Endpoints: ectdEndpoints, + DialTimeout: etcdDialTimeout, + TLS: tls, + AutoSyncInterval: 30 * time.Second, + }) } diff --git a/pkg/executor/importer/precheck_test.go b/pkg/executor/importer/precheck_test.go index 337dafc85e..c18b415a3f 100644 --- a/pkg/executor/importer/precheck_test.go +++ b/pkg/executor/importer/precheck_test.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/cdcutil" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" - "github.com/pingcap/tidb/pkg/util/etcd" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/util" clientv3 "go.etcd.io/etcd/client/v3" @@ -126,12 +125,12 @@ func TestCheckRequirements(t *testing.T) { embedEtcd.Close() }) backup := importer.GetEtcdClient - importer.GetEtcdClient = func() (*etcd.Client, error) { + importer.GetEtcdClient = func() (*clientv3.Client, error) { etcdCli, err := clientv3.New(clientv3.Config{ Endpoints: []string{clientAddr}, }) require.NoError(t, err) - return etcd.NewClient(etcdCli, ""), nil + return etcdCli, nil } t.Cleanup(func() { importer.GetEtcdClient = backup diff --git a/pkg/util/etcd/BUILD.bazel b/pkg/util/etcd/BUILD.bazel index d53b6d7758..30bef978d3 100644 --- a/pkg/util/etcd/BUILD.bazel +++ b/pkg/util/etcd/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/util/etcd", visibility = ["//visibility:public"], deps = [ - "@com_github_pingcap_errors//:errors", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_client_v3//namespace", ], @@ -19,7 +18,6 @@ go_test( embed = [":etcd"], flaky = True, deps = [ - "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_tests_v3//integration", diff --git a/pkg/util/etcd/etcd.go b/pkg/util/etcd/etcd.go index bb4dac73f0..2e6569aa66 100644 --- a/pkg/util/etcd/etcd.go +++ b/pkg/util/etcd/etcd.go @@ -15,327 +15,10 @@ package etcd import ( - "context" - "crypto/tls" - "fmt" - "path" - "strings" - "time" - - "github.com/pingcap/errors" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/namespace" ) -// Node organizes the ectd query result as a Trie tree -type Node struct { - Childs map[string]*Node - Value []byte -} - -// OpType is operation's type in etcd -type OpType string - -var ( - // CreateOp is create operation type - CreateOp OpType = "create" - - // UpdateOp is update operation type - UpdateOp OpType = "update" - - // DeleteOp is delete operation type - DeleteOp OpType = "delete" -) - -// Operation represents an operation in etcd, include create, update and delete. -type Operation struct { - Tp OpType - Key string - Value string - Opts []clientv3.OpOption - TTL int64 - WithPrefix bool -} - -// String implements Stringer interface. -func (o *Operation) String() string { - return fmt.Sprintf("{Tp: %s, Key: %s, Value: %s, TTL: %d, WithPrefix: %v, Opts: %v}", o.Tp, o.Key, o.Value, o.TTL, o.WithPrefix, o.Opts) -} - -// Client is a wrapped etcd client that support some simple method -type Client struct { - client *clientv3.Client - rootPath string -} - -// NewClient returns a wrapped etcd client -func NewClient(cli *clientv3.Client, root string) *Client { - return &Client{ - client: cli, - rootPath: root, - } -} - -// NewClientFromCfg returns a wrapped etcd client -func NewClientFromCfg(endpoints []string, dialTimeout time.Duration, root string, security *tls.Config) (*Client, error) { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: endpoints, - DialTimeout: dialTimeout, - TLS: security, - AutoSyncInterval: 30 * time.Second, - }) - if err != nil { - return nil, errors.Trace(err) - } - - return &Client{ - client: cli, - rootPath: root, - }, nil -} - -// Close shutdowns the connection to etcd -func (e *Client) Close() error { - if err := e.client.Close(); err != nil { - return errors.Trace(err) - } - return nil -} - -// GetClient returns client -func (e *Client) GetClient() *clientv3.Client { - return e.client -} - -// Create guarantees to set a key = value with some options(like ttl) -func (e *Client) Create(ctx context.Context, key string, val string, opts []clientv3.OpOption) (int64, error) { - key = keyWithPrefix(e.rootPath, key) - txnResp, err := e.client.KV.Txn(ctx).If( - clientv3.Compare(clientv3.ModRevision(key), "=", 0), - ).Then( - clientv3.OpPut(key, val, opts...), - ).Commit() - if err != nil { - return 0, errors.Trace(err) - } - - if !txnResp.Succeeded { - return 0, errors.AlreadyExistsf("key %s in etcd", key) - } - - if txnResp.Header != nil { - return txnResp.Header.Revision, nil - } - - // impossible to happen - return 0, errors.New("revision is unknown") -} - -// Get returns a key/value matchs the given key -func (e *Client) Get(ctx context.Context, key string) (value []byte, revision int64, err error) { - key = keyWithPrefix(e.rootPath, key) - resp, err := e.client.KV.Get(ctx, key) - if err != nil { - return nil, -1, errors.Trace(err) - } - - if len(resp.Kvs) == 0 { - return nil, -1, errors.NotFoundf("key %s in etcd", key) - } - - return resp.Kvs[0].Value, resp.Header.Revision, nil -} - -// Update updates a key/value. -// set ttl 0 to disable the Lease ttl feature -func (e *Client) Update(ctx context.Context, key string, val string, ttl int64) error { - key = keyWithPrefix(e.rootPath, key) - - var opts []clientv3.OpOption - if ttl > 0 { - lcr, err := e.client.Lease.Grant(ctx, ttl) - if err != nil { - return errors.Trace(err) - } - - opts = []clientv3.OpOption{clientv3.WithLease(lcr.ID)} - } - - txnResp, err := e.client.KV.Txn(ctx).If( - clientv3.Compare(clientv3.ModRevision(key), ">", 0), - ).Then( - clientv3.OpPut(key, val, opts...), - ).Commit() - if err != nil { - return errors.Trace(err) - } - - if !txnResp.Succeeded { - return errors.NotFoundf("key %s in etcd", key) - } - - return nil -} - -// UpdateOrCreate updates a key/value, if the key does not exist then create, or update -func (e *Client) UpdateOrCreate(ctx context.Context, key string, val string, ttl int64) error { - key = keyWithPrefix(e.rootPath, key) - - var opts []clientv3.OpOption - if ttl > 0 { - lcr, err := e.client.Lease.Grant(ctx, ttl) - if err != nil { - return errors.Trace(err) - } - - opts = []clientv3.OpOption{clientv3.WithLease(lcr.ID)} - } - - _, err := e.client.KV.Do(ctx, clientv3.OpPut(key, val, opts...)) - if err != nil { - return errors.Trace(err) - } - return nil -} - -// List returns the trie struct that constructed by the key/value with same prefix -func (e *Client) List(ctx context.Context, key string) (node *Node, revision int64, err error) { - key = keyWithPrefix(e.rootPath, key) - if !strings.HasSuffix(key, "/") { - key += "/" - } - - resp, err := e.client.KV.Get(ctx, key, clientv3.WithPrefix()) - if err != nil { - return nil, -1, errors.Trace(err) - } - - root := new(Node) - length := len(key) - for _, kv := range resp.Kvs { - key := string(kv.Key) - if len(key) <= length { - continue - } - - keyTail := key[length:] - tailNode := parseToDirTree(root, keyTail) - tailNode.Value = kv.Value - } - - return root, resp.Header.Revision, nil -} - -// Delete deletes the key/values with matching prefix or key -func (e *Client) Delete(ctx context.Context, key string, withPrefix bool) error { - key = keyWithPrefix(e.rootPath, key) - var opts []clientv3.OpOption - if withPrefix { - opts = []clientv3.OpOption{clientv3.WithPrefix()} - } - - _, err := e.client.KV.Delete(ctx, key, opts...) - if err != nil { - return errors.Trace(err) - } - - return nil -} - -// Watch watchs the events of key with prefix. -func (e *Client) Watch(ctx context.Context, prefix string, revision int64) clientv3.WatchChan { - if revision > 0 { - return e.client.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision)) - } - return e.client.Watch(ctx, prefix, clientv3.WithPrefix()) -} - -// DoTxn does some operation in one transaction. -// Note: should only have one opereration for one key, otherwise will get duplicate key error. -func (e *Client) DoTxn(ctx context.Context, operations []*Operation) (int64, error) { - cmps := make([]clientv3.Cmp, 0, len(operations)) - ops := make([]clientv3.Op, 0, len(operations)) - - for _, operation := range operations { - operation.Key = keyWithPrefix(e.rootPath, operation.Key) - - if operation.TTL > 0 { - if operation.Tp == DeleteOp { - return 0, errors.Errorf("unexpected TTL in delete operation") - } - - lcr, err := e.client.Lease.Grant(ctx, operation.TTL) - if err != nil { - return 0, errors.Trace(err) - } - operation.Opts = append(operation.Opts, clientv3.WithLease(lcr.ID)) - } - - if operation.WithPrefix { - operation.Opts = append(operation.Opts, clientv3.WithPrefix()) - } - - switch operation.Tp { - case CreateOp: - cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(operation.Key), "=", 0)) - ops = append(ops, clientv3.OpPut(operation.Key, operation.Value, operation.Opts...)) - case UpdateOp: - cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(operation.Key), ">", 0)) - ops = append(ops, clientv3.OpPut(operation.Key, operation.Value, operation.Opts...)) - case DeleteOp: - ops = append(ops, clientv3.OpDelete(operation.Key, operation.Opts...)) - default: - return 0, errors.Errorf("unknown operation type %s", operation.Tp) - } - } - - txnResp, err := e.client.KV.Txn(ctx).If( - cmps..., - ).Then( - ops..., - ).Commit() - if err != nil { - return 0, errors.Trace(err) - } - - if !txnResp.Succeeded { - return 0, errors.Errorf("do transaction failed, operations: %+v", operations) - } - - return txnResp.Header.Revision, nil -} - -func parseToDirTree(root *Node, p string) *Node { - pathDirs := strings.Split(p, "/") - current := root - var next *Node - var ok bool - - for _, dir := range pathDirs { - if current.Childs == nil { - current.Childs = make(map[string]*Node) - } - - next, ok = current.Childs[dir] - if !ok { - current.Childs[dir] = new(Node) - next = current.Childs[dir] - } - - current = next - } - - return current -} - -func keyWithPrefix(prefix, key string) string { - if strings.HasPrefix(key, prefix) { - return key - } - - return path.Join(prefix, key) -} - // SetEtcdCliByNamespace is used to add an etcd namespace prefix before etcd path. func SetEtcdCliByNamespace(cli *clientv3.Client, namespacePrefix string) { cli.KV = namespace.NewKV(cli.KV, namespacePrefix) diff --git a/pkg/util/etcd/etcd_test.go b/pkg/util/etcd/etcd_test.go index bbb810b0a2..e09bd645ef 100644 --- a/pkg/util/etcd/etcd_test.go +++ b/pkg/util/etcd/etcd_test.go @@ -17,383 +17,12 @@ package etcd import ( "context" "testing" - "time" - "github.com/pingcap/errors" "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/integration" ) -var ( - etcdMockCluster *integration.ClusterV3 - etcdCli *Client - ctx context.Context -) - -func TestCreate(t *testing.T) { - integration.BeforeTestExternal(t) - ctx, etcdCli, etcdMockCluster = testSetup(t) - defer etcdMockCluster.Terminate(t) - etcdClient := etcdMockCluster.RandClient() - - key := "binlogcreate/testkey" - obj := "test" - - // verify that kv pair is empty before set - getResp, err := etcdClient.KV.Get(ctx, key) - require.NoError(t, err) - require.Len(t, getResp.Kvs, 0) - - _, err = etcdCli.Create(ctx, key, obj, nil) - require.NoError(t, err) - - getResp, err = etcdClient.KV.Get(ctx, key) - require.NoError(t, err) - require.Len(t, getResp.Kvs, 1) -} - -func TestCreateWithTTL(t *testing.T) { - integration.BeforeTestExternal(t) - ctx, etcdCli, etcdMockCluster = testSetup(t) - defer etcdMockCluster.Terminate(t) - key := "binlogttl/ttlkey" - obj := "ttltest" - - lcr, err := etcdCli.client.Lease.Grant(ctx, 1) - require.NoError(t, err) - opts := []clientv3.OpOption{clientv3.WithLease(lcr.ID)} - - _, err = etcdCli.Create(ctx, key, obj, opts) - require.NoError(t, err) - - time.Sleep(2 * time.Second) - _, _, err = etcdCli.Get(ctx, key) - require.True(t, errors.IsNotFound(err)) -} - -func TestCreateWithKeyExist(t *testing.T) { - integration.BeforeTestExternal(t) - ctx, etcdCli, etcdMockCluster = testSetup(t) - defer etcdMockCluster.Terminate(t) - obj := "existtest" - key := "binlogexist/exist" - - etcdClient := etcdMockCluster.RandClient() - _, err := etcdClient.KV.Put(ctx, key, obj, nil...) - require.NoError(t, err) - - _, err = etcdCli.Create(ctx, key, obj, nil) - require.True(t, errors.IsAlreadyExists(err)) -} - -func TestUpdate(t *testing.T) { - integration.BeforeTestExternal(t) - ctx, etcdCli, etcdMockCluster = testSetup(t) - defer etcdMockCluster.Terminate(t) - obj1 := "updatetest" - obj2 := "updatetest2" - key := "binlogupdate/updatekey" - - lcr, err := etcdCli.client.Lease.Grant(ctx, 2) - require.NoError(t, err) - - opts := []clientv3.OpOption{clientv3.WithLease(lcr.ID)} - revision0, err := etcdCli.Create(ctx, key, obj1, opts) - require.NoError(t, err) - - res, revision1, err := etcdCli.Get(ctx, key) - require.NoError(t, err) - require.Equal(t, obj1, string(res)) - require.Equal(t, revision1, revision0) - - err = etcdCli.Update(ctx, key, obj2, 3) - require.NoError(t, err) - - time.Sleep(2 * time.Second) - - // the new revision should greater than the old - res, revision2, err := etcdCli.Get(ctx, key) - require.NoError(t, err) - require.Equal(t, obj2, string(res)) - require.Greater(t, revision2, revision1) - - time.Sleep(2 * time.Second) - _, _, err = etcdCli.Get(ctx, key) - require.True(t, errors.IsNotFound(err)) -} - -func TestUpdateOrCreate(t *testing.T) { - integration.BeforeTestExternal(t) - ctx, etcdCli, etcdMockCluster = testSetup(t) - defer etcdMockCluster.Terminate(t) - obj := "updatetest" - key := "binlogupdatecreate/updatekey" - err := etcdCli.UpdateOrCreate(ctx, key, obj, 3) - require.NoError(t, err) -} - -func TestList(t *testing.T) { - integration.BeforeTestExternal(t) - ctx, etcdCli, etcdMockCluster = testSetup(t) - defer etcdMockCluster.Terminate(t) - key := "binloglist/testkey" - - k1 := key + "/level1" - k2 := key + "/level2" - k3 := key + "/level3" - k11 := key + "/level1/level1" - - revision1, err := etcdCli.Create(ctx, k1, k1, nil) - require.NoError(t, err) - - revision2, err := etcdCli.Create(ctx, k2, k2, nil) - require.NoError(t, err) - require.True(t, revision2 > revision1) - - revision3, err := etcdCli.Create(ctx, k3, k3, nil) - require.NoError(t, err) - require.True(t, revision3 > revision2) - - revision4, err := etcdCli.Create(ctx, k11, k11, nil) - require.NoError(t, err) - require.True(t, revision4 > revision3) - - root, revision5, err := etcdCli.List(ctx, key) - require.NoError(t, err) - require.Equal(t, k1, string(root.Childs["level1"].Value)) - require.Equal(t, k11, string(root.Childs["level1"].Childs["level1"].Value)) - require.Equal(t, k2, string(root.Childs["level2"].Value)) - require.Equal(t, k3, string(root.Childs["level3"].Value)) - - // the revision of list should equal to the latest update's revision - _, revision6, err := etcdCli.Get(ctx, k11) - require.NoError(t, err) - require.Equal(t, revision6, revision5) -} - -func TestDelete(t *testing.T) { - integration.BeforeTestExternal(t) - ctx, etcdCli, etcdMockCluster = testSetup(t) - defer etcdMockCluster.Terminate(t) - key := "binlogdelete/testkey" - keys := []string{key + "/level1", key + "/level2", key + "/level1" + "/level1"} - for _, k := range keys { - _, err := etcdCli.Create(ctx, k, k, nil) - require.NoError(t, err) - } - - root, _, err := etcdCli.List(ctx, key) - require.NoError(t, err) - require.Len(t, root.Childs, 2) - - err = etcdCli.Delete(ctx, keys[1], false) - require.NoError(t, err) - - root, _, err = etcdCli.List(ctx, key) - require.NoError(t, err) - require.Len(t, root.Childs, 1) - - err = etcdCli.Delete(ctx, key, true) - require.NoError(t, err) - - root, _, err = etcdCli.List(ctx, key) - require.NoError(t, err) - require.Len(t, root.Childs, 0) -} - -func TestDoTxn(t *testing.T) { - integration.BeforeTestExternal(t) - ctx, etcdCli, etcdMockCluster = testSetup(t) - defer etcdMockCluster.Terminate(t) - // case1: create two keys in one transaction - ops := []*Operation{ - { - Tp: CreateOp, - Key: "test1", - Value: "1", - }, { - Tp: CreateOp, - Key: "test2", - Value: "2", - }, - } - revision, err := etcdCli.DoTxn(context.Background(), ops) - require.NoError(t, err) - - value1, revision1, err := etcdCli.Get(context.Background(), "test1") - require.NoError(t, err) - require.Equal(t, "1", string(value1)) - require.Equal(t, revision, revision1) - - value2, revision2, err := etcdCli.Get(context.Background(), "test2") - require.NoError(t, err) - require.Equal(t, "2", string(value2)) - require.Equal(t, revision, revision2) - - // case2: delete, update and create in one transaction - ops = []*Operation{ - { - Tp: DeleteOp, - Key: "test1", - }, { - Tp: UpdateOp, - Key: "test2", - Value: "22", - }, { - Tp: CreateOp, - Key: "test3", - Value: "3", - }, - } - - revision, err = etcdCli.DoTxn(context.Background(), ops) - require.NoError(t, err) - - _, _, err = etcdCli.Get(context.Background(), "test1") - require.Regexp(t, ".* not found", err) - - value2, revision2, err = etcdCli.Get(context.Background(), "test2") - require.NoError(t, err) - require.Equal(t, "22", string(value2)) - require.Equal(t, revision, revision2) - - value3, revision3, err := etcdCli.Get(context.Background(), "test3") - require.NoError(t, err) - require.Equal(t, "3", string(value3)) - require.Equal(t, revision, revision3) - - // case3: create keys with TTL - ops = []*Operation{ - { - Tp: CreateOp, - Key: "test4", - Value: "4", - TTL: 1, - }, { - Tp: CreateOp, - Key: "test5", - Value: "5", - }, - } - revision, err = etcdCli.DoTxn(context.Background(), ops) - require.NoError(t, err) - - value4, revision4, err := etcdCli.Get(context.Background(), "test4") - require.NoError(t, err) - require.Equal(t, "4", string(value4)) - require.Equal(t, revision, revision4) - - value5, revision5, err := etcdCli.Get(context.Background(), "test5") - require.NoError(t, err) - require.Equal(t, "5", string(value5)) - require.Equal(t, revision, revision5) - - // sleep 2 seconds and this key will be deleted - time.Sleep(2 * time.Second) - _, _, err = etcdCli.Get(context.Background(), "test4") - require.Regexp(t, ".* not found", err) - - // case4: do transaction failed because key is deleted, so can't update - ops = []*Operation{ - { - Tp: CreateOp, - Key: "test4", - Value: "4", - }, { - Tp: UpdateOp, // key test1 is deleted, so will update failed - Key: "test1", - Value: "11", - }, - } - - _, err = etcdCli.DoTxn(context.Background(), ops) - require.Regexp(t, "do transaction failed.*", err) - - _, _, err = etcdCli.Get(context.Background(), "test4") - require.Regexp(t, ".* not found", err) - - // case5: do transaction failed because can't operate one key in one transaction - ops = []*Operation{ - { - Tp: CreateOp, - Key: "test6", - Value: "6", - }, { - Tp: UpdateOp, - Key: "test6", - Value: "66", - }, - } - - _, err = etcdCli.DoTxn(context.Background(), ops) - require.Regexp(t, "etcdserver: duplicate key given in txn request", err) - - _, _, err = etcdCli.Get(context.Background(), "test6") - require.Regexp(t, ".* not found", err) - - // case6: do transaction failed because can't create an existing key - ops = []*Operation{ - { - Tp: CreateOp, - Key: "test2", // already exist - Value: "222", - }, { - Tp: UpdateOp, - Key: "test5", - Value: "555", - }, - } - - _, err = etcdCli.DoTxn(context.Background(), ops) - require.Regexp(t, "do transaction failed.*", err) - - value2, _, err = etcdCli.Get(context.Background(), "test2") - require.NoError(t, err) - require.Equal(t, "22", string(value2)) - - value5, _, err = etcdCli.Get(context.Background(), "test5") - require.NoError(t, err) - require.Equal(t, "5", string(value5)) - - // case7: delete not exist key but will do transaction success - ops = []*Operation{ - { - Tp: DeleteOp, - Key: "test7", // not exist - }, { - Tp: CreateOp, - Key: "test8", - Value: "8", - }, - } - - _, err = etcdCli.DoTxn(context.Background(), ops) - require.NoError(t, err) - - value8, _, err := etcdCli.Get(context.Background(), "test8") - require.NoError(t, err) - require.Equal(t, "8", string(value8)) - - // case8: do transaction failed because can't set TTL for delete operation - ops = []*Operation{ - { - Tp: DeleteOp, - Key: "test8", - TTL: 1, - }, - } - - _, err = etcdCli.DoTxn(context.Background(), ops) - require.Regexp(t, "unexpected TTL in delete operation", err) -} - -func testSetup(t *testing.T) (context.Context, *Client, *integration.ClusterV3) { - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - etcd := NewClient(cluster.RandClient(), "binlog") - return context.Background(), etcd, cluster -} - func testSetupOriginal(t *testing.T) (context.Context, *clientv3.Client, *integration.ClusterV3) { cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) return context.Background(), cluster.RandClient(), cluster diff --git a/tests/realtikvtest/importintotest/import_into_test.go b/tests/realtikvtest/importintotest/import_into_test.go index 8bfbb26a60..c7553af7fc 100644 --- a/tests/realtikvtest/importintotest/import_into_test.go +++ b/tests/realtikvtest/importintotest/import_into_test.go @@ -1052,7 +1052,7 @@ func (s *mockGCSSuite) TestRegisterTask() { s.ErrorContains(err, "there is active job on the target table already") etcdKey = fmt.Sprintf("/tidb/brie/import/import-into/%d", storage.TestLastTaskID.Load()) s.Eventually(func() bool { - resp, err2 := client.GetClient().Get(context.Background(), etcdKey) + resp, err2 := client.Get(context.Background(), etcdKey) s.NoError(err2) return len(resp.Kvs) == 1 }, maxWaitTime, 300*time.Millisecond) @@ -1069,7 +1069,7 @@ func (s *mockGCSSuite) TestRegisterTask() { s.tk.MustQuery("SELECT * FROM load_data.register_task;").Sort().Check(testkit.Rows("1 11 111")) // the task should be unregistered - resp, err2 := client.GetClient().Get(context.Background(), etcdKey) + resp, err2 := client.Get(context.Background(), etcdKey) s.NoError(err2) s.Len(resp.Kvs, 0) } diff --git a/tests/realtikvtest/importintotest/precheck_test.go b/tests/realtikvtest/importintotest/precheck_test.go index 6c5fac46b5..f552cb48b0 100644 --- a/tests/realtikvtest/importintotest/precheck_test.go +++ b/tests/realtikvtest/importintotest/precheck_test.go @@ -92,10 +92,10 @@ func (s *mockGCSSuite) TestPreCheckCDCPiTRTasks() { pitrTaskInfo := brpb.StreamBackupTaskInfo{Name: "dummy-task"} data, err := pitrTaskInfo.Marshal() s.NoError(err) - _, err = client.GetClient().Put(context.Background(), pitrKey, string(data)) + _, err = client.Put(context.Background(), pitrKey, string(data)) s.NoError(err) s.T().Cleanup(func() { - _, err2 := client.GetClient().Delete(context.Background(), pitrKey) + _, err2 := client.Delete(context.Background(), pitrKey) s.NoError(err2) }) sql := fmt.Sprintf(`IMPORT INTO t FROM 'gs://precheck-cdc-pitr/file.csv?endpoint=%s'`, gcsEndpoint) @@ -116,13 +116,13 @@ func (s *mockGCSSuite) TestPreCheckCDCPiTRTasks() { s.tk.MustExec("import into dst from select * from t with disable_precheck") s.tk.MustQuery("select * from dst").Check(testkit.Rows("1 test1 11")) - _, err2 := client.GetClient().Delete(context.Background(), pitrKey) + _, err2 := client.Delete(context.Background(), pitrKey) s.NoError(err2) cdcKey := "/tidb/cdc/cluster-123/test/changefeed/info/feed-test" - _, err = client.GetClient().Put(context.Background(), cdcKey, `{"state": "normal"}`) + _, err = client.Put(context.Background(), cdcKey, `{"state": "normal"}`) s.NoError(err) s.T().Cleanup(func() { - _, err2 := client.GetClient().Delete(context.Background(), cdcKey) + _, err2 := client.Delete(context.Background(), cdcKey) s.NoError(err2) }) s.tk.MustExec("truncate table t")