session, variables: Stream agg concurrency variable (#20960)
This commit is contained in:
@ -989,6 +989,7 @@ func (s *testSuite5) TestSetConcurrency(c *C) {
|
||||
tk.MustQuery("select @@tidb_hashagg_partial_concurrency;").Check(testkit.Rows(strconv.Itoa(variable.ConcurrencyUnset)))
|
||||
tk.MustQuery("select @@tidb_hashagg_final_concurrency;").Check(testkit.Rows(strconv.Itoa(variable.ConcurrencyUnset)))
|
||||
tk.MustQuery("select @@tidb_window_concurrency;").Check(testkit.Rows(strconv.Itoa(variable.ConcurrencyUnset)))
|
||||
tk.MustQuery("select @@tidb_streamagg_concurrency;").Check(testkit.Rows(strconv.Itoa(variable.DefTiDBStreamAggConcurrency)))
|
||||
tk.MustQuery("select @@tidb_projection_concurrency;").Check(testkit.Rows(strconv.Itoa(variable.ConcurrencyUnset)))
|
||||
tk.MustQuery("select @@tidb_distsql_scan_concurrency;").Check(testkit.Rows(strconv.Itoa(variable.DefDistSQLScanConcurrency)))
|
||||
|
||||
@ -1002,6 +1003,7 @@ func (s *testSuite5) TestSetConcurrency(c *C) {
|
||||
c.Assert(vars.HashAggPartialConcurrency(), Equals, variable.DefExecutorConcurrency)
|
||||
c.Assert(vars.HashAggFinalConcurrency(), Equals, variable.DefExecutorConcurrency)
|
||||
c.Assert(vars.WindowConcurrency(), Equals, variable.DefExecutorConcurrency)
|
||||
c.Assert(vars.StreamAggConcurrency(), Equals, variable.DefTiDBStreamAggConcurrency)
|
||||
c.Assert(vars.ProjectionConcurrency(), Equals, variable.DefExecutorConcurrency)
|
||||
c.Assert(vars.DistSQLScanConcurrency(), Equals, variable.DefDistSQLScanConcurrency)
|
||||
|
||||
@ -1037,6 +1039,9 @@ func (s *testSuite5) TestSetConcurrency(c *C) {
|
||||
checkSet(variable.TiDBWindowConcurrency)
|
||||
c.Assert(vars.WindowConcurrency(), Equals, 1)
|
||||
|
||||
checkSet(variable.TiDBStreamAggConcurrency)
|
||||
c.Assert(vars.StreamAggConcurrency(), Equals, 1)
|
||||
|
||||
tk.MustExec(fmt.Sprintf("set @@%s=1;", variable.TiDBDistSQLScanConcurrency))
|
||||
tk.MustQuery(fmt.Sprintf("select @@%s;", variable.TiDBDistSQLScanConcurrency)).Check(testkit.Rows("1"))
|
||||
c.Assert(vars.DistSQLScanConcurrency(), Equals, 1)
|
||||
@ -1053,6 +1058,7 @@ func (s *testSuite5) TestSetConcurrency(c *C) {
|
||||
tk.MustExec("set @@tidb_hashagg_partial_concurrency=-1;")
|
||||
tk.MustExec("set @@tidb_hashagg_final_concurrency=-1;")
|
||||
tk.MustExec("set @@tidb_window_concurrency=-1;")
|
||||
tk.MustExec("set @@tidb_streamagg_concurrency=-1;")
|
||||
tk.MustExec("set @@tidb_projection_concurrency=-1;")
|
||||
|
||||
c.Assert(vars.IndexLookupConcurrency(), Equals, variable.DefExecutorConcurrency)
|
||||
@ -1061,6 +1067,7 @@ func (s *testSuite5) TestSetConcurrency(c *C) {
|
||||
c.Assert(vars.HashAggPartialConcurrency(), Equals, variable.DefExecutorConcurrency)
|
||||
c.Assert(vars.HashAggFinalConcurrency(), Equals, variable.DefExecutorConcurrency)
|
||||
c.Assert(vars.WindowConcurrency(), Equals, variable.DefExecutorConcurrency)
|
||||
c.Assert(vars.StreamAggConcurrency(), Equals, variable.DefExecutorConcurrency)
|
||||
c.Assert(vars.ProjectionConcurrency(), Equals, variable.DefExecutorConcurrency)
|
||||
|
||||
_, err := tk.Exec("set @@tidb_executor_concurrency=-1;")
|
||||
|
||||
@ -2157,6 +2157,7 @@ var builtinGlobalVariable = []string{
|
||||
variable.TiDBHashAggPartialConcurrency,
|
||||
variable.TiDBHashAggFinalConcurrency,
|
||||
variable.TiDBWindowConcurrency,
|
||||
variable.TiDBStreamAggConcurrency,
|
||||
variable.TiDBExecutorConcurrency,
|
||||
variable.TiDBBackoffLockFast,
|
||||
variable.TiDBBackOffWeight,
|
||||
|
||||
@ -873,6 +873,7 @@ func NewSessionVars() *SessionVars {
|
||||
hashAggPartialConcurrency: DefTiDBHashAggPartialConcurrency,
|
||||
hashAggFinalConcurrency: DefTiDBHashAggFinalConcurrency,
|
||||
windowConcurrency: DefTiDBWindowConcurrency,
|
||||
streamAggConcurrency: DefTiDBStreamAggConcurrency,
|
||||
ExecutorConcurrency: DefExecutorConcurrency,
|
||||
}
|
||||
vars.MemQuota = MemQuota{
|
||||
@ -1327,6 +1328,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
|
||||
s.hashAggFinalConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
|
||||
case TiDBWindowConcurrency:
|
||||
s.windowConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
|
||||
case TiDBStreamAggConcurrency:
|
||||
s.streamAggConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset)
|
||||
case TiDBDistSQLScanConcurrency:
|
||||
s.distSQLScanConcurrency = tidbOptPositiveInt32(val, DefDistSQLScanConcurrency)
|
||||
case TiDBIndexSerialScanConcurrency:
|
||||
@ -1660,6 +1663,10 @@ type Concurrency struct {
|
||||
// windowConcurrency is deprecated, use ExecutorConcurrency instead.
|
||||
windowConcurrency int
|
||||
|
||||
// streamAggConcurrency is the number of concurrent stream aggregation worker.
|
||||
// streamAggConcurrency is deprecated, use ExecutorConcurrency instead.
|
||||
streamAggConcurrency int
|
||||
|
||||
// indexSerialScanConcurrency is the number of concurrent index serial scan worker.
|
||||
indexSerialScanConcurrency int
|
||||
|
||||
@ -1710,6 +1717,11 @@ func (c *Concurrency) SetWindowConcurrency(n int) {
|
||||
c.windowConcurrency = n
|
||||
}
|
||||
|
||||
// SetStreamAggConcurrency set the number of concurrent stream aggregation worker.
|
||||
func (c *Concurrency) SetStreamAggConcurrency(n int) {
|
||||
c.streamAggConcurrency = n
|
||||
}
|
||||
|
||||
// SetIndexSerialScanConcurrency set the number of concurrent index serial scan worker.
|
||||
func (c *Concurrency) SetIndexSerialScanConcurrency(n int) {
|
||||
c.indexSerialScanConcurrency = n
|
||||
@ -1776,6 +1788,14 @@ func (c *Concurrency) WindowConcurrency() int {
|
||||
return c.ExecutorConcurrency
|
||||
}
|
||||
|
||||
// StreamAggConcurrency return the number of concurrent stream aggregation worker.
|
||||
func (c *Concurrency) StreamAggConcurrency() int {
|
||||
if c.streamAggConcurrency != ConcurrencyUnset {
|
||||
return c.streamAggConcurrency
|
||||
}
|
||||
return c.ExecutorConcurrency
|
||||
}
|
||||
|
||||
// IndexSerialScanConcurrency return the number of concurrent index serial scan worker.
|
||||
// This option is not sync with ExecutorConcurrency since it's used by Analyze table.
|
||||
func (c *Concurrency) IndexSerialScanConcurrency() int {
|
||||
|
||||
@ -1017,6 +1017,7 @@ var defaultSysVars = []*SysVar{
|
||||
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggPartialConcurrency, Value: strconv.Itoa(DefTiDBHashAggPartialConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
|
||||
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggFinalConcurrency, Value: strconv.Itoa(DefTiDBHashAggFinalConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
|
||||
{Scope: ScopeGlobal | ScopeSession, Name: TiDBWindowConcurrency, Value: strconv.Itoa(DefTiDBWindowConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
|
||||
{Scope: ScopeGlobal | ScopeSession, Name: TiDBStreamAggConcurrency, Value: strconv.Itoa(DefTiDBStreamAggConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true},
|
||||
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableParallelApply, Value: BoolToOnOff(DefTiDBEnableParallelApply), Type: TypeBool},
|
||||
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBackoffLockFast, Value: strconv.Itoa(kv.DefBackoffLockFast), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxUint64},
|
||||
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBackOffWeight, Value: strconv.Itoa(kv.DefBackOffWeight), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxUint64},
|
||||
|
||||
@ -312,6 +312,10 @@ const (
|
||||
// tidb_window_concurrency is deprecated, use tidb_executor_concurrency instead.
|
||||
TiDBWindowConcurrency = "tidb_window_concurrency"
|
||||
|
||||
// tidb_stream_agg_concurrency is used for stream aggregation parallel executor.
|
||||
// tidb_stream_agg_concurrency is deprecated, use tidb_executor_concurrency instead.
|
||||
TiDBStreamAggConcurrency = "tidb_streamagg_concurrency"
|
||||
|
||||
// tidb_enable_parallel_apply is used for parallel apply.
|
||||
TiDBEnableParallelApply = "tidb_enable_parallel_apply"
|
||||
|
||||
@ -531,6 +535,7 @@ const (
|
||||
DefTiDBHashAggPartialConcurrency = ConcurrencyUnset
|
||||
DefTiDBHashAggFinalConcurrency = ConcurrencyUnset
|
||||
DefTiDBWindowConcurrency = ConcurrencyUnset
|
||||
DefTiDBStreamAggConcurrency = 1
|
||||
DefTiDBForcePriority = mysql.NoPriority
|
||||
DefTiDBUseRadixJoin = false
|
||||
DefEnableWindowFunction = true
|
||||
|
||||
@ -267,7 +267,7 @@ func CheckDeprecationSetSystemVar(s *SessionVars, name string) {
|
||||
switch name {
|
||||
case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency,
|
||||
TiDBHashJoinConcurrency, TiDBHashAggPartialConcurrency, TiDBHashAggFinalConcurrency,
|
||||
TiDBProjectionConcurrency, TiDBWindowConcurrency:
|
||||
TiDBProjectionConcurrency, TiDBWindowConcurrency, TiDBStreamAggConcurrency:
|
||||
s.StmtCtx.AppendWarning(errWarnDeprecatedSyntax.FastGenByArgs(name, TiDBExecutorConcurrency))
|
||||
case TIDBMemQuotaHashJoin, TIDBMemQuotaMergeJoin,
|
||||
TIDBMemQuotaSort, TIDBMemQuotaTopn,
|
||||
|
||||
@ -74,11 +74,13 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) {
|
||||
c.Assert(vars.hashAggPartialConcurrency, Equals, ConcurrencyUnset)
|
||||
c.Assert(vars.hashAggFinalConcurrency, Equals, ConcurrencyUnset)
|
||||
c.Assert(vars.windowConcurrency, Equals, ConcurrencyUnset)
|
||||
c.Assert(vars.streamAggConcurrency, Equals, DefTiDBStreamAggConcurrency)
|
||||
c.Assert(vars.distSQLScanConcurrency, Equals, DefDistSQLScanConcurrency)
|
||||
c.Assert(vars.ProjectionConcurrency(), Equals, DefExecutorConcurrency)
|
||||
c.Assert(vars.HashAggPartialConcurrency(), Equals, DefExecutorConcurrency)
|
||||
c.Assert(vars.HashAggFinalConcurrency(), Equals, DefExecutorConcurrency)
|
||||
c.Assert(vars.WindowConcurrency(), Equals, DefExecutorConcurrency)
|
||||
c.Assert(vars.StreamAggConcurrency(), Equals, DefTiDBStreamAggConcurrency)
|
||||
c.Assert(vars.DistSQLScanConcurrency(), Equals, DefDistSQLScanConcurrency)
|
||||
c.Assert(vars.ExecutorConcurrency, Equals, DefExecutorConcurrency)
|
||||
c.Assert(vars.MaxChunkSize, Equals, DefMaxChunkSize)
|
||||
@ -657,6 +659,14 @@ func (s *testVarsutilSuite) TestConcurrencyVariables(c *C) {
|
||||
c.Assert(vars.windowConcurrency, Equals, wdConcurrency)
|
||||
c.Assert(vars.WindowConcurrency(), Equals, wdConcurrency)
|
||||
|
||||
saConcurrency := 2
|
||||
c.Assert(vars.streamAggConcurrency, Equals, DefTiDBStreamAggConcurrency)
|
||||
c.Assert(vars.StreamAggConcurrency(), Equals, DefTiDBStreamAggConcurrency)
|
||||
err = SetSessionSystemVar(vars, TiDBStreamAggConcurrency, types.NewIntDatum(int64(saConcurrency)))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(vars.streamAggConcurrency, Equals, saConcurrency)
|
||||
c.Assert(vars.StreamAggConcurrency(), Equals, saConcurrency)
|
||||
|
||||
c.Assert(vars.indexLookupConcurrency, Equals, ConcurrencyUnset)
|
||||
c.Assert(vars.IndexLookupConcurrency(), Equals, DefExecutorConcurrency)
|
||||
exeConcurrency := DefExecutorConcurrency + 1
|
||||
@ -665,4 +675,6 @@ func (s *testVarsutilSuite) TestConcurrencyVariables(c *C) {
|
||||
c.Assert(vars.indexLookupConcurrency, Equals, ConcurrencyUnset)
|
||||
c.Assert(vars.IndexLookupConcurrency(), Equals, exeConcurrency)
|
||||
c.Assert(vars.WindowConcurrency(), Equals, wdConcurrency)
|
||||
c.Assert(vars.StreamAggConcurrency(), Equals, saConcurrency)
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user