diff --git a/executor/set_test.go b/executor/set_test.go index acd45dfeb5..7efac55965 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -989,6 +989,7 @@ func (s *testSuite5) TestSetConcurrency(c *C) { tk.MustQuery("select @@tidb_hashagg_partial_concurrency;").Check(testkit.Rows(strconv.Itoa(variable.ConcurrencyUnset))) tk.MustQuery("select @@tidb_hashagg_final_concurrency;").Check(testkit.Rows(strconv.Itoa(variable.ConcurrencyUnset))) tk.MustQuery("select @@tidb_window_concurrency;").Check(testkit.Rows(strconv.Itoa(variable.ConcurrencyUnset))) + tk.MustQuery("select @@tidb_streamagg_concurrency;").Check(testkit.Rows(strconv.Itoa(variable.DefTiDBStreamAggConcurrency))) tk.MustQuery("select @@tidb_projection_concurrency;").Check(testkit.Rows(strconv.Itoa(variable.ConcurrencyUnset))) tk.MustQuery("select @@tidb_distsql_scan_concurrency;").Check(testkit.Rows(strconv.Itoa(variable.DefDistSQLScanConcurrency))) @@ -1002,6 +1003,7 @@ func (s *testSuite5) TestSetConcurrency(c *C) { c.Assert(vars.HashAggPartialConcurrency(), Equals, variable.DefExecutorConcurrency) c.Assert(vars.HashAggFinalConcurrency(), Equals, variable.DefExecutorConcurrency) c.Assert(vars.WindowConcurrency(), Equals, variable.DefExecutorConcurrency) + c.Assert(vars.StreamAggConcurrency(), Equals, variable.DefTiDBStreamAggConcurrency) c.Assert(vars.ProjectionConcurrency(), Equals, variable.DefExecutorConcurrency) c.Assert(vars.DistSQLScanConcurrency(), Equals, variable.DefDistSQLScanConcurrency) @@ -1037,6 +1039,9 @@ func (s *testSuite5) TestSetConcurrency(c *C) { checkSet(variable.TiDBWindowConcurrency) c.Assert(vars.WindowConcurrency(), Equals, 1) + checkSet(variable.TiDBStreamAggConcurrency) + c.Assert(vars.StreamAggConcurrency(), Equals, 1) + tk.MustExec(fmt.Sprintf("set @@%s=1;", variable.TiDBDistSQLScanConcurrency)) tk.MustQuery(fmt.Sprintf("select @@%s;", variable.TiDBDistSQLScanConcurrency)).Check(testkit.Rows("1")) c.Assert(vars.DistSQLScanConcurrency(), Equals, 1) @@ -1053,6 +1058,7 @@ func (s *testSuite5) TestSetConcurrency(c *C) { tk.MustExec("set @@tidb_hashagg_partial_concurrency=-1;") tk.MustExec("set @@tidb_hashagg_final_concurrency=-1;") tk.MustExec("set @@tidb_window_concurrency=-1;") + tk.MustExec("set @@tidb_streamagg_concurrency=-1;") tk.MustExec("set @@tidb_projection_concurrency=-1;") c.Assert(vars.IndexLookupConcurrency(), Equals, variable.DefExecutorConcurrency) @@ -1061,6 +1067,7 @@ func (s *testSuite5) TestSetConcurrency(c *C) { c.Assert(vars.HashAggPartialConcurrency(), Equals, variable.DefExecutorConcurrency) c.Assert(vars.HashAggFinalConcurrency(), Equals, variable.DefExecutorConcurrency) c.Assert(vars.WindowConcurrency(), Equals, variable.DefExecutorConcurrency) + c.Assert(vars.StreamAggConcurrency(), Equals, variable.DefExecutorConcurrency) c.Assert(vars.ProjectionConcurrency(), Equals, variable.DefExecutorConcurrency) _, err := tk.Exec("set @@tidb_executor_concurrency=-1;") diff --git a/session/session.go b/session/session.go index 3c3e5aeb28..7d3d0044c8 100644 --- a/session/session.go +++ b/session/session.go @@ -2157,6 +2157,7 @@ var builtinGlobalVariable = []string{ variable.TiDBHashAggPartialConcurrency, variable.TiDBHashAggFinalConcurrency, variable.TiDBWindowConcurrency, + variable.TiDBStreamAggConcurrency, variable.TiDBExecutorConcurrency, variable.TiDBBackoffLockFast, variable.TiDBBackOffWeight, diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 0a28071b45..22be00689e 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -873,6 +873,7 @@ func NewSessionVars() *SessionVars { hashAggPartialConcurrency: DefTiDBHashAggPartialConcurrency, hashAggFinalConcurrency: DefTiDBHashAggFinalConcurrency, windowConcurrency: DefTiDBWindowConcurrency, + streamAggConcurrency: DefTiDBStreamAggConcurrency, ExecutorConcurrency: DefExecutorConcurrency, } vars.MemQuota = MemQuota{ @@ -1327,6 +1328,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.hashAggFinalConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset) case TiDBWindowConcurrency: s.windowConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset) + case TiDBStreamAggConcurrency: + s.streamAggConcurrency = tidbOptPositiveInt32(val, ConcurrencyUnset) case TiDBDistSQLScanConcurrency: s.distSQLScanConcurrency = tidbOptPositiveInt32(val, DefDistSQLScanConcurrency) case TiDBIndexSerialScanConcurrency: @@ -1660,6 +1663,10 @@ type Concurrency struct { // windowConcurrency is deprecated, use ExecutorConcurrency instead. windowConcurrency int + // streamAggConcurrency is the number of concurrent stream aggregation worker. + // streamAggConcurrency is deprecated, use ExecutorConcurrency instead. + streamAggConcurrency int + // indexSerialScanConcurrency is the number of concurrent index serial scan worker. indexSerialScanConcurrency int @@ -1710,6 +1717,11 @@ func (c *Concurrency) SetWindowConcurrency(n int) { c.windowConcurrency = n } +// SetStreamAggConcurrency set the number of concurrent stream aggregation worker. +func (c *Concurrency) SetStreamAggConcurrency(n int) { + c.streamAggConcurrency = n +} + // SetIndexSerialScanConcurrency set the number of concurrent index serial scan worker. func (c *Concurrency) SetIndexSerialScanConcurrency(n int) { c.indexSerialScanConcurrency = n @@ -1776,6 +1788,14 @@ func (c *Concurrency) WindowConcurrency() int { return c.ExecutorConcurrency } +// StreamAggConcurrency return the number of concurrent stream aggregation worker. +func (c *Concurrency) StreamAggConcurrency() int { + if c.streamAggConcurrency != ConcurrencyUnset { + return c.streamAggConcurrency + } + return c.ExecutorConcurrency +} + // IndexSerialScanConcurrency return the number of concurrent index serial scan worker. // This option is not sync with ExecutorConcurrency since it's used by Analyze table. func (c *Concurrency) IndexSerialScanConcurrency() int { diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index efca5c4831..d6e0e5f57e 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1017,6 +1017,7 @@ var defaultSysVars = []*SysVar{ {Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggPartialConcurrency, Value: strconv.Itoa(DefTiDBHashAggPartialConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBHashAggFinalConcurrency, Value: strconv.Itoa(DefTiDBHashAggFinalConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBWindowConcurrency, Value: strconv.Itoa(DefTiDBWindowConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBStreamAggConcurrency, Value: strconv.Itoa(DefTiDBStreamAggConcurrency), Type: TypeInt, MinValue: 1, MaxValue: math.MaxInt64, AllowAutoValue: true}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableParallelApply, Value: BoolToOnOff(DefTiDBEnableParallelApply), Type: TypeBool}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBBackoffLockFast, Value: strconv.Itoa(kv.DefBackoffLockFast), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxUint64}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBBackOffWeight, Value: strconv.Itoa(kv.DefBackOffWeight), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxUint64}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 4bb06d0d2b..5e93fb1566 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -312,6 +312,10 @@ const ( // tidb_window_concurrency is deprecated, use tidb_executor_concurrency instead. TiDBWindowConcurrency = "tidb_window_concurrency" + // tidb_stream_agg_concurrency is used for stream aggregation parallel executor. + // tidb_stream_agg_concurrency is deprecated, use tidb_executor_concurrency instead. + TiDBStreamAggConcurrency = "tidb_streamagg_concurrency" + // tidb_enable_parallel_apply is used for parallel apply. TiDBEnableParallelApply = "tidb_enable_parallel_apply" @@ -531,6 +535,7 @@ const ( DefTiDBHashAggPartialConcurrency = ConcurrencyUnset DefTiDBHashAggFinalConcurrency = ConcurrencyUnset DefTiDBWindowConcurrency = ConcurrencyUnset + DefTiDBStreamAggConcurrency = 1 DefTiDBForcePriority = mysql.NoPriority DefTiDBUseRadixJoin = false DefEnableWindowFunction = true diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index f049d01577..8c757ab81f 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -267,7 +267,7 @@ func CheckDeprecationSetSystemVar(s *SessionVars, name string) { switch name { case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency, TiDBHashJoinConcurrency, TiDBHashAggPartialConcurrency, TiDBHashAggFinalConcurrency, - TiDBProjectionConcurrency, TiDBWindowConcurrency: + TiDBProjectionConcurrency, TiDBWindowConcurrency, TiDBStreamAggConcurrency: s.StmtCtx.AppendWarning(errWarnDeprecatedSyntax.FastGenByArgs(name, TiDBExecutorConcurrency)) case TIDBMemQuotaHashJoin, TIDBMemQuotaMergeJoin, TIDBMemQuotaSort, TIDBMemQuotaTopn, diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index 14be2e8dd7..b458d0787d 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -74,11 +74,13 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) { c.Assert(vars.hashAggPartialConcurrency, Equals, ConcurrencyUnset) c.Assert(vars.hashAggFinalConcurrency, Equals, ConcurrencyUnset) c.Assert(vars.windowConcurrency, Equals, ConcurrencyUnset) + c.Assert(vars.streamAggConcurrency, Equals, DefTiDBStreamAggConcurrency) c.Assert(vars.distSQLScanConcurrency, Equals, DefDistSQLScanConcurrency) c.Assert(vars.ProjectionConcurrency(), Equals, DefExecutorConcurrency) c.Assert(vars.HashAggPartialConcurrency(), Equals, DefExecutorConcurrency) c.Assert(vars.HashAggFinalConcurrency(), Equals, DefExecutorConcurrency) c.Assert(vars.WindowConcurrency(), Equals, DefExecutorConcurrency) + c.Assert(vars.StreamAggConcurrency(), Equals, DefTiDBStreamAggConcurrency) c.Assert(vars.DistSQLScanConcurrency(), Equals, DefDistSQLScanConcurrency) c.Assert(vars.ExecutorConcurrency, Equals, DefExecutorConcurrency) c.Assert(vars.MaxChunkSize, Equals, DefMaxChunkSize) @@ -657,6 +659,14 @@ func (s *testVarsutilSuite) TestConcurrencyVariables(c *C) { c.Assert(vars.windowConcurrency, Equals, wdConcurrency) c.Assert(vars.WindowConcurrency(), Equals, wdConcurrency) + saConcurrency := 2 + c.Assert(vars.streamAggConcurrency, Equals, DefTiDBStreamAggConcurrency) + c.Assert(vars.StreamAggConcurrency(), Equals, DefTiDBStreamAggConcurrency) + err = SetSessionSystemVar(vars, TiDBStreamAggConcurrency, types.NewIntDatum(int64(saConcurrency))) + c.Assert(err, IsNil) + c.Assert(vars.streamAggConcurrency, Equals, saConcurrency) + c.Assert(vars.StreamAggConcurrency(), Equals, saConcurrency) + c.Assert(vars.indexLookupConcurrency, Equals, ConcurrencyUnset) c.Assert(vars.IndexLookupConcurrency(), Equals, DefExecutorConcurrency) exeConcurrency := DefExecutorConcurrency + 1 @@ -665,4 +675,6 @@ func (s *testVarsutilSuite) TestConcurrencyVariables(c *C) { c.Assert(vars.indexLookupConcurrency, Equals, ConcurrencyUnset) c.Assert(vars.IndexLookupConcurrency(), Equals, exeConcurrency) c.Assert(vars.WindowConcurrency(), Equals, wdConcurrency) + c.Assert(vars.StreamAggConcurrency(), Equals, saConcurrency) + }