diff --git a/pkg/planner/core/physical_plan_test.go b/pkg/planner/core/physical_plan_test.go index 65c2200fb0..d58a8073f4 100644 --- a/pkg/planner/core/physical_plan_test.go +++ b/pkg/planner/core/physical_plan_test.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/external" "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors" + "github.com/pingcap/tipb/go-tipb" "github.com/stretchr/testify/require" ) @@ -437,3 +438,60 @@ func TestPhysicalPlanMemoryTrace(t *testing.T) { pp.MPPPartitionCols = append(pp.MPPPartitionCols, &property.MPPPartitionColumn{}) require.Greater(t, pp.MemoryUsage(), size) } + +func TestPhysicalTableScanExtractCorrelatedCols(t *testing.T) { + store := testkit.CreateMockStore(t, internal.WithMockTiFlash(2)) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1 (id int, client_type tinyint, client_no char(18), taxpayer_no varchar(50), status tinyint, update_time datetime)") + tk.MustExec("alter table t1 set tiflash replica 1") + tb := external.GetTableByName(t, tk, "test", "t1") + err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true) + require.NoError(t, err) + tk.MustExec("create table t2 (id int, company_no char(18), name varchar(200), tax_registry_no varchar(30))") + tk.MustExec("insert into t1(id, taxpayer_no, client_no, client_type, status, update_time) values (1, 'TAX001', 'Z9005', 1, 1, '2024-02-18 10:00:00'), (2, 'TAX002', 'Z9005', 1, 0, '2024-02-18 09:00:00'), (3, 'TAX003', 'Z9005', 2, 1, '2024-02-18 08:00:00'), (4, 'TAX004', 'Z9006', 1, 1, '2024-02-18 12:00:00')") + tk.MustExec("insert into t2(id, company_no, name, tax_registry_no) values (1, 'Z9005', 'AA', 'aaa'), (2, 'Z9006', 'BB', 'bbb'), (3, 'Z9007', 'CC', 'ccc')") + + sql := "select company_no, ifnull((select /*+ read_from_storage(tiflash[test.t1]) */ taxpayer_no from test.t1 where client_no = c.company_no and client_type = 1 and status = 1 order by update_time desc limit 1), tax_registry_no) as tax_registry_no from test.t2 c where company_no = 'Z9005' limit 1" + tk.MustExec(sql) + info := tk.Session().ShowProcess() + require.NotNil(t, info) + p, ok := info.Plan.(core.Plan) + require.True(t, ok) + + var findTableScan func(p core.Plan) *core.PhysicalTableScan + findTableScan = func(p core.Plan) *core.PhysicalTableScan { + if p == nil { + return nil + } + switch v := p.(type) { + case *core.PhysicalTableScan: + if v.Table.Name.L == "t1" { + return v + } + return nil + case *core.PhysicalTableReader: + return findTableScan(v.TablePlans[0]) + default: + physicayPlan := p.(core.PhysicalPlan) + for _, child := range physicayPlan.Children() { + if ts := findTableScan(child); ts != nil { + return ts + } + } + return nil + } + } + ts := findTableScan(p) + require.NotNil(t, ts) + + pb, err := ts.ToPB(tk.Session(), kv.TiFlash) + require.NoError(t, err) + // make sure the pushed down filter condition is correct + require.Equal(t, 1, len(pb.TblScan.PushedDownFilterConditions)) + require.Equal(t, tipb.ExprType_ColumnRef, pb.TblScan.PushedDownFilterConditions[0].Children[0].Tp) + // make sure the correlated columns are extracted correctly + correlated := ts.ExtractCorrelatedCols() + require.Equal(t, 1, len(correlated)) + require.Equal(t, "test.t2.company_no", correlated[0].String()) +} diff --git a/pkg/planner/core/physical_plans.go b/pkg/planner/core/physical_plans.go index 14e474ace7..043b9aaec8 100644 --- a/pkg/planner/core/physical_plans.go +++ b/pkg/planner/core/physical_plans.go @@ -918,6 +918,7 @@ func (ts *PhysicalTableScan) Clone() (PhysicalPlan, error) { clonedScan.physicalSchemaProducer = *prod clonedScan.AccessCondition = util.CloneExprs(ts.AccessCondition) clonedScan.filterCondition = util.CloneExprs(ts.filterCondition) + clonedScan.LateMaterializationFilterCondition = util.CloneExprs(ts.LateMaterializationFilterCondition) if ts.Table != nil { clonedScan.Table = ts.Table.Clone() } @@ -935,11 +936,11 @@ func (ts *PhysicalTableScan) Clone() (PhysicalPlan, error) { // ExtractCorrelatedCols implements PhysicalPlan interface. func (ts *PhysicalTableScan) ExtractCorrelatedCols() []*expression.CorrelatedColumn { - corCols := make([]*expression.CorrelatedColumn, 0, len(ts.AccessCondition)+len(ts.filterCondition)) + corCols := make([]*expression.CorrelatedColumn, 0, len(ts.AccessCondition)+len(ts.LateMaterializationFilterCondition)) for _, expr := range ts.AccessCondition { corCols = append(corCols, expression.ExtractCorColumns(expr)...) } - for _, expr := range ts.filterCondition { + for _, expr := range ts.LateMaterializationFilterCondition { corCols = append(corCols, expression.ExtractCorColumns(expr)...) } return corCols @@ -1049,6 +1050,9 @@ func (ts *PhysicalTableScan) MemoryUsage() (sum int64) { for _, cond := range ts.filterCondition { sum += cond.MemoryUsage() } + for _, cond := range ts.LateMaterializationFilterCondition { + sum += cond.MemoryUsage() + } for _, rang := range ts.Ranges { sum += rang.MemUsage() }