ImportInto: add import sdk for cloud (#61545)

ref pingcap/tidb#61264
Author: GMHDBJD
Date: 2025-09-05 11:48:12 +08:00
Committed by: GitHub
Parent: 57ebc04e58
Commit: bd5c194581
7 changed files with 1287 additions and 0 deletions
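For orientation, a minimal usage sketch of the SDK added in this commit, pieced together from pkg/importsdk/sdk.go and its tests below. The DSN, driver import, and source URI are placeholders, not part of the commit:

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql" // placeholder driver choice
	"github.com/pingcap/tidb/pkg/importsdk"
)

func main() {
	ctx := context.Background()
	// Placeholder DSN; any *sql.DB pointing at the target TiDB cluster works.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Placeholder dump location; S3/GCS/local paths are handled by br storage.
	sdk, err := importsdk.NewImportSDK(ctx, "s3://bucket/dump/", db, importsdk.WithConcurrency(4))
	if err != nil {
		log.Fatal(err)
	}
	defer sdk.Close()

	// Create every schema and table found in the dump, then inspect the metadata.
	if err := sdk.CreateSchemasAndTables(ctx); err != nil {
		log.Fatal(err)
	}
	metas, err := sdk.GetTableMetas(ctx)
	if err != nil {
		log.Fatal(err)
	}
	for _, tm := range metas {
		// WildcardPath matches all of (and only) this table's data files.
		fmt.Printf("%s.%s: %d bytes, files matched by %s\n",
			tm.Database, tm.Table, tm.TotalSize, tm.WildcardPath)
	}
}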


@@ -31,6 +31,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("syscall.syscall"),
goleak.IgnoreTopFunction("net.(*netFD).connect.func2"),
goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
}
goleak.VerifyTestMain(m, opts...)
}

pkg/importsdk/BUILD.bazel (new file, 37 lines)

@@ -0,0 +1,37 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "importsdk",
srcs = ["sdk.go"],
importpath = "github.com/pingcap/tidb/pkg/importsdk",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/storage",
"//pkg/lightning/config",
"//pkg/lightning/log",
"//pkg/lightning/mydump",
"//pkg/parser/mysql",
"@com_github_pingcap_errors//:errors",
],
)
go_test(
name = "importsdk_test",
timeout = "short",
srcs = ["sdk_test.go"],
embed = [":importsdk"],
flaky = True,
shard_count = 9,
deps = [
"//pkg/lightning/config",
"//pkg/lightning/log",
"//pkg/lightning/mydump",
"//pkg/parser/mysql",
"//pkg/util/table-filter",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_ngaut_pools//:pools",
"@com_github_stretchr_testify//require",
"@com_github_stretchr_testify//suite",
],
)

pkg/importsdk/sdk.go (new file, 490 lines)

@@ -0,0 +1,490 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importsdk
import (
"context"
"database/sql"
"path/filepath"
"strings"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/lightning/mydump"
"github.com/pingcap/tidb/pkg/parser/mysql"
)
// SDK defines the interface for cloud import services
type SDK interface {
// CreateSchemasAndTables creates all database schemas and tables from source path
CreateSchemasAndTables(ctx context.Context) error
// GetTableMetas returns metadata for all tables in the source path
GetTableMetas(ctx context.Context) ([]*TableMeta, error)
// GetTableMetaByName returns metadata for a specific table
GetTableMetaByName(ctx context.Context, schema, table string) (*TableMeta, error)
// GetTotalSize returns the cumulative size (in bytes) of all data files under the source path
GetTotalSize(ctx context.Context) int64
// Close releases resources used by the SDK
Close() error
}
// TableMeta contains metadata for a table to be imported
type TableMeta struct {
Database string
Table string
DataFiles []DataFileMeta
TotalSize int64 // In bytes
WildcardPath string // Wildcard pattern that matches only this table's data files
SchemaFile string // Path to the table schema file, if available
}
// DataFileMeta contains metadata for a data file
type DataFileMeta struct {
Path string
Size int64
Format mydump.SourceType
Compression mydump.Compression
}
// ImportSDK implements the SDK interface
type ImportSDK struct {
sourcePath string
db *sql.DB
store storage.ExternalStorage
loader *mydump.MDLoader
logger log.Logger
config *sdkConfig
}
// NewImportSDK creates a new ImportSDK instance
func NewImportSDK(ctx context.Context, sourcePath string, db *sql.DB, options ...SDKOption) (SDK, error) {
cfg := defaultSDKConfig()
for _, opt := range options {
opt(cfg)
}
u, err := storage.ParseBackend(sourcePath, nil)
if err != nil {
return nil, errors.Annotatef(err, "failed to parse storage backend URL (source=%s). Please verify the URL format and credentials", sourcePath)
}
store, err := storage.New(ctx, u, &storage.ExternalStorageOptions{})
if err != nil {
return nil, errors.Annotatef(err, "failed to create external storage (source=%s). Check network/connectivity and permissions", sourcePath)
}
ldrCfg := mydump.LoaderConfig{
SourceURL: sourcePath,
Filter: cfg.filter,
FileRouters: cfg.fileRouteRules,
DefaultFileRules: len(cfg.fileRouteRules) == 0,
CharacterSet: cfg.charset,
}
loader, err := mydump.NewLoaderWithStore(ctx, ldrCfg, store)
if err != nil {
return nil, errors.Annotatef(err, "failed to create MyDump loader (source=%s, charset=%s, filter=%v). Please check dump layout and router rules", sourcePath, cfg.charset, cfg.filter)
}
return &ImportSDK{
sourcePath: sourcePath,
db: db,
store: store,
loader: loader,
logger: cfg.logger,
config: cfg,
}, nil
}
// SDKOption customizes the SDK configuration
type SDKOption func(*sdkConfig)
type sdkConfig struct {
// Loader options
concurrency int
sqlMode mysql.SQLMode
fileRouteRules []*config.FileRouteRule
filter []string
charset string
// General options
logger log.Logger
}
func defaultSDKConfig() *sdkConfig {
return &sdkConfig{
concurrency: 4,
filter: config.GetDefaultFilter(),
logger: log.L(),
charset: "auto",
}
}
// WithConcurrency sets the number of concurrent DB/Table creation workers.
func WithConcurrency(n int) SDKOption {
return func(cfg *sdkConfig) {
if n > 0 {
cfg.concurrency = n
}
}
}
// WithLogger specifies a custom logger
func WithLogger(logger log.Logger) SDKOption {
return func(cfg *sdkConfig) {
cfg.logger = logger
}
}
// WithSQLMode specifies the SQL mode for schema parsing
func WithSQLMode(mode mysql.SQLMode) SDKOption {
return func(cfg *sdkConfig) {
cfg.sqlMode = mode
}
}
// WithFilter specifies a filter for the loader
func WithFilter(filter []string) SDKOption {
return func(cfg *sdkConfig) {
cfg.filter = filter
}
}
// WithFileRouters specifies custom file routing rules
func WithFileRouters(routers []*config.FileRouteRule) SDKOption {
return func(cfg *sdkConfig) {
cfg.fileRouteRules = routers
}
}
// WithCharset specifies the character set for import (default "auto").
func WithCharset(cs string) SDKOption {
return func(cfg *sdkConfig) {
if cs != "" {
cfg.charset = cs
}
}
}
// CreateSchemasAndTables implements the SDK interface
func (sdk *ImportSDK) CreateSchemasAndTables(ctx context.Context) error {
dbMetas := sdk.loader.GetDatabases()
if len(dbMetas) == 0 {
return errors.Annotatef(ErrNoDatabasesFound, "source=%s. Ensure the path contains valid dump files (*.sql, *.csv, *.parquet, etc.) and filter rules are correct", sdk.sourcePath)
}
// Create all schemas and tables
importer := mydump.NewSchemaImporter(
sdk.logger,
sdk.config.sqlMode,
sdk.db,
sdk.store,
sdk.config.concurrency,
)
err := importer.Run(ctx, dbMetas)
if err != nil {
return errors.Annotatef(err, "creating schemas and tables failed (source=%s, db_count=%d, concurrency=%d)", sdk.sourcePath, len(dbMetas), sdk.config.concurrency)
}
return nil
}
// GetTableMetas implements the SDK interface
func (sdk *ImportSDK) GetTableMetas(context.Context) ([]*TableMeta, error) {
dbMetas := sdk.loader.GetDatabases()
allFiles := sdk.loader.GetAllFiles()
var results []*TableMeta
for _, dbMeta := range dbMetas {
for _, tblMeta := range dbMeta.Tables {
tableMeta, err := sdk.buildTableMeta(dbMeta, tblMeta, allFiles)
if err != nil {
return nil, errors.Wrapf(err, "failed to build metadata for table %s.%s",
dbMeta.Name, tblMeta.Name)
}
results = append(results, tableMeta)
}
}
return results, nil
}
// GetTableMetaByName implements the SDK interface
func (sdk *ImportSDK) GetTableMetaByName(_ context.Context, schema, table string) (*TableMeta, error) {
dbMetas := sdk.loader.GetDatabases()
allFiles := sdk.loader.GetAllFiles()
// Find the specific table
for _, dbMeta := range dbMetas {
if dbMeta.Name != schema {
continue
}
for _, tblMeta := range dbMeta.Tables {
if tblMeta.Name != table {
continue
}
return sdk.buildTableMeta(dbMeta, tblMeta, allFiles)
}
return nil, errors.Annotatef(ErrTableNotFound, "schema=%s, table=%s", schema, table)
}
return nil, errors.Annotatef(ErrSchemaNotFound, "schema=%s", schema)
}
// GetTotalSize implements the SDK interface
func (sdk *ImportSDK) GetTotalSize(ctx context.Context) int64 {
var total int64
dbMetas := sdk.loader.GetDatabases()
for _, dbMeta := range dbMetas {
for _, tblMeta := range dbMeta.Tables {
total += tblMeta.TotalSize
}
}
return total
}
// buildTableMeta creates a TableMeta from database and table metadata
func (sdk *ImportSDK) buildTableMeta(
dbMeta *mydump.MDDatabaseMeta,
tblMeta *mydump.MDTableMeta,
allDataFiles map[string]mydump.FileInfo,
) (*TableMeta, error) {
tableMeta := &TableMeta{
Database: dbMeta.Name,
Table: tblMeta.Name,
DataFiles: make([]DataFileMeta, 0, len(tblMeta.DataFiles)),
SchemaFile: tblMeta.SchemaFile.FileMeta.Path,
}
// Process data files
dataFiles, totalSize := processDataFiles(tblMeta.DataFiles)
tableMeta.DataFiles = dataFiles
tableMeta.TotalSize = totalSize
wildcard, err := generateWildcardPath(tblMeta.DataFiles, allDataFiles)
if err != nil {
return nil, errors.Annotatef(err, "failed to build wildcard for table=%s.%s", dbMeta.Name, tblMeta.Name)
}
tableMeta.WildcardPath = strings.TrimSuffix(sdk.store.URI(), "/") + "/" + wildcard
return tableMeta, nil
}
// Close implements the SDK interface
func (sdk *ImportSDK) Close() error {
// close external storage
if sdk.store != nil {
sdk.store.Close()
}
return nil
}
// processDataFiles converts mydump data files to DataFileMeta and calculates total size
func processDataFiles(files []mydump.FileInfo) ([]DataFileMeta, int64) {
dataFiles := make([]DataFileMeta, 0, len(files))
var totalSize int64
for _, dataFile := range files {
fileMeta := createDataFileMeta(dataFile)
dataFiles = append(dataFiles, fileMeta)
totalSize += dataFile.FileMeta.RealSize
}
return dataFiles, totalSize
}
// createDataFileMeta creates a DataFileMeta from a mydump.DataFile
func createDataFileMeta(file mydump.FileInfo) DataFileMeta {
return DataFileMeta{
Path: file.FileMeta.Path,
Size: file.FileMeta.RealSize,
Format: file.FileMeta.Type,
Compression: file.FileMeta.Compression,
}
}
// generateWildcardPath creates a wildcard pattern path that matches only this table's files
func generateWildcardPath(
files []mydump.FileInfo,
allFiles map[string]mydump.FileInfo,
) (string, error) {
tableFiles := make(map[string]struct{}, len(files))
for _, df := range files {
tableFiles[df.FileMeta.Path] = struct{}{}
}
if len(files) == 0 {
return "", errors.Annotate(ErrNoTableDataFiles, "cannot generate wildcard pattern because the table has no data files")
}
// If there's only one file, we can just return its path
if len(files) == 1 {
return files[0].FileMeta.Path, nil
}
// Try Mydumper-specific pattern first
p := generateMydumperPattern(files[0])
if p != "" && isValidPattern(p, tableFiles, allFiles) {
return p, nil
}
// Fallback to generic prefix/suffix pattern
paths := make([]string, 0, len(files))
for _, file := range files {
paths = append(paths, file.FileMeta.Path)
}
p = generatePrefixSuffixPattern(paths)
if p != "" && isValidPattern(p, tableFiles, allFiles) {
return p, nil
}
return "", errors.Annotatef(ErrWildcardNotSpecific, "failed to find a wildcard that matches all and only the table's files.")
}
// isValidPattern checks if a wildcard pattern matches only the table's files
func isValidPattern(pattern string, tableFiles map[string]struct{}, allFiles map[string]mydump.FileInfo) bool {
if pattern == "" {
return false
}
for path := range allFiles {
isMatch, err := filepath.Match(pattern, path)
if err != nil {
return false // Invalid pattern
}
_, isTableFile := tableFiles[path]
// If pattern matches a file that's not from our table, it's invalid
if isMatch && !isTableFile {
return false
}
// If pattern doesn't match our table's file, it's also invalid
if !isMatch && isTableFile {
return false
}
}
return true
}
// generateMydumperPattern generates a wildcard pattern for Mydumper-formatted data files
// belonging to a specific table, based on their naming convention.
// It returns a pattern string that matches all data files for the table, or an empty string if not applicable.
func generateMydumperPattern(file mydump.FileInfo) string {
dbName, tableName := file.TableName.Schema, file.TableName.Name
if dbName == "" || tableName == "" {
return ""
}
// compute dirPrefix and basename
full := file.FileMeta.Path
dirPrefix, name := "", full
if idx := strings.LastIndex(full, "/"); idx >= 0 {
dirPrefix = full[:idx+1]
name = full[idx+1:]
}
// compression ext from filename when compression exists (last suffix like .gz/.zst)
compExt := ""
if file.FileMeta.Compression != mydump.CompressionNone {
compExt = filepath.Ext(name)
}
// data ext after stripping compression ext
base := strings.TrimSuffix(name, compExt)
dataExt := filepath.Ext(base)
return dirPrefix + dbName + "." + tableName + ".*" + dataExt + compExt
}
// longestCommonPrefix finds the longest string that is a prefix of all strings in the slice
func longestCommonPrefix(strs []string) string {
if len(strs) == 0 {
return ""
}
prefix := strs[0]
for _, s := range strs[1:] {
i := 0
for i < len(prefix) && i < len(s) && prefix[i] == s[i] {
i++
}
prefix = prefix[:i]
if prefix == "" {
break
}
}
return prefix
}
// longestCommonSuffix finds the longest string that is a suffix of all strings in the slice, starting after the given prefix length
func longestCommonSuffix(strs []string, prefixLen int) string {
if len(strs) == 0 {
return ""
}
suffix := strs[0][prefixLen:]
for _, s := range strs[1:] {
remaining := s[prefixLen:]
i := 0
for i < len(suffix) && i < len(remaining) && suffix[len(suffix)-i-1] == remaining[len(remaining)-i-1] {
i++
}
suffix = suffix[len(suffix)-i:]
if suffix == "" {
break
}
}
return suffix
}
// generatePrefixSuffixPattern returns a wildcard pattern that matches all and only the given paths
// by finding the longest common prefix and suffix among them, and placing a '*' wildcard in between.
func generatePrefixSuffixPattern(paths []string) string {
if len(paths) == 0 {
return ""
}
if len(paths) == 1 {
return paths[0]
}
prefix := longestCommonPrefix(paths)
suffix := longestCommonSuffix(paths, len(prefix))
return prefix + "*" + suffix
}
// TODO: add error code and doc for cloud sdk
// Sentinel errors to categorize common failure scenarios for clearer user messages.
var (
// ErrNoDatabasesFound indicates that the dump source contains no recognizable databases.
ErrNoDatabasesFound = errors.New("no databases found in the source path")
// ErrSchemaNotFound indicates the target schema doesn't exist in the dump source.
ErrSchemaNotFound = errors.New("schema not found")
// ErrTableNotFound indicates the target table doesn't exist in the dump source.
ErrTableNotFound = errors.New("table not found")
// ErrNoTableDataFiles indicates a table has zero data files and thus cannot proceed.
ErrNoTableDataFiles = errors.New("no data files for table")
// ErrWildcardNotSpecific indicates a wildcard cannot uniquely match the table's files.
ErrWildcardNotSpecific = errors.New("cannot generate a unique wildcard pattern for the table's data files")
)
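To show how this metadata is meant to be consumed, a hedged sketch of the pattern used by the integration test added later in this commit: each table's WildcardPath is spliced into an IMPORT INTO statement. The package name, credential query string, and cloud_storage_uri value are placeholders:

package importexample // illustrative only, not part of the commit

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/importsdk"
)

// buildImportSQL mirrors how the commit's cloud_sdk_test.go composes the
// statement: the table's wildcard path (plus storage credentials appended as
// query parameters) becomes the FROM source, and cloud_storage_uri points at
// the global-sort bucket.
func buildImportSQL(tm *importsdk.TableMeta, creds, sortURI string) string {
	path := tm.WildcardPath
	if creds != "" {
		path += "?" + creds
	}
	return fmt.Sprintf("IMPORT INTO %s.%s FROM '%s' WITH cloud_storage_uri='%s'",
		tm.Database, tm.Table, path, sortURI)
}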

pkg/importsdk/sdk_test.go (new file, 573 lines)

@@ -0,0 +1,573 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importsdk
import (
"context"
"fmt"
"regexp"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/lightning/mydump"
"github.com/pingcap/tidb/pkg/parser/mysql"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
type mockGCSSuite struct {
suite.Suite
server *fakestorage.Server
pool *pools.ResourcePool
}
var (
gcsHost = "127.0.0.1"
gcsPort = uint16(4443)
// For the fake GCS server, we must use this endpoint format.
// NOTE: it must end with '/'.
gcsEndpointFormat = "http://%s:%d/storage/v1/"
gcsEndpoint = fmt.Sprintf(gcsEndpointFormat, gcsHost, gcsPort)
)
func TestCloudSDK(t *testing.T) {
suite.Run(t, &mockGCSSuite{})
}
func (s *mockGCSSuite) SetupSuite() {
var err error
opt := fakestorage.Options{
Scheme: "http",
Host: gcsHost,
Port: gcsPort,
PublicHost: gcsHost,
}
s.server, err = fakestorage.NewServerWithOptions(opt)
s.NoError(err)
}
func (s *mockGCSSuite) TearDownSuite() {
s.server.Stop()
}
func (s *mockGCSSuite) TestDumplingSource() {
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db1-schema-create.sql"},
Content: []byte("CREATE DATABASE IF NOT EXISTS db1;\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db2-schema-create.sql"},
Content: []byte("CREATE DATABASE IF NOT EXISTS db2;\n"),
})
// table1 in db1
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db1.tb1-schema.sql"},
Content: []byte("CREATE TABLE IF NOT EXISTS db1.tb1 (a INT, b VARCHAR(10));\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db1.tb1.001.sql"},
Content: []byte("INSERT INTO db1.tb1 VALUES (1,'a'),(2,'b');\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db1.tb1.002.sql"},
Content: []byte("INSERT INTO db1.tb1 VALUES (3,'c'),(4,'d');\n"),
})
// table2 in db2
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db2.tb2-schema.sql"},
Content: []byte("CREATE TABLE IF NOT EXISTS db2.tb2 (x INT, y VARCHAR(10));\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db2.tb2.001.sql"},
Content: []byte("INSERT INTO db2.tb2 VALUES (5,'e'),(6,'f');\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "dumpling", Name: "db2.tb2.002.sql"},
Content: []byte("INSERT INTO db2.tb2 VALUES (7,'g'),(8,'h');\n"),
})
db, mock, err := sqlmock.New()
s.NoError(err)
defer db.Close()
mock.ExpectQuery(`SELECT SCHEMA_NAME FROM information_schema.SCHEMATA`).
WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME"}))
mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `db1`;").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `db2`;").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(regexp.QuoteMeta("CREATE TABLE IF NOT EXISTS `db1`.`tb1` (`a` INT,`b` VARCHAR(10));")).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(regexp.QuoteMeta("CREATE TABLE IF NOT EXISTS `db2`.`tb2` (`x` INT,`y` VARCHAR(10));")).
WillReturnResult(sqlmock.NewResult(0, 1))
importSDK, err := NewImportSDK(
context.Background(),
fmt.Sprintf("gs://dumpling?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint),
db,
WithConcurrency(1),
)
s.NoError(err)
defer importSDK.Close()
err = importSDK.CreateSchemasAndTables(context.Background())
s.NoError(err)
tablesMeta, err := importSDK.GetTableMetas(context.Background())
s.NoError(err)
s.Len(tablesMeta, 2)
expected1 := &TableMeta{
Database: "db1",
Table: "tb1",
SchemaFile: "db1.tb1-schema.sql",
DataFiles: []DataFileMeta{
{Path: "db1.tb1.001.sql", Size: 44, Format: mydump.SourceTypeSQL, Compression: mydump.CompressionNone},
{Path: "db1.tb1.002.sql", Size: 44, Format: mydump.SourceTypeSQL, Compression: mydump.CompressionNone},
},
TotalSize: 88,
WildcardPath: "gcs://dumpling/db1.tb1.*.sql",
}
expected2 := &TableMeta{
Database: "db2",
Table: "tb2",
SchemaFile: "db2.tb2-schema.sql",
DataFiles: []DataFileMeta{
{Path: "db2.tb2.001.sql", Size: 44, Format: mydump.SourceTypeSQL, Compression: mydump.CompressionNone},
{Path: "db2.tb2.002.sql", Size: 44, Format: mydump.SourceTypeSQL, Compression: mydump.CompressionNone},
},
TotalSize: 88,
WildcardPath: "gcs://dumpling/db2.tb2.*.sql",
}
s.Equal(expected1, tablesMeta[0])
s.Equal(expected2, tablesMeta[1])
// verify GetTableMetaByName for each db
tm1, err := importSDK.GetTableMetaByName(context.Background(), "db1", "tb1")
s.NoError(err)
s.Equal(tm1, expected1)
tm2, err := importSDK.GetTableMetaByName(context.Background(), "db2", "tb2")
s.NoError(err)
s.Equal(tm2, expected2)
totalSize := importSDK.GetTotalSize(context.Background())
s.Equal(totalSize, int64(176))
// check that all expectations were met
err = mock.ExpectationsWereMet()
s.NoError(err)
}
func (s *mockGCSSuite) TestCSVSource() {
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db1-schema-create.sql"},
Content: []byte("CREATE DATABASE IF NOT EXISTS db1;\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db2-schema-create.sql"},
Content: []byte("CREATE DATABASE IF NOT EXISTS db2;\n"),
})
// table1 in db1
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db1.tb1-schema.sql"},
Content: []byte("CREATE TABLE IF NOT EXISTS db1.tb1 (a INT, b VARCHAR(10));\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db1.tb1.001.csv"},
Content: []byte("1,a\n2,b\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db1.tb1.002.csv"},
Content: []byte("3,c\n4,d\n"),
})
// table2 in db2
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db2.tb2-schema.sql"},
Content: []byte("CREATE TABLE IF NOT EXISTS db2.tb2 (x INT, y VARCHAR(10));\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db2.tb2.001.csv"},
Content: []byte("5,e\n6,f\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "csv", Name: "db2.tb2.002.csv"},
Content: []byte("7,g\n8,h\n"),
})
db, mock, err := sqlmock.New()
s.NoError(err)
defer db.Close()
mock.ExpectQuery(`SELECT SCHEMA_NAME FROM information_schema.SCHEMATA`).
WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME"}))
mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `db1`;").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `db2`;").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(regexp.QuoteMeta("CREATE TABLE IF NOT EXISTS `db1`.`tb1` (`a` INT,`b` VARCHAR(10));")).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(regexp.QuoteMeta("CREATE TABLE IF NOT EXISTS `db2`.`tb2` (`x` INT,`y` VARCHAR(10));")).
WillReturnResult(sqlmock.NewResult(0, 1))
importSDK, err := NewImportSDK(
context.Background(),
fmt.Sprintf("gs://csv?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint),
db,
WithConcurrency(1),
)
s.NoError(err)
defer importSDK.Close()
err = importSDK.CreateSchemasAndTables(context.Background())
s.NoError(err)
tablesMeta, err := importSDK.GetTableMetas(context.Background())
s.NoError(err)
s.Len(tablesMeta, 2)
expected1 := &TableMeta{
Database: "db1",
Table: "tb1",
SchemaFile: "db1.tb1-schema.sql",
DataFiles: []DataFileMeta{
{Path: "db1.tb1.001.csv", Size: 8, Format: mydump.SourceTypeCSV, Compression: mydump.CompressionNone},
{Path: "db1.tb1.002.csv", Size: 8, Format: mydump.SourceTypeCSV, Compression: mydump.CompressionNone},
},
TotalSize: 16,
WildcardPath: "gcs://csv/db1.tb1.*.csv",
}
expected2 := &TableMeta{
Database: "db2",
Table: "tb2",
SchemaFile: "db2.tb2-schema.sql",
DataFiles: []DataFileMeta{
{Path: "db2.tb2.001.csv", Size: 8, Format: mydump.SourceTypeCSV, Compression: mydump.CompressionNone},
{Path: "db2.tb2.002.csv", Size: 8, Format: mydump.SourceTypeCSV, Compression: mydump.CompressionNone},
},
TotalSize: 16,
WildcardPath: "gcs://csv/db2.tb2.*.csv",
}
s.Equal(expected1, tablesMeta[0])
s.Equal(expected2, tablesMeta[1])
// verify GetTableMetaByName for each db
tm1, err := importSDK.GetTableMetaByName(context.Background(), "db1", "tb1")
s.NoError(err)
s.Equal(tm1, expected1)
tm2, err := importSDK.GetTableMetaByName(context.Background(), "db2", "tb2")
s.NoError(err)
s.Equal(tm2, expected2)
totalSize := importSDK.GetTotalSize(context.Background())
s.Equal(totalSize, int64(32))
// check that all expectations were met
err = mock.ExpectationsWereMet()
s.NoError(err)
}
func (s *mockGCSSuite) TestOnlyDataFiles() {
s.server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: "onlydata"})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "onlydata", Name: "part1.csv"},
Content: []byte("a,b\n1,a\n2,b\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "onlydata", Name: "part2.csv"},
Content: []byte("a,b\n3,c\n4,d\n"),
})
db, mock, err := sqlmock.New()
s.NoError(err)
defer db.Close()
mock.ExpectQuery(`SELECT SCHEMA_NAME FROM information_schema.SCHEMATA`).
WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME"}))
mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `db`;").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SHOW CREATE TABLE `db`.`tb`").
WillReturnRows(sqlmock.NewRows([]string{"Create Table"}).AddRow("CREATE TABLE `db`.`tb` (`a` INT, `b` VARCHAR(10));"))
importSDK, err := NewImportSDK(
context.Background(),
fmt.Sprintf("gs://onlydata?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint),
db,
WithCharset("utf8"),
WithConcurrency(8),
WithFilter([]string{"*.*"}),
WithSQLMode(mysql.ModeANSIQuotes),
WithLogger(log.L()),
WithFileRouters([]*config.FileRouteRule{
{
Pattern: `.*\.csv$`,
Schema: "db",
Table: "tb",
Type: "csv",
},
}),
)
s.NoError(err)
defer importSDK.Close()
err = importSDK.CreateSchemasAndTables(context.Background())
s.NoError(err)
tablesMeta, err := importSDK.GetTableMetas(context.Background())
s.NoError(err)
s.Len(tablesMeta, 1)
s.Equal(&TableMeta{
Database: "db",
Table: "tb",
SchemaFile: "",
DataFiles: []DataFileMeta{
{Path: "part1.csv", Size: 12, Format: mydump.SourceTypeCSV, Compression: mydump.CompressionNone},
{Path: "part2.csv", Size: 12, Format: mydump.SourceTypeCSV, Compression: mydump.CompressionNone},
},
TotalSize: 24,
WildcardPath: "gcs://onlydata/part*.csv",
}, tablesMeta[0])
tableMeta, err := importSDK.GetTableMetaByName(context.Background(), "db", "tb")
s.NoError(err)
s.Equal(tableMeta, tablesMeta[0])
totalSize := importSDK.GetTotalSize(context.Background())
s.Equal(totalSize, int64(24))
err = mock.ExpectationsWereMet()
s.NoError(err)
}
func TestLongestCommonPrefix(t *testing.T) {
strs := []string{"s3://bucket/foo/bar/baz1", "s3://bucket/foo/bar/baz2", "s3://bucket/foo/bar/baz"}
p := longestCommonPrefix(strs)
require.Equal(t, "s3://bucket/foo/bar/baz", p)
// no common prefix
require.Equal(t, "", longestCommonPrefix([]string{"a", "b"}))
// empty inputs
require.Equal(t, "", longestCommonPrefix(nil))
require.Equal(t, "", longestCommonPrefix([]string{}))
}
func TestLongestCommonSuffix(t *testing.T) {
strs := []string{"abcXYZ", "defXYZ", "XYZ"}
s := longestCommonSuffix(strs, 0)
require.Equal(t, "XYZ", s)
// no common suffix
require.Equal(t, "", longestCommonSuffix([]string{"a", "b"}, 0))
// empty inputs
require.Equal(t, "", longestCommonSuffix(nil, 0))
require.Equal(t, "", longestCommonSuffix([]string{}, 0))
// same prefix
require.Equal(t, "", longestCommonSuffix([]string{"abc", "abc"}, 3))
require.Equal(t, "f", longestCommonSuffix([]string{"abcdf", "abcef"}, 3))
}
func TestGeneratePrefixSuffixPattern(t *testing.T) {
paths := []string{"pre_middle_suf", "pre_most_suf"}
pattern := generatePrefixSuffixPattern(paths)
// common prefix "pre_m", suffix "_suf"
require.Equal(t, "pre_m*_suf", pattern)
// empty inputs
require.Equal(t, "", generatePrefixSuffixPattern(nil))
require.Equal(t, "", generatePrefixSuffixPattern([]string{}))
// only one file
require.Equal(t, "pre_middle_suf", generatePrefixSuffixPattern([]string{"pre_middle_suf"}))
// no common prefix/suffix
paths2 := []string{"foo", "bar"}
require.Equal(t, "*", generatePrefixSuffixPattern(paths2))
// overlapping prefix/suffix
paths3 := []string{"aaabaaa", "aaa"}
require.Equal(t, "aaa*", generatePrefixSuffixPattern(paths3))
}
func generateFileMetas(t *testing.T, paths []string) []mydump.FileInfo {
t.Helper()
files := make([]mydump.FileInfo, 0, len(paths))
fileRouter, err := mydump.NewDefaultFileRouter(log.L())
require.NoError(t, err)
for _, p := range paths {
res, err := fileRouter.Route(p)
require.NoError(t, err)
files = append(files, mydump.FileInfo{
TableName: res.Table,
FileMeta: mydump.SourceFileMeta{
Path: p,
Type: res.Type,
Compression: res.Compression,
SortKey: res.Key,
},
})
}
return files
}
func TestGenerateMydumperPattern(t *testing.T) {
paths := []string{"db.tb.0001.sql", "db.tb.0002.sql"}
p := generateMydumperPattern(generateFileMetas(t, paths)[0])
require.Equal(t, "db.tb.*.sql", p)
paths2 := []string{"s3://bucket/dir/db.tb.0001.sql", "s3://bucket/dir/db.tb.0002.sql"}
p2 := generateMydumperPattern(generateFileMetas(t, paths2)[0])
require.Equal(t, "s3://bucket/dir/db.tb.*.sql", p2)
// not mydumper pattern
require.Equal(t, "", generateMydumperPattern(mydump.FileInfo{
TableName: filter.Table{},
}))
}
func TestCreateDataFileMeta(t *testing.T) {
fi := mydump.FileInfo{
TableName: filter.Table{
Schema: "db",
Name: "table",
},
FileMeta: mydump.SourceFileMeta{
Path: "s3://bucket/path/to/f",
FileSize: 123,
Type: mydump.SourceTypeCSV,
Compression: mydump.CompressionGZ,
RealSize: 456,
},
}
df := createDataFileMeta(fi)
require.Equal(t, "s3://bucket/path/to/f", df.Path)
require.Equal(t, int64(456), df.Size)
require.Equal(t, mydump.SourceTypeCSV, df.Format)
require.Equal(t, mydump.CompressionGZ, df.Compression)
}
func TestProcessDataFiles(t *testing.T) {
files := []mydump.FileInfo{
{FileMeta: mydump.SourceFileMeta{Path: "s3://bucket/a", RealSize: 10}},
{FileMeta: mydump.SourceFileMeta{Path: "s3://bucket/b", RealSize: 20}},
}
dfm, total := processDataFiles(files)
require.Len(t, dfm, 2)
require.Equal(t, int64(30), total)
require.Equal(t, "s3://bucket/a", dfm[0].Path)
require.Equal(t, "s3://bucket/b", dfm[1].Path)
}
func TestValidatePattern(t *testing.T) {
tableFiles := map[string]struct{}{
"a.txt": {}, "b.txt": {},
}
// only table files in allFiles
smallAll := map[string]mydump.FileInfo{
"a.txt": {}, "b.txt": {},
}
require.True(t, isValidPattern("*.txt", tableFiles, smallAll))
// allFiles includes an extra file => invalid
fullAll := map[string]mydump.FileInfo{
"a.txt": {}, "b.txt": {}, "c.txt": {},
}
require.False(t, isValidPattern("*.txt", tableFiles, fullAll))
// If pattern doesn't match our table's file, it's also invalid
require.False(t, isValidPattern("*.csv", tableFiles, smallAll))
// empty pattern => invalid
require.False(t, isValidPattern("", tableFiles, smallAll))
}
func TestGenerateWildcardPath(t *testing.T) {
// Helper to create allFiles map
createAllFiles := func(paths []string) map[string]mydump.FileInfo {
allFiles := make(map[string]mydump.FileInfo)
for _, p := range paths {
allFiles[p] = mydump.FileInfo{
FileMeta: mydump.SourceFileMeta{Path: p},
}
}
return allFiles
}
// No files
files1 := []mydump.FileInfo{}
allFiles1 := createAllFiles([]string{})
_, err := generateWildcardPath(files1, allFiles1)
require.Error(t, err)
require.Contains(t, err.Error(), "no data files for table")
// Single file
files2 := generateFileMetas(t, []string{"db.tb.0001.sql"})
allFiles2 := createAllFiles([]string{"db.tb.0001.sql"})
path2, err := generateWildcardPath(files2, allFiles2)
require.NoError(t, err)
require.Equal(t, "db.tb.0001.sql", path2)
// Mydumper pattern succeeds
files3 := generateFileMetas(t, []string{"db.tb.0001.sql.gz", "db.tb.0002.sql.gz"})
allFiles3 := createAllFiles([]string{"db.tb.0001.sql.gz", "db.tb.0002.sql.gz"})
path3, err := generateWildcardPath(files3, allFiles3)
require.NoError(t, err)
require.Equal(t, "db.tb.*.sql.gz", path3)
// Mydumper pattern fails, fallback to prefix/suffix succeeds
files4 := []mydump.FileInfo{
{
TableName: filter.Table{Schema: "db", Name: "tb"},
FileMeta: mydump.SourceFileMeta{
Path: "a.sql",
Type: mydump.SourceTypeSQL,
Compression: mydump.CompressionNone,
},
},
{
TableName: filter.Table{Schema: "db", Name: "tb"},
FileMeta: mydump.SourceFileMeta{
Path: "b.sql",
Type: mydump.SourceTypeSQL,
Compression: mydump.CompressionNone,
},
},
}
allFiles4 := map[string]mydump.FileInfo{
files4[0].FileMeta.Path: files4[0],
files4[1].FileMeta.Path: files4[1],
}
path4, err := generateWildcardPath(files4, allFiles4)
require.NoError(t, err)
require.Equal(t, "*.sql", path4)
allFiles4["db-schema.sql"] = mydump.FileInfo{
FileMeta: mydump.SourceFileMeta{
Path: "db-schema.sql",
Type: mydump.SourceTypeSQL,
Compression: mydump.CompressionNone,
},
}
_, err = generateWildcardPath(files4, allFiles4)
require.Error(t, err)
require.Contains(t, err.Error(), "cannot generate a unique wildcard pattern")
}


@@ -824,6 +824,22 @@ func (l *MDLoader) GetStore() storage.ExternalStorage {
return l.store
}
// GetAllFiles gets all the files for the loader.
func (l *MDLoader) GetAllFiles() map[string]FileInfo {
allFiles := make(map[string]FileInfo)
for _, dbMeta := range l.dbs {
for _, tblMeta := range dbMeta.Tables {
for _, file := range tblMeta.DataFiles {
allFiles[file.FileMeta.Path] = file
}
if tblMeta.SchemaFile.FileMeta.Path != "" {
allFiles[tblMeta.SchemaFile.FileMeta.Path] = tblMeta.SchemaFile
}
}
}
return allFiles
}
func calculateFileBytes(ctx context.Context,
dataFile string,
compressType storage.CompressType,


@@ -4,6 +4,7 @@ go_test(
name = "importintotest4_test",
timeout = "moderate",
srcs = [
"cloud_sdk_test.go",
"global_sort_test.go",
"import_summary_test.go",
"main_test.go",
@@ -20,6 +21,7 @@ go_test(
"//pkg/disttask/framework/testutil",
"//pkg/disttask/importinto",
"//pkg/executor/importer",
"//pkg/importsdk",
"//pkg/kv",
"//pkg/lightning/config",
"//pkg/planner/core",
@@ -27,6 +29,7 @@ go_test(
"//pkg/testkit/testfailpoint",
"//tests/realtikvtest",
"//tests/realtikvtest/testutils",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_stretchr_testify//require",
"@com_github_stretchr_testify//suite",


@@ -0,0 +1,166 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importintotest
import (
"context"
"fmt"
"regexp"
"github.com/DATA-DOG/go-sqlmock"
"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/pingcap/tidb/pkg/importsdk"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/testkit"
)
func (s *mockGCSSuite) TestCSVSource() {
// prepare source data
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_csv", Name: "t.1.csv"},
Content: []byte("1,foo1,bar1,123\n2,foo2,bar2,456\n3,foo3,bar3,789\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_csv", Name: "t.2.csv"},
Content: []byte("4,foo4,bar4,123\n5,foo5,bar5,223\n6,foo6,bar6,323\n"),
})
s.server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: "sorted"})
sortStorageURI := fmt.Sprintf("gs://sorted/cloud_csv?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint)
sourceURI := fmt.Sprintf("gs://cloud_csv/?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint)
// create database and table
s.prepareAndUseDB("cloud_csv")
s.tk.MustExec(`create table t (a bigint primary key, b varchar(100), c varchar(100), d int,
key(a), key(c,d), key(d));`)
db, mock, err := sqlmock.New()
s.Require().NoError(err)
defer db.Close()
mock.ExpectQuery(`SELECT SCHEMA_NAME FROM information_schema.SCHEMATA`).
WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME"}).AddRow("cloud_csv"))
mock.ExpectQuery("SHOW CREATE TABLE `cloud_csv`.`t`").
WillReturnRows(sqlmock.NewRows([]string{"Create Table"}).AddRow(`create table t (a bigint primary key, b varchar(100), c varchar(100), d int,
key(a), key(c,d), key(d));`))
cloudSDK, err := importsdk.NewImportSDK(context.Background(), sourceURI, db,
importsdk.WithFileRouters([]*config.FileRouteRule{
{Pattern: ".*", Table: "t", Schema: "cloud_csv", Type: "csv"},
}))
s.Require().NoError(err)
defer cloudSDK.Close()
s.Require().NoError(cloudSDK.CreateSchemasAndTables(context.Background()))
tableMetas, err := cloudSDK.GetTableMetas(context.Background())
s.Require().NoError(err)
s.Len(tableMetas, 1)
tableMeta := tableMetas[0]
path := fmt.Sprintf("%s?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", tableMeta.WildcardPath, gcsEndpoint)
importSQL := fmt.Sprintf("import into %s.%s from '%s' with cloud_storage_uri='%s'", tableMeta.Database, tableMeta.Table, path, sortStorageURI)
result := s.tk.MustQuery(importSQL).Rows()
s.Len(result, 1)
s.tk.MustQuery("select * from t").Sort().Check(testkit.Rows(
"1 foo1 bar1 123", "2 foo2 bar2 456", "3 foo3 bar3 789",
"4 foo4 bar4 123", "5 foo5 bar5 223", "6 foo6 bar6 323",
))
}
func (s *mockGCSSuite) TestDumplingSource() {
// prepare source data
s.server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: "cloud_dumpling"})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling1-schema-create.sql"},
Content: []byte("CREATE DATABASE IF NOT EXISTS cloud_dumpling1;\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling2-schema-create.sql"},
Content: []byte("CREATE DATABASE IF NOT EXISTS cloud_dumpling2;\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling1.tb1-schema.sql"},
Content: []byte("CREATE TABLE IF NOT EXISTS cloud_dumpling1.tb1 (a INT, b VARCHAR(10));\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling1.tb1.001.sql"},
Content: []byte("INSERT INTO cloud_dumpling1.tb1 VALUES (1,'a'),(2,'b');\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling1.tb1.002.sql"},
Content: []byte("INSERT INTO cloud_dumpling1.tb1 VALUES (3,'c'),(4,'d');\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling2.tb2-schema.sql"},
Content: []byte("CREATE TABLE IF NOT EXISTS cloud_dumpling2.tb2 (x INT, y VARCHAR(10));\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling2.tb2.001.sql"},
Content: []byte("INSERT INTO cloud_dumpling2.tb2 VALUES (5,'e'),(6,'f');\n"),
})
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "cloud_dumpling", Name: "cloud_dumpling2.tb2.002.sql"},
Content: []byte("INSERT INTO cloud_dumpling2.tb2 VALUES (7,'g'),(8,'h');\n"),
})
s.server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: "sorted"})
sourceURI := fmt.Sprintf("gs://cloud_dumpling?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint)
sortStorageURI := fmt.Sprintf("gs://sorted/cloud_dumpling?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint)
db, mock, err := sqlmock.New()
s.Require().NoError(err)
defer db.Close()
mock.ExpectQuery(`SELECT SCHEMA_NAME FROM information_schema.SCHEMATA`).
WillReturnRows(sqlmock.NewRows([]string{"SCHEMA_NAME"}))
mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `cloud_dumpling1`;").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("CREATE DATABASE IF NOT EXISTS `cloud_dumpling2`;").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(regexp.QuoteMeta("CREATE TABLE IF NOT EXISTS `cloud_dumpling1`.`tb1` (`a` INT,`b` VARCHAR(10));")).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(regexp.QuoteMeta("CREATE TABLE IF NOT EXISTS `cloud_dumpling2`.`tb2` (`x` INT,`y` VARCHAR(10));")).
WillReturnResult(sqlmock.NewResult(0, 1))
cloudSDK, err := importsdk.NewImportSDK(context.Background(), sourceURI, db, importsdk.WithConcurrency(1))
s.Require().NoError(err)
defer cloudSDK.Close()
s.Require().NoError(cloudSDK.CreateSchemasAndTables(context.Background()))
tableMetas, err := cloudSDK.GetTableMetas(context.Background())
s.Require().NoError(err)
s.Len(tableMetas, 2)
s.prepareAndUseDB("cloud_dumpling1")
s.prepareAndUseDB("cloud_dumpling2")
s.tk.MustExec("CREATE TABLE IF NOT EXISTS cloud_dumpling1.tb1 (a INT, b VARCHAR(10));")
s.tk.MustExec("CREATE TABLE IF NOT EXISTS cloud_dumpling2.tb2 (x INT, y VARCHAR(10));")
// import and validate data for each table
for _, tm := range tableMetas {
path := fmt.Sprintf("%s?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb",
tm.WildcardPath, gcsEndpoint)
importSQL := fmt.Sprintf("import into %s.%s from '%s' format 'sql' with cloud_storage_uri='%s'", tm.Database, tm.Table, path, sortStorageURI)
result := s.tk.MustQuery(importSQL).Rows()
s.Len(result, 1)
// verify contents
fullQuery := fmt.Sprintf("select * from %s.%s", tm.Database, tm.Table)
switch tm.Table {
case "tb1":
s.tk.MustQuery(fullQuery).Sort().Check(testkit.Rows(
"1 a", "2 b", "3 c", "4 d"))
case "tb2":
s.tk.MustQuery(fullQuery).Sort().Check(testkit.Rows(
"5 e", "6 f", "7 g", "8 h"))
}
}
s.Require().NoError(mock.ExpectationsWereMet())
}