planner: fix ExtractCorrelatedCols method in PhysicalTableScan (#51205)
close pingcap/tidb#51204
This commit is contained in:
@ -36,6 +36,7 @@ import (
|
||||
"github.com/pingcap/tidb/pkg/testkit"
|
||||
"github.com/pingcap/tidb/pkg/testkit/external"
|
||||
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
|
||||
"github.com/pingcap/tipb/go-tipb"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@ -437,3 +438,60 @@ func TestPhysicalPlanMemoryTrace(t *testing.T) {
|
||||
pp.MPPPartitionCols = append(pp.MPPPartitionCols, &property.MPPPartitionColumn{})
|
||||
require.Greater(t, pp.MemoryUsage(), size)
|
||||
}
|
||||
|
||||
func TestPhysicalTableScanExtractCorrelatedCols(t *testing.T) {
|
||||
store := testkit.CreateMockStore(t, internal.WithMockTiFlash(2))
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("create table t1 (id int, client_type tinyint, client_no char(18), taxpayer_no varchar(50), status tinyint, update_time datetime)")
|
||||
tk.MustExec("alter table t1 set tiflash replica 1")
|
||||
tb := external.GetTableByName(t, tk, "test", "t1")
|
||||
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
|
||||
require.NoError(t, err)
|
||||
tk.MustExec("create table t2 (id int, company_no char(18), name varchar(200), tax_registry_no varchar(30))")
|
||||
tk.MustExec("insert into t1(id, taxpayer_no, client_no, client_type, status, update_time) values (1, 'TAX001', 'Z9005', 1, 1, '2024-02-18 10:00:00'), (2, 'TAX002', 'Z9005', 1, 0, '2024-02-18 09:00:00'), (3, 'TAX003', 'Z9005', 2, 1, '2024-02-18 08:00:00'), (4, 'TAX004', 'Z9006', 1, 1, '2024-02-18 12:00:00')")
|
||||
tk.MustExec("insert into t2(id, company_no, name, tax_registry_no) values (1, 'Z9005', 'AA', 'aaa'), (2, 'Z9006', 'BB', 'bbb'), (3, 'Z9007', 'CC', 'ccc')")
|
||||
|
||||
sql := "select company_no, ifnull((select /*+ read_from_storage(tiflash[test.t1]) */ taxpayer_no from test.t1 where client_no = c.company_no and client_type = 1 and status = 1 order by update_time desc limit 1), tax_registry_no) as tax_registry_no from test.t2 c where company_no = 'Z9005' limit 1"
|
||||
tk.MustExec(sql)
|
||||
info := tk.Session().ShowProcess()
|
||||
require.NotNil(t, info)
|
||||
p, ok := info.Plan.(core.Plan)
|
||||
require.True(t, ok)
|
||||
|
||||
var findTableScan func(p core.Plan) *core.PhysicalTableScan
|
||||
findTableScan = func(p core.Plan) *core.PhysicalTableScan {
|
||||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
switch v := p.(type) {
|
||||
case *core.PhysicalTableScan:
|
||||
if v.Table.Name.L == "t1" {
|
||||
return v
|
||||
}
|
||||
return nil
|
||||
case *core.PhysicalTableReader:
|
||||
return findTableScan(v.TablePlans[0])
|
||||
default:
|
||||
physicayPlan := p.(core.PhysicalPlan)
|
||||
for _, child := range physicayPlan.Children() {
|
||||
if ts := findTableScan(child); ts != nil {
|
||||
return ts
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
ts := findTableScan(p)
|
||||
require.NotNil(t, ts)
|
||||
|
||||
pb, err := ts.ToPB(tk.Session(), kv.TiFlash)
|
||||
require.NoError(t, err)
|
||||
// make sure the pushed down filter condition is correct
|
||||
require.Equal(t, 1, len(pb.TblScan.PushedDownFilterConditions))
|
||||
require.Equal(t, tipb.ExprType_ColumnRef, pb.TblScan.PushedDownFilterConditions[0].Children[0].Tp)
|
||||
// make sure the correlated columns are extracted correctly
|
||||
correlated := ts.ExtractCorrelatedCols()
|
||||
require.Equal(t, 1, len(correlated))
|
||||
require.Equal(t, "test.t2.company_no", correlated[0].String())
|
||||
}
|
||||
|
||||
@ -918,6 +918,7 @@ func (ts *PhysicalTableScan) Clone() (PhysicalPlan, error) {
|
||||
clonedScan.physicalSchemaProducer = *prod
|
||||
clonedScan.AccessCondition = util.CloneExprs(ts.AccessCondition)
|
||||
clonedScan.filterCondition = util.CloneExprs(ts.filterCondition)
|
||||
clonedScan.LateMaterializationFilterCondition = util.CloneExprs(ts.LateMaterializationFilterCondition)
|
||||
if ts.Table != nil {
|
||||
clonedScan.Table = ts.Table.Clone()
|
||||
}
|
||||
@ -935,11 +936,11 @@ func (ts *PhysicalTableScan) Clone() (PhysicalPlan, error) {
|
||||
|
||||
// ExtractCorrelatedCols implements PhysicalPlan interface.
|
||||
func (ts *PhysicalTableScan) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
|
||||
corCols := make([]*expression.CorrelatedColumn, 0, len(ts.AccessCondition)+len(ts.filterCondition))
|
||||
corCols := make([]*expression.CorrelatedColumn, 0, len(ts.AccessCondition)+len(ts.LateMaterializationFilterCondition))
|
||||
for _, expr := range ts.AccessCondition {
|
||||
corCols = append(corCols, expression.ExtractCorColumns(expr)...)
|
||||
}
|
||||
for _, expr := range ts.filterCondition {
|
||||
for _, expr := range ts.LateMaterializationFilterCondition {
|
||||
corCols = append(corCols, expression.ExtractCorColumns(expr)...)
|
||||
}
|
||||
return corCols
|
||||
@ -1049,6 +1050,9 @@ func (ts *PhysicalTableScan) MemoryUsage() (sum int64) {
|
||||
for _, cond := range ts.filterCondition {
|
||||
sum += cond.MemoryUsage()
|
||||
}
|
||||
for _, cond := range ts.LateMaterializationFilterCondition {
|
||||
sum += cond.MemoryUsage()
|
||||
}
|
||||
for _, rang := range ts.Ranges {
|
||||
sum += rang.MemUsage()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user