From bd5c1945819e0829dbefd1d8e4e981c9a565e650 Mon Sep 17 00:00:00 2001 From: GMHDBJD <35025882+GMHDBJD@users.noreply.github.com> Date: Fri, 5 Sep 2025 11:48:12 +0800 Subject: [PATCH] ImportInto: add import sdk for cloud (#61545) ref pingcap/tidb#61264 --- pkg/executor/importer/main_test.go | 2 + pkg/importsdk/BUILD.bazel | 37 ++ pkg/importsdk/sdk.go | 490 +++++++++++++++ pkg/importsdk/sdk_test.go | 573 ++++++++++++++++++ pkg/lightning/mydump/loader.go | 16 + .../realtikvtest/importintotest4/BUILD.bazel | 3 + .../importintotest4/cloud_sdk_test.go | 166 +++++ 7 files changed, 1287 insertions(+) create mode 100644 pkg/importsdk/BUILD.bazel create mode 100644 pkg/importsdk/sdk.go create mode 100644 pkg/importsdk/sdk_test.go create mode 100644 tests/realtikvtest/importintotest4/cloud_sdk_test.go diff --git a/pkg/executor/importer/main_test.go b/pkg/executor/importer/main_test.go index 3a0a32f8d6..b8ee073c35 100644 --- a/pkg/executor/importer/main_test.go +++ b/pkg/executor/importer/main_test.go @@ -31,6 +31,8 @@ func TestMain(m *testing.M) { goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("syscall.syscall"), + goleak.IgnoreTopFunction("net.(*netFD).connect.func2"), + goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), } goleak.VerifyTestMain(m, opts...) } diff --git a/pkg/importsdk/BUILD.bazel b/pkg/importsdk/BUILD.bazel new file mode 100644 index 0000000000..fa6759a8d5 --- /dev/null +++ b/pkg/importsdk/BUILD.bazel @@ -0,0 +1,37 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "importsdk", + srcs = ["sdk.go"], + importpath = "github.com/pingcap/tidb/pkg/importsdk", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/storage", + "//pkg/lightning/config", + "//pkg/lightning/log", + "//pkg/lightning/mydump", + "//pkg/parser/mysql", + "@com_github_pingcap_errors//:errors", + ], +) + +go_test( + name = "importsdk_test", + timeout = "short", + srcs = ["sdk_test.go"], + embed = [":importsdk"], + flaky = True, + shard_count = 9, + deps = [ + "//pkg/lightning/config", + "//pkg/lightning/log", + "//pkg/lightning/mydump", + "//pkg/parser/mysql", + "//pkg/util/table-filter", + "@com_github_data_dog_go_sqlmock//:go-sqlmock", + "@com_github_fsouza_fake_gcs_server//fakestorage", + "@com_github_ngaut_pools//:pools", + "@com_github_stretchr_testify//require", + "@com_github_stretchr_testify//suite", + ], +) diff --git a/pkg/importsdk/sdk.go b/pkg/importsdk/sdk.go new file mode 100644 index 0000000000..b14a5e7247 --- /dev/null +++ b/pkg/importsdk/sdk.go @@ -0,0 +1,490 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package importsdk + +import ( + "context" + "database/sql" + "path/filepath" + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/pkg/lightning/config" + "github.com/pingcap/tidb/pkg/lightning/log" + "github.com/pingcap/tidb/pkg/lightning/mydump" + "github.com/pingcap/tidb/pkg/parser/mysql" +) + +// SDK defines the interface for cloud import services +type SDK interface { + // CreateSchemasAndTables creates all database schemas and tables from source path + CreateSchemasAndTables(ctx context.Context) error + + // GetTableMetas returns metadata for all tables in the source path + GetTableMetas(ctx context.Context) ([]*TableMeta, error) + + // GetTableMetaByName returns metadata for a specific table + GetTableMetaByName(ctx context.Context, schema, table string) (*TableMeta, error) + + // GetTotalSize returns the cumulative size (in bytes) of all data files under the source path + GetTotalSize(ctx context.Context) int64 + + // Close releases resources used by the SDK + Close() error +} + +// TableMeta contains metadata for a table to be imported +type TableMeta struct { + Database string + Table string + DataFiles []DataFileMeta + TotalSize int64 // In bytes + WildcardPath string // Wildcard pattern that matches only this table's data files + SchemaFile string // Path to the table schema file, if available +} + +// DataFileMeta contains metadata for a data file +type DataFileMeta struct { + Path string + Size int64 + Format mydump.SourceType + Compression mydump.Compression +} + +// ImportSDK implements SDK interface +type ImportSDK struct { + sourcePath string + db *sql.DB + store storage.ExternalStorage + loader *mydump.MDLoader + logger log.Logger + config *sdkConfig +} + +// NewImportSDK creates a new CloudImportSDK instance +func NewImportSDK(ctx context.Context, sourcePath string, db *sql.DB, options ...SDKOption) (SDK, error) { + cfg := defaultSDKConfig() + for _, opt := range options { + opt(cfg) + } + + u, err := storage.ParseBackend(sourcePath, nil) + if err != nil { + return nil, errors.Annotatef(err, "failed to parse storage backend URL (source=%s). Please verify the URL format and credentials", sourcePath) + } + store, err := storage.New(ctx, u, &storage.ExternalStorageOptions{}) + if err != nil { + return nil, errors.Annotatef(err, "failed to create external storage (source=%s). Check network/connectivity and permissions", sourcePath) + } + + ldrCfg := mydump.LoaderConfig{ + SourceURL: sourcePath, + Filter: cfg.filter, + FileRouters: cfg.fileRouteRules, + DefaultFileRules: len(cfg.fileRouteRules) == 0, + CharacterSet: cfg.charset, + } + + loader, err := mydump.NewLoaderWithStore(ctx, ldrCfg, store) + if err != nil { + return nil, errors.Annotatef(err, "failed to create MyDump loader (source=%s, charset=%s, filter=%v). 
Please check dump layout and router rules", sourcePath, cfg.charset, cfg.filter) + } + + return &ImportSDK{ + sourcePath: sourcePath, + db: db, + store: store, + loader: loader, + logger: cfg.logger, + config: cfg, + }, nil +} + +// SDKOption customizes the SDK configuration +type SDKOption func(*sdkConfig) + +type sdkConfig struct { + // Loader options + concurrency int + sqlMode mysql.SQLMode + fileRouteRules []*config.FileRouteRule + filter []string + charset string + + // General options + logger log.Logger +} + +func defaultSDKConfig() *sdkConfig { + return &sdkConfig{ + concurrency: 4, + filter: config.GetDefaultFilter(), + logger: log.L(), + charset: "auto", + } +} + +// WithConcurrency sets the number of concurrent DB/Table creation workers. +func WithConcurrency(n int) SDKOption { + return func(cfg *sdkConfig) { + if n > 0 { + cfg.concurrency = n + } + } +} + +// WithLogger specifies a custom logger +func WithLogger(logger log.Logger) SDKOption { + return func(cfg *sdkConfig) { + cfg.logger = logger + } +} + +// WithSQLMode specifies the SQL mode for schema parsing +func WithSQLMode(mode mysql.SQLMode) SDKOption { + return func(cfg *sdkConfig) { + cfg.sqlMode = mode + } +} + +// WithFilter specifies a filter for the loader +func WithFilter(filter []string) SDKOption { + return func(cfg *sdkConfig) { + cfg.filter = filter + } +} + +// WithFileRouters specifies custom file routing rules +func WithFileRouters(routers []*config.FileRouteRule) SDKOption { + return func(cfg *sdkConfig) { + cfg.fileRouteRules = routers + } +} + +// WithCharset specifies the character set for import (default "auto"). +func WithCharset(cs string) SDKOption { + return func(cfg *sdkConfig) { + if cs != "" { + cfg.charset = cs + } + } +} + +// CreateSchemasAndTables implements the CloudImportSDK interface +func (sdk *ImportSDK) CreateSchemasAndTables(ctx context.Context) error { + dbMetas := sdk.loader.GetDatabases() + if len(dbMetas) == 0 { + return errors.Annotatef(ErrNoDatabasesFound, "source=%s. Ensure the path contains valid dump files (*.sql, *.csv, *.parquet, etc.) 
and filter rules are correct", sdk.sourcePath) + } + + // Create all schemas and tables + importer := mydump.NewSchemaImporter( + sdk.logger, + sdk.config.sqlMode, + sdk.db, + sdk.store, + sdk.config.concurrency, + ) + + err := importer.Run(ctx, dbMetas) + if err != nil { + return errors.Annotatef(err, "creating schemas and tables failed (source=%s, db_count=%d, concurrency=%d)", sdk.sourcePath, len(dbMetas), sdk.config.concurrency) + } + + return nil +} + +// GetTableMetas implements the CloudImportSDK interface +func (sdk *ImportSDK) GetTableMetas(context.Context) ([]*TableMeta, error) { + dbMetas := sdk.loader.GetDatabases() + allFiles := sdk.loader.GetAllFiles() + var results []*TableMeta + for _, dbMeta := range dbMetas { + for _, tblMeta := range dbMeta.Tables { + tableMeta, err := sdk.buildTableMeta(dbMeta, tblMeta, allFiles) + if err != nil { + return nil, errors.Wrapf(err, "failed to build metadata for table %s.%s", + dbMeta.Name, tblMeta.Name) + } + results = append(results, tableMeta) + } + } + + return results, nil +} + +// GetTableMetaByName implements CloudImportSDK interface +func (sdk *ImportSDK) GetTableMetaByName(_ context.Context, schema, table string) (*TableMeta, error) { + dbMetas := sdk.loader.GetDatabases() + allFiles := sdk.loader.GetAllFiles() + // Find the specific table + for _, dbMeta := range dbMetas { + if dbMeta.Name != schema { + continue + } + + for _, tblMeta := range dbMeta.Tables { + if tblMeta.Name != table { + continue + } + + return sdk.buildTableMeta(dbMeta, tblMeta, allFiles) + } + + return nil, errors.Annotatef(ErrTableNotFound, "schema=%s, table=%s", schema, table) + } + + return nil, errors.Annotatef(ErrSchemaNotFound, "schema=%s", schema) +} + +// GetTotalSize implements CloudImportSDK interface +func (sdk *ImportSDK) GetTotalSize(ctx context.Context) int64 { + var total int64 + dbMetas := sdk.loader.GetDatabases() + for _, dbMeta := range dbMetas { + for _, tblMeta := range dbMeta.Tables { + total += tblMeta.TotalSize + } + } + return total +} + +// buildTableMeta creates a TableMeta from database and table metadata +func (sdk *ImportSDK) buildTableMeta( + dbMeta *mydump.MDDatabaseMeta, + tblMeta *mydump.MDTableMeta, + allDataFiles map[string]mydump.FileInfo, +) (*TableMeta, error) { + tableMeta := &TableMeta{ + Database: dbMeta.Name, + Table: tblMeta.Name, + DataFiles: make([]DataFileMeta, 0, len(tblMeta.DataFiles)), + SchemaFile: tblMeta.SchemaFile.FileMeta.Path, + } + + // Process data files + dataFiles, totalSize := processDataFiles(tblMeta.DataFiles) + tableMeta.DataFiles = dataFiles + tableMeta.TotalSize = totalSize + + wildcard, err := generateWildcardPath(tblMeta.DataFiles, allDataFiles) + if err != nil { + return nil, errors.Annotatef(err, "failed to build wildcard for table=%s.%s", dbMeta.Name, tblMeta.Name) + } + tableMeta.WildcardPath = strings.TrimSuffix(sdk.store.URI(), "/") + "/" + wildcard + + return tableMeta, nil +} + +// Close implements CloudImportSDK interface +func (sdk *ImportSDK) Close() error { + // close external storage + if sdk.store != nil { + sdk.store.Close() + } + return nil +} + +// processDataFiles converts mydump data files to DataFileMeta and calculates total size +func processDataFiles(files []mydump.FileInfo) ([]DataFileMeta, int64) { + dataFiles := make([]DataFileMeta, 0, len(files)) + var totalSize int64 + + for _, dataFile := range files { + fileMeta := createDataFileMeta(dataFile) + dataFiles = append(dataFiles, fileMeta) + totalSize += dataFile.FileMeta.RealSize + } + + return dataFiles, totalSize +} 
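
A minimal illustrative sketch of how a caller might drive the SDK defined above, assuming a reachable dump location and an open *sql.DB handle to the target cluster; the go-sql-driver import, DSN, and bucket path are placeholders, not something this change prescribes.

package main

import (
	"context"
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql" // assumed driver choice for the sketch
	"github.com/pingcap/tidb/pkg/importsdk"
)

func main() {
	ctx := context.Background()
	// Placeholder DSN; point it at the cluster that should receive the schemas.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Placeholder source path; any URI accepted by storage.ParseBackend works the same way.
	sdk, err := importsdk.NewImportSDK(ctx, "s3://my-bucket/dump/", db, importsdk.WithConcurrency(8))
	if err != nil {
		panic(err)
	}
	defer sdk.Close()

	// Create every schema and table found in the dump, then inspect the per-table metadata.
	if err := sdk.CreateSchemasAndTables(ctx); err != nil {
		panic(err)
	}
	metas, err := sdk.GetTableMetas(ctx)
	if err != nil {
		panic(err)
	}
	for _, m := range metas {
		fmt.Printf("%s.%s: %d bytes, wildcard=%s\n", m.Database, m.Table, m.TotalSize, m.WildcardPath)
	}
	fmt.Printf("total size: %d bytes\n", sdk.GetTotalSize(ctx))
}
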
+ +// createDataFileMeta creates a DataFileMeta from a mydump.DataFile +func createDataFileMeta(file mydump.FileInfo) DataFileMeta { + return DataFileMeta{ + Path: file.FileMeta.Path, + Size: file.FileMeta.RealSize, + Format: file.FileMeta.Type, + Compression: file.FileMeta.Compression, + } +} + +// generateWildcardPath creates a wildcard pattern path that matches only this table's files +func generateWildcardPath( + files []mydump.FileInfo, + allFiles map[string]mydump.FileInfo, +) (string, error) { + tableFiles := make(map[string]struct{}, len(files)) + for _, df := range files { + tableFiles[df.FileMeta.Path] = struct{}{} + } + + if len(files) == 0 { + return "", errors.Annotate(ErrNoTableDataFiles, "cannot generate wildcard pattern because the table has no data files") + } + + // If there's only one file, we can just return its path + if len(files) == 1 { + return files[0].FileMeta.Path, nil + } + + // Try Mydumper-specific pattern first + p := generateMydumperPattern(files[0]) + if p != "" && isValidPattern(p, tableFiles, allFiles) { + return p, nil + } + + // Fallback to generic prefix/suffix pattern + paths := make([]string, 0, len(files)) + for _, file := range files { + paths = append(paths, file.FileMeta.Path) + } + p = generatePrefixSuffixPattern(paths) + if p != "" && isValidPattern(p, tableFiles, allFiles) { + return p, nil + } + return "", errors.Annotatef(ErrWildcardNotSpecific, "failed to find a wildcard that matches all and only the table's files.") +} + +// isValidPattern checks if a wildcard pattern matches only the table's files +func isValidPattern(pattern string, tableFiles map[string]struct{}, allFiles map[string]mydump.FileInfo) bool { + if pattern == "" { + return false + } + + for path := range allFiles { + isMatch, err := filepath.Match(pattern, path) + if err != nil { + return false // Invalid pattern + } + _, isTableFile := tableFiles[path] + + // If pattern matches a file that's not from our table, it's invalid + if isMatch && !isTableFile { + return false + } + + // If pattern doesn't match our table's file, it's also invalid + if !isMatch && isTableFile { + return false + } + } + + return true +} + +// generateMydumperPattern generates a wildcard pattern for Mydumper-formatted data files +// belonging to a specific table, based on their naming convention. +// It returns a pattern string that matches all data files for the table, or an empty string if not applicable. +func generateMydumperPattern(file mydump.FileInfo) string { + dbName, tableName := file.TableName.Schema, file.TableName.Name + if dbName == "" || tableName == "" { + return "" + } + + // compute dirPrefix and basename + full := file.FileMeta.Path + dirPrefix, name := "", full + if idx := strings.LastIndex(full, "/"); idx >= 0 { + dirPrefix = full[:idx+1] + name = full[idx+1:] + } + + // compression ext from filename when compression exists (last suffix like .gz/.zst) + compExt := "" + if file.FileMeta.Compression != mydump.CompressionNone { + compExt = filepath.Ext(name) + } + + // data ext after stripping compression ext + base := strings.TrimSuffix(name, compExt) + dataExt := filepath.Ext(base) + return dirPrefix + dbName + "." 
+ tableName + ".*" + dataExt + compExt +} + +// longestCommonPrefix finds the longest string that is a prefix of all strings in the slice +func longestCommonPrefix(strs []string) string { + if len(strs) == 0 { + return "" + } + + prefix := strs[0] + for _, s := range strs[1:] { + i := 0 + for i < len(prefix) && i < len(s) && prefix[i] == s[i] { + i++ + } + prefix = prefix[:i] + if prefix == "" { + break + } + } + + return prefix +} + +// longestCommonSuffix finds the longest string that is a suffix of all strings in the slice, starting after the given prefix length +func longestCommonSuffix(strs []string, prefixLen int) string { + if len(strs) == 0 { + return "" + } + + suffix := strs[0][prefixLen:] + for _, s := range strs[1:] { + remaining := s[prefixLen:] + i := 0 + for i < len(suffix) && i < len(remaining) && suffix[len(suffix)-i-1] == remaining[len(remaining)-i-1] { + i++ + } + suffix = suffix[len(suffix)-i:] + if suffix == "" { + break + } + } + + return suffix +} + +// generatePrefixSuffixPattern returns a wildcard pattern that matches all and only the given paths +// by finding the longest common prefix and suffix among them, and placing a '*' wildcard in between. +func generatePrefixSuffixPattern(paths []string) string { + if len(paths) == 0 { + return "" + } + if len(paths) == 1 { + return paths[0] + } + + prefix := longestCommonPrefix(paths) + suffix := longestCommonSuffix(paths, len(prefix)) + + return prefix + "*" + suffix +} + +// TODO: add error code and doc for cloud sdk +// Sentinel errors to categorize common failure scenarios for clearer user messages. +var ( + // ErrNoDatabasesFound indicates that the dump source contains no recognizable databases. + ErrNoDatabasesFound = errors.New("no databases found in the source path") + // ErrSchemaNotFound indicates the target schema doesn't exist in the dump source. + ErrSchemaNotFound = errors.New("schema not found") + // ErrTableNotFound indicates the target table doesn't exist in the dump source. + ErrTableNotFound = errors.New("table not found") + // ErrNoTableDataFiles indicates a table has zero data files and thus cannot proceed. + ErrNoTableDataFiles = errors.New("no data files for table") + // ErrWildcardNotSpecific indicates a wildcard cannot uniquely match the table's files. + ErrWildcardNotSpecific = errors.New("cannot generate a unique wildcard pattern for the table's data files") +) diff --git a/pkg/importsdk/sdk_test.go b/pkg/importsdk/sdk_test.go new file mode 100644 index 0000000000..aee9ef1642 --- /dev/null +++ b/pkg/importsdk/sdk_test.go @@ -0,0 +1,573 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package importsdk + +import ( + "context" + "fmt" + "regexp" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/fsouza/fake-gcs-server/fakestorage" + "github.com/ngaut/pools" + "github.com/pingcap/tidb/pkg/lightning/config" + "github.com/pingcap/tidb/pkg/lightning/log" + "github.com/pingcap/tidb/pkg/lightning/mydump" + "github.com/pingcap/tidb/pkg/parser/mysql" + filter "github.com/pingcap/tidb/pkg/util/table-filter" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type mockGCSSuite struct { + suite.Suite + + server *fakestorage.Server + pool *pools.ResourcePool +} + +var ( + gcsHost = "127.0.0.1" + gcsPort = uint16(4443) + // for fake gcs server, we must use this endpoint format + // NOTE: must end with '/' + gcsEndpointFormat = "http://%s:%d/storage/v1/" + gcsEndpoint = fmt.Sprintf(gcsEndpointFormat, gcsHost, gcsPort) +) + +func TestCloudSDK(t *testing.T) { + suite.Run(t, &mockGCSSuite{}) +} + +func (s *mockGCSSuite) SetupSuite() { + var err error + opt := fakestorage.Options{ + Scheme: "http", + Host: gcsHost, + Port: gcsPort, + PublicHost: gcsHost, + } + s.server, err = fakestorage.NewServerWithOptions(opt) + s.NoError(err) +} + +func (s *mockGCSSuite) TearDownSuite() { + s.server.Stop() +} + +func (s *mockGCSSuite) TestDumplingSource() { + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db1-schema-create.sql"}, + Content: []byte("CREATE DATABASE IF NOT EXISTS db1;\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db2-schema-create.sql"}, + Content: []byte("CREATE DATABASE IF NOT EXISTS db2;\n"), + }) + // table1 in db1 + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db1.tb1-schema.sql"}, + Content: []byte("CREATE TABLE IF NOT EXISTS db1.tb1 (a INT, b VARCHAR(10));\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db1.tb1.001.sql"}, + Content: []byte("INSERT INTO db1.tb1 VALUES (1,'a'),(2,'b');\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db1.tb1.002.sql"}, + Content: []byte("INSERT INTO db1.tb1 VALUES (3,'c'),(4,'d');\n"), + }) + // table2 in db2 + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db2.tb2-schema.sql"}, + Content: []byte("CREATE TABLE IF NOT EXISTS db2.tb2 (x INT, y VARCHAR(10));\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db2.tb2.001.sql"}, + Content: []byte("INSERT INTO db2.tb2 VALUES (5,'e'),(6,'f');\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db2.tb2.002.sql"}, + Content: []byte("INSERT INTO db2.tb2 VALUES (7,'g'),(8,'h');\n"), + }) + + db, mock, err := sqlmock.New() + s.NoError(err) + defer db.Close() + + mock.ExpectQuery(`SELECT SCHEMA_NAME FROM information_schema.SCHEMATA`). + WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME"})) + mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `db1`;"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `db2`;"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(regexp.QuoteMeta("CREATE TABLE IF NOT EXISTS `db1`.`tb1` (`a` INT,`b` VARCHAR(10));")). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(regexp.QuoteMeta("CREATE TABLE IF NOT EXISTS `db2`.`tb2` (`x` INT,`y` VARCHAR(10));")). + WillReturnResult(sqlmock.NewResult(0, 1)) + + importSDK, err := NewImportSDK( + context.Background(), + fmt.Sprintf("gs://dumpling?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint), + db, + WithConcurrency(1), + ) + s.NoError(err) + defer importSDK.Close() + + err = importSDK.CreateSchemasAndTables(context.Background()) + s.NoError(err) + + tablesMeta, err := importSDK.GetTableMetas(context.Background()) + s.NoError(err) + s.Len(tablesMeta, 2) + + expected1 := &TableMeta{ + Database: "db1", + Table: "tb1", + SchemaFile: "db1.tb1-schema.sql", + DataFiles: []DataFileMeta{ + {Path: "db1.tb1.001.sql", Size: 44, Format: mydump.SourceTypeSQL, Compression: mydump.CompressionNone}, + {Path: "db1.tb1.002.sql", Size: 44, Format: mydump.SourceTypeSQL, Compression: mydump.CompressionNone}, + }, + TotalSize: 88, + WildcardPath: "gcs://dumpling/db1.tb1.*.sql", + } + expected2 := &TableMeta{ + Database: "db2", + Table: "tb2", + SchemaFile: "db2.tb2-schema.sql", + DataFiles: []DataFileMeta{ + {Path: "db2.tb2.001.sql", Size: 44, Format: mydump.SourceTypeSQL, Compression: mydump.CompressionNone}, + {Path: "db2.tb2.002.sql", Size: 44, Format: mydump.SourceTypeSQL, Compression: mydump.CompressionNone}, + }, + TotalSize: 88, + WildcardPath: "gcs://dumpling/db2.tb2.*.sql", + } + s.Equal(expected1, tablesMeta[0]) + s.Equal(expected2, tablesMeta[1]) + + // verify GetTableMetaByName for each db + tm1, err := importSDK.GetTableMetaByName(context.Background(), "db1", "tb1") + s.NoError(err) + s.Equal(tm1, expected1) + tm2, err := importSDK.GetTableMetaByName(context.Background(), "db2", "tb2") + s.NoError(err) + s.Equal(tm2, expected2) + + totalSize := importSDK.GetTotalSize(context.Background()) + s.Equal(totalSize, int64(176)) + + // check meets expectations + err = mock.ExpectationsWereMet() + s.NoError(err) +} + +func (s *mockGCSSuite) TestCSVSource() { + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db1-schema-create.sql"}, + Content: []byte("CREATE DATABASE IF NOT EXISTS db1;\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db2-schema-create.sql"}, + Content: []byte("CREATE DATABASE IF NOT EXISTS db2;\n"), + }) + // table1 in db1 + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db1.tb1-schema.sql"}, + Content: []byte("CREATE TABLE IF NOT EXISTS db1.tb1 (a INT, b VARCHAR(10));\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db1.tb1.001.csv"}, + Content: []byte("1,a\n2,b\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db1.tb1.002.csv"}, + Content: []byte("3,c\n4,d\n"), + }) + // table2 in db2 + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db2.tb2-schema.sql"}, + Content: []byte("CREATE TABLE IF NOT EXISTS db2.tb2 (x INT, y VARCHAR(10));\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db2.tb2.001.csv"}, + Content: []byte("5,e\n6,f\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db2.tb2.002.csv"}, + Content: 
[]byte("7,g\n8,h\n"), + }) + + db, mock, err := sqlmock.New() + s.NoError(err) + defer db.Close() + + mock.ExpectQuery(`SELECT SCHEMA_NAME FROM information_schema.SCHEMATA`). + WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME"})) + mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `db1`;"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `db2`;"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(regexp.QuoteMeta("CREATE TABLE IF NOT EXISTS `db1`.`tb1` (`a` INT,`b` VARCHAR(10));")). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(regexp.QuoteMeta("CREATE TABLE IF NOT EXISTS `db2`.`tb2` (`x` INT,`y` VARCHAR(10));")). + WillReturnResult(sqlmock.NewResult(0, 1)) + + importSDK, err := NewImportSDK( + context.Background(), + fmt.Sprintf("gs://csv?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint), + db, + WithConcurrency(1), + ) + s.NoError(err) + defer importSDK.Close() + + err = importSDK.CreateSchemasAndTables(context.Background()) + s.NoError(err) + + tablesMeta, err := importSDK.GetTableMetas(context.Background()) + s.NoError(err) + s.Len(tablesMeta, 2) + + expected1 := &TableMeta{ + Database: "db1", + Table: "tb1", + SchemaFile: "db1.tb1-schema.sql", + DataFiles: []DataFileMeta{ + {Path: "db1.tb1.001.csv", Size: 8, Format: mydump.SourceTypeCSV, Compression: mydump.CompressionNone}, + {Path: "db1.tb1.002.csv", Size: 8, Format: mydump.SourceTypeCSV, Compression: mydump.CompressionNone}, + }, + TotalSize: 16, + WildcardPath: "gcs://csv/db1.tb1.*.csv", + } + expected2 := &TableMeta{ + Database: "db2", + Table: "tb2", + SchemaFile: "db2.tb2-schema.sql", + DataFiles: []DataFileMeta{ + {Path: "db2.tb2.001.csv", Size: 8, Format: mydump.SourceTypeCSV, Compression: mydump.CompressionNone}, + {Path: "db2.tb2.002.csv", Size: 8, Format: mydump.SourceTypeCSV, Compression: mydump.CompressionNone}, + }, + TotalSize: 16, + WildcardPath: "gcs://csv/db2.tb2.*.csv", + } + s.Equal(expected1, tablesMeta[0]) + s.Equal(expected2, tablesMeta[1]) + + // verify GetTableMetaByName for each db + tm1, err := importSDK.GetTableMetaByName(context.Background(), "db1", "tb1") + s.NoError(err) + s.Equal(tm1, expected1) + tm2, err := importSDK.GetTableMetaByName(context.Background(), "db2", "tb2") + s.NoError(err) + s.Equal(tm2, expected2) + + totalSize := importSDK.GetTotalSize(context.Background()) + s.Equal(totalSize, int64(32)) + + // check meets expectations + err = mock.ExpectationsWereMet() + s.NoError(err) +} + +func (s *mockGCSSuite) TestOnlyDataFiles() { + s.server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: "onlydata"}) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "onlydata", Name: "part1.csv"}, + Content: []byte("a,b\n1,a\n2,b\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "onlydata", Name: "part2.csv"}, + Content: []byte("a,b\n3,c\n4,d\n"), + }) + + db, mock, err := sqlmock.New() + s.NoError(err) + defer db.Close() + + mock.ExpectQuery(`SELECT SCHEMA_NAME FROM information_schema.SCHEMATA`). + WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME"})) + mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `db`;"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery("SHOW CREATE TABLE `db`.`tb`"). 
+ WillReturnRows(sqlmock.NewRows([]string{"Create Table"}).AddRow("CREATE TABLE `db`.`tb` (`a` INT, `b` VARCHAR(10));")) + importSDK, err := NewImportSDK( + context.Background(), + fmt.Sprintf("gs://onlydata?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint), + db, + WithCharset("utf8"), + WithConcurrency(8), + WithFilter([]string{"*.*"}), + WithSQLMode(mysql.ModeANSIQuotes), + WithLogger(log.L()), + WithFileRouters([]*config.FileRouteRule{ + { + Pattern: `.*\.csv$`, + Schema: "db", + Table: "tb", + Type: "csv", + }, + }), + ) + s.NoError(err) + defer importSDK.Close() + + err = importSDK.CreateSchemasAndTables(context.Background()) + s.NoError(err) + + tablesMeta, err := importSDK.GetTableMetas(context.Background()) + s.NoError(err) + s.Len(tablesMeta, 1) + s.Equal(&TableMeta{ + Database: "db", + Table: "tb", + SchemaFile: "", + DataFiles: []DataFileMeta{ + {Path: "part1.csv", Size: 12, Format: mydump.SourceTypeCSV, Compression: mydump.CompressionNone}, + {Path: "part2.csv", Size: 12, Format: mydump.SourceTypeCSV, Compression: mydump.CompressionNone}, + }, + TotalSize: 24, + WildcardPath: "gcs://onlydata/part*.csv", + }, tablesMeta[0]) + + tableMeta, err := importSDK.GetTableMetaByName(context.Background(), "db", "tb") + s.NoError(err) + s.Equal(tableMeta, tablesMeta[0]) + + totalSize := importSDK.GetTotalSize(context.Background()) + s.Equal(totalSize, int64(24)) + + err = mock.ExpectationsWereMet() + s.NoError(err) +} + +func TestLongestCommonPrefix(t *testing.T) { + strs := []string{"s3://bucket/foo/bar/baz1", "s3://bucket/foo/bar/baz2", "s3://bucket/foo/bar/baz"} + p := longestCommonPrefix(strs) + require.Equal(t, "s3://bucket/foo/bar/baz", p) + + // no common prefix + require.Equal(t, "", longestCommonPrefix([]string{"a", "b"})) + + // empty inputs + require.Equal(t, "", longestCommonPrefix(nil)) + require.Equal(t, "", longestCommonPrefix([]string{})) +} + +func TestLongestCommonSuffix(t *testing.T) { + strs := []string{"abcXYZ", "defXYZ", "XYZ"} + s := longestCommonSuffix(strs, 0) + require.Equal(t, "XYZ", s) + + // no common suffix + require.Equal(t, "", longestCommonSuffix([]string{"a", "b"}, 0)) + + // empty inputs + require.Equal(t, "", longestCommonSuffix(nil, 0)) + require.Equal(t, "", longestCommonSuffix([]string{}, 0)) + + // same prefix + require.Equal(t, "", longestCommonSuffix([]string{"abc", "abc"}, 3)) + require.Equal(t, "f", longestCommonSuffix([]string{"abcdf", "abcef"}, 3)) +} + +func TestGeneratePrefixSuffixPattern(t *testing.T) { + paths := []string{"pre_middle_suf", "pre_most_suf"} + pattern := generatePrefixSuffixPattern(paths) + // common prefix "pre_m", suffix "_suf" + require.Equal(t, "pre_m*_suf", pattern) + + // empty inputs + require.Equal(t, "", generatePrefixSuffixPattern(nil)) + require.Equal(t, "", generatePrefixSuffixPattern([]string{})) + + // only one file + require.Equal(t, "pre_middle_suf", generatePrefixSuffixPattern([]string{"pre_middle_suf"})) + + // no common prefix/suffix + paths2 := []string{"foo", "bar"} + require.Equal(t, "*", generatePrefixSuffixPattern(paths2)) + + // overlapping prefix/suffix + paths3 := []string{"aaabaaa", "aaa"} + require.Equal(t, "aaa*", generatePrefixSuffixPattern(paths3)) +} + +func generateFileMetas(t *testing.T, paths []string) []mydump.FileInfo { + t.Helper() + + files := make([]mydump.FileInfo, 0, len(paths)) + fileRouter, err := mydump.NewDefaultFileRouter(log.L()) + require.NoError(t, err) + for _, p := range paths { + res, err := fileRouter.Route(p) + require.NoError(t, err) + files = 
append(files, mydump.FileInfo{ + TableName: res.Table, + FileMeta: mydump.SourceFileMeta{ + Path: p, + Type: res.Type, + Compression: res.Compression, + SortKey: res.Key, + }, + }) + } + return files +} + +func TestGenerateMydumperPattern(t *testing.T) { + paths := []string{"db.tb.0001.sql", "db.tb.0002.sql"} + p := generateMydumperPattern(generateFileMetas(t, paths)[0]) + require.Equal(t, "db.tb.*.sql", p) + + paths2 := []string{"s3://bucket/dir/db.tb.0001.sql", "s3://bucket/dir/db.tb.0002.sql"} + p2 := generateMydumperPattern(generateFileMetas(t, paths2)[0]) + require.Equal(t, "s3://bucket/dir/db.tb.*.sql", p2) + + // not mydumper pattern + require.Equal(t, "", generateMydumperPattern(mydump.FileInfo{ + TableName: filter.Table{}, + })) +} + +func TestCreateDataFileMeta(t *testing.T) { + fi := mydump.FileInfo{ + TableName: filter.Table{ + Schema: "db", + Name: "table", + }, + FileMeta: mydump.SourceFileMeta{ + Path: "s3://bucket/path/to/f", + FileSize: 123, + Type: mydump.SourceTypeCSV, + Compression: mydump.CompressionGZ, + RealSize: 456, + }, + } + df := createDataFileMeta(fi) + require.Equal(t, "s3://bucket/path/to/f", df.Path) + require.Equal(t, int64(456), df.Size) + require.Equal(t, mydump.SourceTypeCSV, df.Format) + require.Equal(t, mydump.CompressionGZ, df.Compression) +} + +func TestProcessDataFiles(t *testing.T) { + files := []mydump.FileInfo{ + {FileMeta: mydump.SourceFileMeta{Path: "s3://bucket/a", RealSize: 10}}, + {FileMeta: mydump.SourceFileMeta{Path: "s3://bucket/b", RealSize: 20}}, + } + dfm, total := processDataFiles(files) + require.Len(t, dfm, 2) + require.Equal(t, int64(30), total) + require.Equal(t, "s3://bucket/a", dfm[0].Path) + require.Equal(t, "s3://bucket/b", dfm[1].Path) +} + +func TestValidatePattern(t *testing.T) { + tableFiles := map[string]struct{}{ + "a.txt": {}, "b.txt": {}, + } + // only table files in allFiles + smallAll := map[string]mydump.FileInfo{ + "a.txt": {}, "b.txt": {}, + } + require.True(t, isValidPattern("*.txt", tableFiles, smallAll)) + + // allFiles includes an extra file => invalid + fullAll := map[string]mydump.FileInfo{ + "a.txt": {}, "b.txt": {}, "c.txt": {}, + } + require.False(t, isValidPattern("*.txt", tableFiles, fullAll)) + + // If pattern doesn't match our table's file, it's also invalid + require.False(t, isValidPattern("*.csv", tableFiles, smallAll)) + + // empty pattern => invalid + require.False(t, isValidPattern("", tableFiles, smallAll)) +} + +func TestGenerateWildcardPath(t *testing.T) { + // Helper to create allFiles map + createAllFiles := func(paths []string) map[string]mydump.FileInfo { + allFiles := make(map[string]mydump.FileInfo) + for _, p := range paths { + allFiles[p] = mydump.FileInfo{ + FileMeta: mydump.SourceFileMeta{Path: p}, + } + } + return allFiles + } + + // No files + files1 := []mydump.FileInfo{} + allFiles1 := createAllFiles([]string{}) + _, err := generateWildcardPath(files1, allFiles1) + require.Error(t, err) + require.Contains(t, err.Error(), "no data files for table") + + // Single file + files2 := generateFileMetas(t, []string{"db.tb.0001.sql"}) + allFiles2 := createAllFiles([]string{"db.tb.0001.sql"}) + path2, err := generateWildcardPath(files2, allFiles2) + require.NoError(t, err) + require.Equal(t, "db.tb.0001.sql", path2) + + // Mydumper pattern succeeds + files3 := generateFileMetas(t, []string{"db.tb.0001.sql.gz", "db.tb.0002.sql.gz"}) + allFiles3 := createAllFiles([]string{"db.tb.0001.sql.gz", "db.tb.0002.sql.gz"}) + path3, err := generateWildcardPath(files3, allFiles3) + require.NoError(t, 
err) + require.Equal(t, "db.tb.*.sql.gz", path3) + + // Mydumper pattern fails, fallback to prefix/suffix succeeds + files4 := []mydump.FileInfo{ + { + TableName: filter.Table{Schema: "db", Name: "tb"}, + FileMeta: mydump.SourceFileMeta{ + Path: "a.sql", + Type: mydump.SourceTypeSQL, + Compression: mydump.CompressionNone, + }, + }, + { + TableName: filter.Table{Schema: "db", Name: "tb"}, + FileMeta: mydump.SourceFileMeta{ + Path: "b.sql", + Type: mydump.SourceTypeSQL, + Compression: mydump.CompressionNone, + }, + }, + } + allFiles4 := map[string]mydump.FileInfo{ + files4[0].FileMeta.Path: files4[0], + files4[1].FileMeta.Path: files4[1], + } + path4, err := generateWildcardPath(files4, allFiles4) + require.NoError(t, err) + require.Equal(t, "*.sql", path4) + + allFiles4["db-schema.sql"] = mydump.FileInfo{ + FileMeta: mydump.SourceFileMeta{ + Path: "db-schema.sql", + Type: mydump.SourceTypeSQL, + Compression: mydump.CompressionNone, + }, + } + _, err = generateWildcardPath(files4, allFiles4) + require.Error(t, err) + require.Contains(t, err.Error(), "cannot generate a unique wildcard pattern") +} diff --git a/pkg/lightning/mydump/loader.go b/pkg/lightning/mydump/loader.go index cf299b8d01..d150b97011 100644 --- a/pkg/lightning/mydump/loader.go +++ b/pkg/lightning/mydump/loader.go @@ -824,6 +824,22 @@ func (l *MDLoader) GetStore() storage.ExternalStorage { return l.store } +// GetAllFiles gets all the files for the loader. +func (l *MDLoader) GetAllFiles() map[string]FileInfo { + allFiles := make(map[string]FileInfo) + for _, dbMeta := range l.dbs { + for _, tblMeta := range dbMeta.Tables { + for _, file := range tblMeta.DataFiles { + allFiles[file.FileMeta.Path] = file + } + if tblMeta.SchemaFile.FileMeta.Path != "" { + allFiles[tblMeta.SchemaFile.FileMeta.Path] = tblMeta.SchemaFile + } + } + } + return allFiles +} + func calculateFileBytes(ctx context.Context, dataFile string, compressType storage.CompressType, diff --git a/tests/realtikvtest/importintotest4/BUILD.bazel b/tests/realtikvtest/importintotest4/BUILD.bazel index 6dc90d1d2e..22f05922bd 100644 --- a/tests/realtikvtest/importintotest4/BUILD.bazel +++ b/tests/realtikvtest/importintotest4/BUILD.bazel @@ -4,6 +4,7 @@ go_test( name = "importintotest4_test", timeout = "moderate", srcs = [ + "cloud_sdk_test.go", "global_sort_test.go", "import_summary_test.go", "main_test.go", @@ -20,6 +21,7 @@ go_test( "//pkg/disttask/framework/testutil", "//pkg/disttask/importinto", "//pkg/executor/importer", + "//pkg/importsdk", "//pkg/kv", "//pkg/lightning/config", "//pkg/planner/core", @@ -27,6 +29,7 @@ go_test( "//pkg/testkit/testfailpoint", "//tests/realtikvtest", "//tests/realtikvtest/testutils", + "@com_github_data_dog_go_sqlmock//:go-sqlmock", "@com_github_fsouza_fake_gcs_server//fakestorage", "@com_github_stretchr_testify//require", "@com_github_stretchr_testify//suite", diff --git a/tests/realtikvtest/importintotest4/cloud_sdk_test.go b/tests/realtikvtest/importintotest4/cloud_sdk_test.go new file mode 100644 index 0000000000..6d42da6a71 --- /dev/null +++ b/tests/realtikvtest/importintotest4/cloud_sdk_test.go @@ -0,0 +1,166 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importintotest + +import ( + "context" + "fmt" + "regexp" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/fsouza/fake-gcs-server/fakestorage" + "github.com/pingcap/tidb/pkg/importsdk" + "github.com/pingcap/tidb/pkg/lightning/config" + "github.com/pingcap/tidb/pkg/testkit" +) + +func (s *mockGCSSuite) TestCSVSource() { + // prepare source data + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_csv", Name: "t.1.csv"}, + Content: []byte("1,foo1,bar1,123\n2,foo2,bar2,456\n3,foo3,bar3,789\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_csv", Name: "t.2.csv"}, + Content: []byte("4,foo4,bar4,123\n5,foo5,bar5,223\n6,foo6,bar6,323\n"), + }) + s.server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: "sorted"}) + + sortStorageURI := fmt.Sprintf("gs://sorted/cloud_csv?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint) + sourceURI := fmt.Sprintf("gs://cloud_csv/?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint) + + // create database and table + s.prepareAndUseDB("cloud_csv") + s.tk.MustExec(`create table t (a bigint primary key, b varchar(100), c varchar(100), d int, + key(a), key(c,d), key(d));`) + + db, mock, err := sqlmock.New() + s.Require().NoError(err) + defer db.Close() + mock.ExpectQuery(`SELECT SCHEMA_NAME FROM information_schema.SCHEMATA`). + WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME"}).AddRow("cloud_csv")) + mock.ExpectQuery("SHOW CREATE TABLE `cloud_csv`.`t`"). 
+ WillReturnRows(sqlmock.NewRows([]string{"Create Table"}).AddRow(`create table t (a bigint primary key, b varchar(100), c varchar(100), d int, + key(a), key(c,d), key(d));`)) + + cloudSDK, err := importsdk.NewImportSDK(context.Background(), sourceURI, db, + importsdk.WithFileRouters([]*config.FileRouteRule{ + {Pattern: ".*", Table: "t", Schema: "cloud_csv", Type: "csv"}, + })) + s.Require().NoError(err) + defer cloudSDK.Close() + s.Require().NoError(cloudSDK.CreateSchemasAndTables(context.Background())) + tableMetas, err := cloudSDK.GetTableMetas(context.Background()) + s.Require().NoError(err) + s.Len(tableMetas, 1) + tableMeta := tableMetas[0] + path := fmt.Sprintf("%s?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", tableMeta.WildcardPath, gcsEndpoint) + importSQL := fmt.Sprintf("import into %s.%s from '%s' with cloud_storage_uri='%s'", tableMeta.Database, tableMeta.Table, path, sortStorageURI) + result := s.tk.MustQuery(importSQL).Rows() + s.Len(result, 1) + s.tk.MustQuery("select * from t").Sort().Check(testkit.Rows( + "1 foo1 bar1 123", "2 foo2 bar2 456", "3 foo3 bar3 789", + "4 foo4 bar4 123", "5 foo5 bar5 223", "6 foo6 bar6 323", + )) +} + +func (s *mockGCSSuite) TestDumplingSource() { + // prepare source data + s.server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: "cloud_dumpling"}) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling1-schema-create.sql"}, + Content: []byte("CREATE DATABASE IF NOT EXISTS cloud_dumpling1;\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling2-schema-create.sql"}, + Content: []byte("CREATE DATABASE IF NOT EXISTS cloud_dumpling2;\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling1.tb1-schema.sql"}, + Content: []byte("CREATE TABLE IF NOT EXISTS cloud_dumpling1.tb1 (a INT, b VARCHAR(10));\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling1.tb1.001.sql"}, + Content: []byte("INSERT INTO cloud_dumpling1.tb1 VALUES (1,'a'),(2,'b');\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling1.tb1.002.sql"}, + Content: []byte("INSERT INTO cloud_dumpling1.tb1 VALUES (3,'c'),(4,'d');\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling2.tb2-schema.sql"}, + Content: []byte("CREATE TABLE IF NOT EXISTS cloud_dumpling2.tb2 (x INT, y VARCHAR(10));\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling2.tb2.001.sql"}, + Content: []byte("INSERT INTO cloud_dumpling2.tb2 VALUES (5,'e'),(6,'f');\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling2.tb2.002.sql"}, + Content: []byte("INSERT INTO cloud_dumpling2.tb2 VALUES (7,'g'),(8,'h');\n"), + }) + s.server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: "sorted"}) + + sourceURI := fmt.Sprintf("gs://cloud_dumpling?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint) + sortStorageURI := 
fmt.Sprintf("gs://sorted/cloud_dumpling?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint) + + db, mock, err := sqlmock.New() + s.Require().NoError(err) + defer db.Close() + mock.ExpectQuery(`SELECT SCHEMA_NAME FROM information_schema.SCHEMATA`). + WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME"})) + mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `cloud_dumpling1`;"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `cloud_dumpling2`;"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(regexp.QuoteMeta("CREATE TABLE IF NOT EXISTS `cloud_dumpling1`.`tb1` (`a` INT,`b` VARCHAR(10));")). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(regexp.QuoteMeta("CREATE TABLE IF NOT EXISTS `cloud_dumpling2`.`tb2` (`x` INT,`y` VARCHAR(10));")). + WillReturnResult(sqlmock.NewResult(0, 1)) + + cloudSDK, err := importsdk.NewImportSDK(context.Background(), sourceURI, db, importsdk.WithConcurrency(1)) + s.Require().NoError(err) + defer cloudSDK.Close() + + s.Require().NoError(cloudSDK.CreateSchemasAndTables(context.Background())) + tableMetas, err := cloudSDK.GetTableMetas(context.Background()) + s.Require().NoError(err) + s.Len(tableMetas, 2) + + s.prepareAndUseDB("cloud_dumpling1") + s.prepareAndUseDB("cloud_dumpling2") + s.tk.MustExec("CREATE TABLE IF NOT EXISTS cloud_dumpling1.tb1 (a INT, b VARCHAR(10));") + s.tk.MustExec("CREATE TABLE IF NOT EXISTS cloud_dumpling2.tb2 (x INT, y VARCHAR(10));") + // import and validate data for each table + for _, tm := range tableMetas { + path := fmt.Sprintf("%s?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", + tm.WildcardPath, gcsEndpoint) + importSQL := fmt.Sprintf("import into %s.%s from '%s' format 'sql' with cloud_storage_uri='%s'", tm.Database, tm.Table, path, sortStorageURI) + result := s.tk.MustQuery(importSQL).Rows() + s.Len(result, 1) + // verify contents + fullQuery := fmt.Sprintf("select * from %s.%s", tm.Database, tm.Table) + switch tm.Table { + case "tb1": + s.tk.MustQuery(fullQuery).Sort().Check(testkit.Rows( + "1 a", "2 b", "3 c", "4 d")) + case "tb2": + s.tk.MustQuery(fullQuery).Sort().Check(testkit.Rows( + "5 e", "6 f", "7 g", "8 h")) + } + } + + s.Require().NoError(mock.ExpectationsWereMet()) +}