importinto: fix some check and external-id not set after SEM v2 introduced (#64170)

ref pingcap/tidb#61702
This commit is contained in:
D3Hunter
2025-10-30 08:48:58 +08:00
committed by GitHub
parent 3d4d19a687
commit eaf12833ea
9 changed files with 163 additions and 74 deletions

View File

@ -503,6 +503,7 @@ go_test(
"//pkg/util/paging",
"//pkg/util/ranger",
"//pkg/util/sem",
"//pkg/util/sem/v2:sem",
"//pkg/util/set",
"//pkg/util/sqlexec",
"//pkg/util/sqlkiller",

View File

@ -26,29 +26,66 @@ import (
"github.com/pingcap/tidb/pkg/config/kerneltype"
"github.com/pingcap/tidb/pkg/executor"
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/auth"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/pingcap/tidb/pkg/util/sem"
semv1 "github.com/pingcap/tidb/pkg/util/sem"
semv2 "github.com/pingcap/tidb/pkg/util/sem/v2"
"github.com/stretchr/testify/require"
)
var (
semTestPatternFnsFac = func(sqlRules ...string) map[string][2]func(t *testing.T, tk *testkit.TestKit) {
return map[string][2]func(t *testing.T, tk *testkit.TestKit){
"v1": {
func(t *testing.T, tk *testkit.TestKit) { semv1.Enable() },
func(t *testing.T, tk *testkit.TestKit) { semv1.Disable() },
},
"v2": {
func(t *testing.T, tk *testkit.TestKit) {
t.Helper()
// in SEM V1, "import_with_external_id", "import_from_local" are forbidden
// completely, but in SEM V2, it's taken as restricted SQL, which
// requires RESTRICTED_SQL_ADMIN privilege, when SEM enabled, root
// doesn't have this privilege by default.
// we must add this Auth as default testkit session doesn't have user
// info.
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost"}, nil, nil, nil))
require.NoError(t, semv2.EnableBy(&semv2.Config{TiDBVersion: "v0.0.0",
RestrictedSQL: semv2.SQLRestriction{Rule: sqlRules}}))
},
func(t *testing.T, tk *testkit.TestKit) { semv2.Disable() },
},
}
}
semTestPatternFns = semTestPatternFnsFac("import_from_local", "import_with_external_id")
)
func TestSecurityEnhancedMode(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
sem.Enable()
defer sem.Disable()
tk.MustExec("create table test.t (id int);")
for i, fns := range semTestPatternFns {
t.Run(fmt.Sprint(i), func(t *testing.T) {
fns[0](t, tk)
t.Cleanup(func() {
fns[1](t, tk)
})
// When SEM is enabled these features are restricted to all users
// regardless of what privileges they have available.
tk.MustGetErrMsg("IMPORT INTO test.t FROM '/file.csv'", "[planner:8132]Feature 'IMPORT INTO from server disk' is not supported when security enhanced mode is enabled")
// When SEM is enabled these features are restricted to all users
// regardless of what privileges they have available.
tk.MustMatchErrMsg("IMPORT INTO test.t FROM '/file.csv'", `(?i).*Feature 'IMPORT INTO .*from.*' is not supported when security enhanced mode is enabled`)
})
}
}
func TestClassicS3ExternalID(t *testing.T) {
@ -70,20 +107,24 @@ func TestClassicS3ExternalID(t *testing.T) {
})
t.Run("SEM enabled, explicit external ID is allowed, and we don't change it", func(t *testing.T) {
sem.Enable()
t.Cleanup(func() {
sem.Disable()
})
tk := testkit.NewTestKit(t, store)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/executor/importer/NewImportPlan", func(plan *plannercore.ImportInto) {
u, err := url.Parse(plan.Path)
require.NoError(t, err)
require.Contains(t, u.Query(), storage.S3ExternalID)
require.Equal(t, "allowed", u.Query().Get(storage.S3ExternalID))
panic("FAIL IT, AS WE CANNOT RUN IT HERE")
})
tk.MustExec("IMPORT INTO test.t FROM 's3://bucket?EXTERNAL-ID=allowed'")
tk.MustQuery("select * from test.t").Check(testkit.Rows())
for i, fns := range semTestPatternFnsFac() {
t.Run(fmt.Sprint(i), func(t *testing.T) {
tk := testkit.NewTestKit(t, store)
fns[0](t, tk)
t.Cleanup(func() {
fns[1](t, tk)
})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/executor/importer/NewImportPlan", func(plan *plannercore.ImportInto) {
u, err := url.Parse(plan.Path)
require.NoError(t, err)
require.Contains(t, u.Query(), storage.S3ExternalID)
require.Equal(t, "allowed", u.Query().Get(storage.S3ExternalID))
panic("FAIL IT, AS WE CANNOT RUN IT HERE")
})
tk.MustExec("IMPORT INTO test.t FROM 's3://bucket?EXTERNAL-ID=allowed'")
tk.MustQuery("select * from test.t").Check(testkit.Rows())
})
}
})
t.Run("SEM disabled, explicit external ID is also allowed, and we don't change it", func(t *testing.T) {
@ -109,16 +150,19 @@ func TestNextGenS3ExternalID(t *testing.T) {
outerTK.MustExec("create table test.t (id int);")
t.Run("SEM enabled, forbid set S3 external ID", func(t *testing.T) {
tk := testkit.NewTestKit(t, store)
sem.Enable()
t.Cleanup(func() {
sem.Disable()
})
tk.MustGetErrMsg("IMPORT INTO test.t FROM 's3://bucket?EXTERNAL-ID=abc'", "[planner:8132]Feature 'IMPORT INTO with S3 external ID' is not supported when security enhanced mode is enabled")
for i, fns := range semTestPatternFns {
t.Run(fmt.Sprint(i), func(t *testing.T) {
tk := testkit.NewTestKit(t, store)
fns[0](t, tk)
t.Cleanup(func() {
fns[1](t, tk)
})
tk.MustMatchErrMsg("IMPORT INTO test.t FROM 's3://bucket?EXTERNAL-ID=abc'", `(?i).*Feature 'IMPORT INTO .*external.*' is not supported when security enhanced mode is enabled`)
})
}
})
t.Run("SEM enabled, set S3 external ID to keyspace name", func(t *testing.T) {
sem.Enable()
bak := config.GetGlobalKeyspaceName()
config.UpdateGlobal(func(conf *config.Config) {
conf.KeyspaceName = "aaa"
@ -127,18 +171,25 @@ func TestNextGenS3ExternalID(t *testing.T) {
config.UpdateGlobal(func(conf *config.Config) {
conf.KeyspaceName = bak
})
sem.Disable()
})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/executor/importer/NewImportPlan", func(plan *plannercore.ImportInto) {
u, err := url.Parse(plan.Path)
require.NoError(t, err)
require.Contains(t, u.Query(), storage.S3ExternalID)
require.Equal(t, "aaa", u.Query().Get(storage.S3ExternalID))
panic("FAIL IT, AS WE CANNOT RUN IT HERE")
})
tk := testkit.NewTestKit(t, store)
err := tk.QueryToErr("IMPORT INTO test.t FROM 's3://bucket'")
require.ErrorContains(t, err, "FAIL IT, AS WE CANNOT RUN IT HERE")
for i, fns := range semTestPatternFns {
t.Run(fmt.Sprint(i), func(t *testing.T) {
tk := testkit.NewTestKit(t, store)
fns[0](t, tk)
t.Cleanup(func() {
fns[1](t, tk)
})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/executor/importer/NewImportPlan", func(plan *plannercore.ImportInto) {
u, err := url.Parse(plan.Path)
require.NoError(t, err)
require.Contains(t, u.Query(), storage.S3ExternalID)
require.Equal(t, "aaa", u.Query().Get(storage.S3ExternalID))
panic("FAIL IT, AS WE CANNOT RUN IT HERE")
})
err := tk.QueryToErr("IMPORT INTO test.t FROM 's3://bucket'")
require.ErrorContains(t, err, "FAIL IT, AS WE CANNOT RUN IT HERE")
})
}
})
t.Run("SEM disabled, allow explicit S3 external id, should not change it", func(t *testing.T) {
@ -172,20 +223,30 @@ func TestNextGenUnsupportedLocalSortAndOptions(t *testing.T) {
store := testkit.CreateMockStore(t)
outerTK := testkit.NewTestKit(t, store)
outerTK.MustExec("create table test.t (id int);")
sem.Enable()
t.Cleanup(func() {
sem.Disable()
})
for i, fns := range semTestPatternFns {
t.Run(fmt.Sprint(i), func(t *testing.T) {
testNextGenUnsupportedLocalSortAndOptions(t, store, func(t *testing.T, tk *testkit.TestKit) {
fns[0](t, tk)
t.Cleanup(func() {
fns[1](t, tk)
})
})
})
}
}
func testNextGenUnsupportedLocalSortAndOptions(t *testing.T, store kv.Storage, initFn func(t *testing.T, tk *testkit.TestKit)) {
t.Run("import from select", func(t *testing.T) {
tk := testkit.NewTestKit(t, store)
initFn(t, tk)
err := tk.ExecToErr("IMPORT INTO test.t FROM select 1")
require.ErrorIs(t, err, plannererrors.ErrNotSupportedWithSem)
require.ErrorContains(t, err, "IMPORT INTO from select")
require.Regexp(t, `IMPORT INTO .* select`, err.Error())
})
t.Run("local sort", func(t *testing.T) {
tk := testkit.NewTestKit(t, store)
initFn(t, tk)
err := tk.QueryToErr("IMPORT INTO test.t FROM 's3://bucket/*.csv'")
require.ErrorIs(t, err, plannererrors.ErrNotSupportedWithSem)
require.ErrorContains(t, err, "IMPORT INTO with local sort")
@ -193,6 +254,7 @@ func TestNextGenUnsupportedLocalSortAndOptions(t *testing.T) {
t.Run("unsupported options", func(t *testing.T) {
tk := testkit.NewTestKit(t, store)
initFn(t, tk)
bak := variable.ValidateCloudStorageURI
variable.ValidateCloudStorageURI = func(ctx context.Context, uri string) error {
return nil
@ -201,6 +263,9 @@ func TestNextGenUnsupportedLocalSortAndOptions(t *testing.T) {
variable.ValidateCloudStorageURI = bak
})
tk.MustExec("set global tidb_cloud_storage_uri='s3://bucket/tmp'")
t.Cleanup(func() {
tk.MustExec("set global tidb_cloud_storage_uri=''")
})
for _, option := range []string{
"disk_quota",
"max_write_speed",

View File

@ -75,6 +75,7 @@ go_library(
"//pkg/util/naming",
"//pkg/util/promutil",
"//pkg/util/sem",
"//pkg/util/sem/compat",
"//pkg/util/sqlexec",
"//pkg/util/sqlkiller",
"//pkg/util/stringutil",

View File

@ -70,6 +70,7 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/naming"
semv1 "github.com/pingcap/tidb/pkg/util/sem"
sem "github.com/pingcap/tidb/pkg/util/sem/compat"
"github.com/pingcap/tidb/pkg/util/stringutil"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
@ -690,6 +691,8 @@ func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, option
if p.DataSourceType == DataSourceTypeQuery {
return plannererrors.ErrNotSupportedWithSem.GenWithStackByArgs("IMPORT INTO from select")
}
}
if kerneltype.IsNextGen() && sem.IsEnabled() {
// we put the check here, not in planner, to make sure the cloud_storage_uri
// won't change in between.
if p.IsLocalSort() {
@ -702,7 +705,7 @@ func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, option
}
}
if semv1.IsEnabled() {
if sem.IsEnabled() {
for k := range disallowedOptionsForSEM {
if _, ok := specifiedOptions[k]; ok {
return exeerrors.ErrLoadDataUnsupportedOption.GenWithStackByArgs(k, "SEM enabled")

View File

@ -4722,18 +4722,27 @@ func (b *PlanBuilder) buildImportInto(ctx context.Context, ld *ast.ImportIntoStm
return nil, exeerrors.ErrLoadDataInvalidURI.FastGenByArgs(ImportIntoDataSource, err.Error())
}
importFromServer = storage.IsLocal(u)
// for SEM v2, they are checked by configured rules.
if semv1.IsEnabled() {
if importFromServer {
return nil, plannererrors.ErrNotSupportedWithSem.GenWithStackByArgs("IMPORT INTO from server disk")
}
if kerneltype.IsNextGen() && storage.IsS3(u) {
newPath, err := processSemNextGenS3Path(u)
if err != nil {
if err := checkNextGenS3PathWithSem(u); err != nil {
return nil, err
}
ld.Path = newPath
}
}
// a nextgen cluster might be shared by multiple tenants, and they might
// share the same AWS role to access import-into source data bucket, this
// external ID can be used to restrict the access only to the current tenant.
// when SEM enabled, we need set it.
if kerneltype.IsNextGen() && sem.IsEnabled() && storage.IsS3(u) {
values := u.Query()
values.Set(storage.S3ExternalID, config.GetGlobalKeyspaceName())
u.RawQuery = values.Encode()
ld.Path = u.String()
}
}
for _, opt := range ld.Options {
@ -6375,21 +6384,16 @@ func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error {
// for nextgen import-into with SEM, we disallow user to set S3 external ID explicitly,
// and we will use the keyspace name as the S3 external ID.
// a nextgen cluster might be shared by multiple tenants, and they might share the
// same AWS role to access import-into source data bucket, this external ID can
// be used to restrict the access only to the current tenant.
func processSemNextGenS3Path(u *url.URL) (string, error) {
func checkNextGenS3PathWithSem(u *url.URL) error {
values := u.Query()
for k := range values {
lowerK := strings.ToLower(k)
if lowerK == storage.S3ExternalID {
return "", plannererrors.ErrNotSupportedWithSem.GenWithStackByArgs("IMPORT INTO with S3 external ID")
return plannererrors.ErrNotSupportedWithSem.GenWithStackByArgs("IMPORT INTO with S3 external ID")
}
}
values.Set(storage.S3ExternalID, config.GetGlobalKeyspaceName())
u.RawQuery = values.Encode()
return u.String(), nil
return nil
}
// GetThreadOrBatchSizeFromExpression gets the numeric value of the thread or batch size from the expression.

View File

@ -28,7 +28,6 @@ import (
"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
@ -1113,24 +1112,14 @@ func TestGetMaxWriteSpeedFromExpression(t *testing.T) {
func TestProcessNextGenS3Path(t *testing.T) {
u, err := url.Parse("S3://bucket?External-id=abc")
require.NoError(t, err)
_, err = processSemNextGenS3Path(u)
err = checkNextGenS3PathWithSem(u)
require.ErrorIs(t, err, plannererrors.ErrNotSupportedWithSem)
require.ErrorContains(t, err, "IMPORT INTO with S3 external ID")
bak := config.GetGlobalKeyspaceName()
config.UpdateGlobal(func(conf *config.Config) {
conf.KeyspaceName = "sem-next-gen"
})
t.Cleanup(func() {
config.UpdateGlobal(func(conf *config.Config) {
conf.KeyspaceName = bak
})
})
u, err = url.Parse("s3://bucket")
require.NoError(t, err)
newPath, err := processSemNextGenS3Path(u)
err = checkNextGenS3PathWithSem(u)
require.NoError(t, err)
require.Equal(t, "s3://bucket?external-id=sem-next-gen", newPath)
}
func TestIndexLookUpReaderTryLookUpPushDown(t *testing.T) {

View File

@ -16,6 +16,7 @@ package sem
import (
"strings"
"sync/atomic"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
@ -26,11 +27,14 @@ import (
)
var (
sem *semImpl
// in normal code path, sem is not changed after initialization, but during
// UT, we have to change it multiple times, so we use atomic.Pointer here.
globalSem atomic.Pointer[semImpl]
)
// IsInvisibleSchema checks if a database is hidden under SEM rules.
func IsInvisibleSchema(dbName string) bool {
sem := globalSem.Load()
if sem == nil {
return false
}
@ -39,6 +43,7 @@ func IsInvisibleSchema(dbName string) bool {
// IsInvisibleTable checks if a table is hidden in a specific database under SEM rules.
func IsInvisibleTable(dbLowerName, tblLowerName string) bool {
sem := globalSem.Load()
if sem == nil {
return false
}
@ -47,6 +52,7 @@ func IsInvisibleTable(dbLowerName, tblLowerName string) bool {
// IsRestrictedPrivilege checks if a privilege is restricted under SEM rules.
func IsRestrictedPrivilege(privilege string) bool {
sem := globalSem.Load()
if sem == nil {
return false
}
@ -55,6 +61,7 @@ func IsRestrictedPrivilege(privilege string) bool {
// IsInvisibleSysVar checks if a system variable is hidden under SEM rules.
func IsInvisibleSysVar(varName string) bool {
sem := globalSem.Load()
if sem == nil {
return false
}
@ -63,6 +70,7 @@ func IsInvisibleSysVar(varName string) bool {
// IsReadOnlyVariable checks if a system variable is read-only under SEM rules.
func IsReadOnlyVariable(varName string) bool {
sem := globalSem.Load()
if sem == nil {
return false
}
@ -71,6 +79,7 @@ func IsReadOnlyVariable(varName string) bool {
// IsInvisibleStatusVar checks if a status variable is restricted under SEM rules.
func IsInvisibleStatusVar(varName string) bool {
sem := globalSem.Load()
if sem == nil {
return false
}
@ -79,6 +88,7 @@ func IsInvisibleStatusVar(varName string) bool {
// IsRestrictedSQL checks if a SQL statement is restricted under SEM rules.
func IsRestrictedSQL(stmt ast.StmtNode) bool {
sem := globalSem.Load()
if sem == nil {
return false
}
@ -87,19 +97,29 @@ func IsRestrictedSQL(stmt ast.StmtNode) bool {
// Enable enables SEM.
func Enable(configPath string) error {
sem := globalSem.Load()
intest.Assert(sem == nil, "SEM is already enabled")
semConfig, err := parseSEMConfigFromFile(configPath)
if err != nil {
return err
}
err = validateSEMConfig(semConfig)
return EnableBy(semConfig)
}
// EnableBy enables SEM by the given configuration.
// we add this to simplify testing.
func EnableBy(semConfig *Config) error {
sem := globalSem.Load()
intest.Assert(sem == nil, "SEM is already enabled")
err := validateSEMConfig(semConfig)
if err != nil {
return err
}
sem = buildSEMFromConfig(semConfig)
sem.overrideRestrictedVariable()
globalSem.Store(sem)
// set the system variable to indicate SEM is configured by the config file.
variable.SetSysVar(vardef.TiDBEnableEnhancedSecurity, "CONFIG")
@ -112,7 +132,13 @@ func Enable(configPath string) error {
// IsEnabled checks if Security Enhanced Mode (SEM) is enabled
func IsEnabled() bool {
return sem != nil
return globalSem.Load() != nil
}
// Disable disables SEM.
func Disable() {
globalSem.Store(nil)
variable.SetSysVar(vardef.TiDBEnableEnhancedSecurity, vardef.Off)
}
type semImpl struct {

View File

@ -139,8 +139,8 @@ func TestEnableSEM(t *testing.T) {
// Test restricted tables
require.True(t, IsInvisibleTable("mysql", "user"))
require.True(t, sem.isInvisibleTable("mysql", "db"))
require.False(t, sem.isInvisibleTable("test1", "tbl2"))
require.True(t, globalSem.Load().isInvisibleTable("mysql", "db"))
require.False(t, globalSem.Load().isInvisibleTable("test1", "tbl2"))
// Test restricted variables
require.True(t, IsInvisibleSysVar(vardef.SuperReadOnly))

View File

@ -41,7 +41,7 @@ func EnableFromPathForTest(configPath string) (func(), error) {
}
return func() {
sem = nil
Disable()
for name, value := range variableDefValue {
variable.SetSysVar(name, value)