diff --git a/executor/executor_test.go b/executor/executor_test.go
index a9eae5a760..e85d2c4349 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -1111,7 +1111,7 @@ func (s *testSuite) TestJoin(c *C) {
 	result.Check(testkit.Rows())
 	result = tk.MustQuery("select * from t left outer join t1 on t.c1 = t1.c1 where t1.c1 = 3 or false")
 	result.Check(testkit.Rows())
-	result = tk.MustQuery("select * from t left outer join t1 on t.c1 = t1.c1 and t.c1 != 1")
+	result = tk.MustQuery("select * from t left outer join t1 on t.c1 = t1.c1 and t.c1 != 1 order by t1.c1")
 	result.Check(testkit.Rows("1 1 ", "2 2 2 3"))
 
 	tk.MustExec("drop table if exists t1")
diff --git a/executor/new_builder.go b/executor/new_builder.go
index ea38fdc343..6997f358be 100644
--- a/executor/new_builder.go
+++ b/executor/new_builder.go
@@ -29,7 +29,6 @@ import (
 	"github.com/pingcap/tipb/go-tipb"
 )
 
-//TODO: select join algorithm during cbo phase.
 func (b *executorBuilder) buildJoin(v *plan.PhysicalHashJoin) Executor {
 	var leftHashKey, rightHashKey []*expression.Column
 	var targetTypes []*types.FieldType
@@ -46,6 +45,7 @@ func (b *executorBuilder) buildJoin(v *plan.PhysicalHashJoin) Executor {
 		prepared:    false,
 		ctx:         b.ctx,
 		targetTypes: targetTypes,
+		concurrency: v.Concurrency,
 	}
 	if v.SmallTable == 1 {
 		e.smallFilter = expression.ComposeCNFCondition(v.RightConditions)
@@ -70,6 +70,23 @@ func (b *executorBuilder) buildJoin(v *plan.PhysicalHashJoin) Executor {
 		e.smallExec = b.build(v.GetChildByIndex(1))
 		e.bigExec = b.build(v.GetChildByIndex(0))
 	}
+	if e.concurrency < 1 {
+		// prepare indexes hashJoinContexts[0] and fetchBigExec computes cnt % concurrency,
+		// so a plan that leaves Concurrency unset must still get one join worker.
+		e.concurrency = 1
+	}
+	for i := 0; i < e.concurrency; i++ {
+		ctx := &hashJoinCtx{}
+		if e.bigFilter != nil {
+			ctx.bigFilter = e.bigFilter.DeepCopy()
+		}
+		if e.otherFilter != nil {
+			ctx.otherFilter = e.otherFilter.DeepCopy()
+		}
+		ctx.datumBuffer = make([]types.Datum, len(e.bigHashKey))
+		ctx.hashKeyBuffer = make([]byte, 0, 10000)
+		e.hashJoinContexts = append(e.hashJoinContexts, ctx)
+	}
 	return e
 }
 
@@ -267,6 +284,7 @@ func (b *executorBuilder) buildNewTableScan(v *plan.PhysicalTableScan, s *plan.S
 	st := &NewXSelectTableExec{
 		tableInfo:   v.Table,
 		ctx:         b.ctx,
+		txn:         txn,
 		supportDesc: supportDesc,
 		asName:      v.TableAsName,
 		table:       table,
@@ -330,6 +348,7 @@ func (b *executorBuilder) buildNewIndexScan(v *plan.PhysicalIndexScan, s *plan.S
 		asName:      v.TableAsName,
 		table:       table,
 		indexPlan:   v,
+		txn:         txn,
 	}
 	ret = st
 	if !txn.IsReadOnly() {
diff --git a/executor/new_executor.go b/executor/new_executor.go
index 37289dee5b..c3c7603d0f 100644
--- a/executor/new_executor.go
+++ b/executor/new_executor.go
@@ -15,6 +15,7 @@ package executor
 
 import (
 	"sort"
+	"sync"
 
 	"github.com/juju/errors"
 	"github.com/pingcap/tidb/ast"
@@ -45,22 +46,38 @@ type HashJoinExec struct {
 	schema      expression.Schema
 	outer       bool
 	leftSmall   bool
-	matchedRows []*Row
 	cursor      int
 	// targetTypes means the target the type that both smallHashKey and bigHashKey should convert to.
 	targetTypes []*types.FieldType
+
+	finished bool // set by Next after an error; the fetch and join workers poll it to stop early
+	// wg tracks the join workers; the result channel is closed once all of them are done.
+	wg sync.WaitGroup
+
+	// Worker inputs: one bigTableRows channel and one hashJoinCtx per join worker.
+	concurrency      int
+	bigTableRows     []chan []*Row
+	bigTableErr      chan error
+	hashJoinContexts []*hashJoinCtx
+
+	// Output channels read by Next.
+	resultErr  chan error
+	resultRows chan *Row
+}
+
+type hashJoinCtx struct {
+	bigFilter   expression.Expression
+	otherFilter expression.Expression
+	// Scratch buffers reused across rows when encoding hash keys.
+	datumBuffer   []types.Datum
+	hashKeyBuffer []byte
 }
 
 // Close implements Executor Close interface.
 func (e *HashJoinExec) Close() error {
 	e.prepared = false
 	e.cursor = 0
-	e.matchedRows = nil
-	err := e.smallExec.Close()
-	if err != nil {
-		return errors.Trace(err)
-	}
-	return e.bigExec.Close()
+	return e.smallExec.Close()
 }
 
 func joinTwoRow(a *Row, b *Row) *Row {
@@ -77,28 +94,27 @@ func joinTwoRow(a *Row, b *Row) *Row {
 
 // getHashKey gets the hash key when given a row and hash columns.
 // It will return a boolean value representing if the hash key has null, a byte slice representing the result hash code.
-func getHashKey(exprs []*expression.Column, row *Row, targetTypes []*types.FieldType) (bool, []byte, error) {
-	vals := make([]types.Datum, 0, len(exprs))
-	for i, expr := range exprs {
-		v, err := expr.Eval(row.Data, nil)
+func getHashKey(cols []*expression.Column, row *Row, targetTypes []*types.FieldType, vals []types.Datum, bytes []byte) (bool, []byte, error) {
+	var err error
+	for i, col := range cols {
+		vals[i], err = col.Eval(row.Data, nil)
 		if err != nil {
 			return false, nil, errors.Trace(err)
 		}
-		if v.IsNull() {
+		if vals[i].IsNull() {
 			return true, nil, nil
 		}
-		if targetTypes[i].Tp != expr.RetType.Tp {
-			v, err = v.ConvertTo(targetTypes[i])
+		if targetTypes[i].Tp != col.RetType.Tp {
+			vals[i], err = vals[i].ConvertTo(targetTypes[i])
 			if err != nil {
 				return false, nil, errors.Trace(err)
 			}
 		}
-		vals = append(vals, v)
 	}
 	if len(vals) == 0 {
 		return false, nil, nil
 	}
-	bytes, err := codec.EncodeValue([]byte{}, vals...)
+	bytes, err = codec.EncodeValue(bytes, vals[:len(cols)]...) // encode only the slots filled above, never stale tail entries of the scratch slice
 	return false, bytes, errors.Trace(err)
 }
 
@@ -112,7 +128,60 @@ func (e *HashJoinExec) Fields() []*ast.ResultField {
 	return nil
 }
 
+var batchSize = 128 // upper bound on the number of big-table rows handed to a join worker in one batch
+
+// fetchBigExec is the worker that reads rows from the big table and hands them to the join workers in batches.
+func (e *HashJoinExec) fetchBigExec() {
+	cnt := 0
+	defer func() {
+		for _, cn := range e.bigTableRows {
+			close(cn)
+		}
+		e.bigExec.Close() // best effort: the Close error is dropped
+	}()
+	curBatchSize := 1
+	for {
+		if e.finished {
+			break
+		}
+		rows := make([]*Row, 0, curBatchSize) // batches ramp up from 1 row, so size each one to what it will hold
+		done := false
+		for i := 0; i < curBatchSize; i++ {
+			row, err := e.bigExec.Next()
+			if err != nil {
+				e.bigTableErr <- errors.Trace(err)
+				done = true
+				break
+			}
+			if row == nil {
+				done = true
+				break
+			}
+			rows = append(rows, row)
+		}
+		idx := cnt % e.concurrency
+		e.bigTableRows[idx] <- rows
+		cnt++
+		if done {
+			break
+		}
+		if curBatchSize < batchSize {
+			curBatchSize *= 2
+		}
+	}
+}
+
 func (e *HashJoinExec) prepare() error {
+	e.finished = false
+	e.bigTableRows = make([]chan []*Row, e.concurrency)
+	for i := 0; i < e.concurrency; i++ {
+		e.bigTableRows[i] = make(chan []*Row, e.concurrency*100)
+	}
+	e.bigTableErr = make(chan error, 1)
+
+	// Start the fetch worker now so big-table reads overlap with building the hash table below.
+	go e.fetchBigExec()
+
 	e.hashTable = make(map[string][]*Row)
 	e.cursor = 0
 	for {
@@ -135,7 +204,7 @@ func (e *HashJoinExec) prepare() error {
 				continue
 			}
 		}
-		hasNull, hashcode, err := getHashKey(e.smallHashKey, row, e.targetTypes)
+		hasNull, hashcode, err := getHashKey(e.smallHashKey, row, e.targetTypes, e.hashJoinContexts[0].datumBuffer, nil)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -149,12 +218,87 @@ func (e *HashJoinExec) prepare() error {
 		}
 	}
 
+	e.resultRows = make(chan *Row, e.concurrency*1000)
+	e.resultErr = make(chan error, e.concurrency) // one slot per join worker, so a second failing worker is less likely to block on send
+
+	e.wg = sync.WaitGroup{}
+	for i := 0; i < e.concurrency; i++ {
+		e.wg.Add(1)
+		go e.doJoin(i)
+	}
+	go e.closeChanWorker()
+
 	e.prepared = true
 	return nil
 }
 
-func (e *HashJoinExec) constructMatchedRows(bigRow *Row) (matchedRows []*Row, err error) {
-	hasNull, hashcode, err := getHashKey(e.bigHashKey, bigRow, e.targetTypes)
+func (e *HashJoinExec) closeChanWorker() {
+	e.wg.Wait()
+	close(e.resultRows) // lets Next observe end-of-data once every join worker has exited
+	e.hashTable = nil
+}
+
+// doJoin is the body of join worker idx: it joins every batch received on bigTableRows[idx].
+func (e *HashJoinExec) doJoin(idx int) {
+	defer e.wg.Done() // release the WaitGroup on every exit path
+	for {
+		var (
+			bigRows []*Row
+			ok      bool
+			err     error
+		)
+		select {
+		case bigRows, ok = <-e.bigTableRows[idx]:
+		case err = <-e.bigTableErr:
+		}
+		if err != nil {
+			e.resultErr <- errors.Trace(err)
+			return
+		}
+		if !ok || e.finished {
+			return
+		}
+		for _, bigRow := range bigRows {
+			succ := e.join(e.hashJoinContexts[idx], bigRow)
+			if !succ {
+				return // join already reported its error; stop this worker instead of taking more batches
+			}
+		}
+	}
+}
+
+func (e *HashJoinExec) join(ctx *hashJoinCtx, bigRow *Row) bool {
+	var (
+		matchedRows []*Row
+		err         error
+	)
+	bigMatched := true
+	if e.bigFilter != nil {
+		bigMatched, err = expression.EvalBool(ctx.bigFilter, bigRow.Data, e.ctx)
+		if err != nil {
+			e.resultErr <- errors.Trace(err)
+			return false
+		}
+	}
+	if bigMatched {
+		matchedRows, err = e.constructMatchedRows(ctx, bigRow)
+		if err != nil {
+			e.resultErr <- errors.Trace(err)
+			return false
+		}
+	}
+	for _, r := range matchedRows {
+		e.resultRows <- r
+	}
+	if len(matchedRows) == 0 && e.outer {
+		r := e.fillNullRow(bigRow)
+		e.resultRows <- r
+	}
+	return true
+}
+
+func (e *HashJoinExec) constructMatchedRows(ctx *hashJoinCtx, bigRow *Row) (matchedRows []*Row, err error) {
+	hasNull, hashcode, err := getHashKey(e.bigHashKey, bigRow, e.targetTypes, ctx.datumBuffer, ctx.hashKeyBuffer[0:0:cap(ctx.hashKeyBuffer)])
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -176,7 +320,7 @@ func (e *HashJoinExec) constructMatchedRows(bigRow *Row) (matchedRows []*Row, er
 			matchedRow = joinTwoRow(bigRow, smallRow)
 		}
 		if e.otherFilter != nil {
-			otherMatched, err = expression.EvalBool(e.otherFilter, matchedRow.Data, e.ctx)
+			otherMatched, err = expression.EvalBool(ctx.otherFilter, matchedRow.Data, e.ctx)
 			if err != nil {
 				return nil, errors.Trace(err)
 			}
@@ -205,14 +349,6 @@ func (e *HashJoinExec) fillNullRow(bigRow *Row) (returnRow *Row) {
 	return returnRow
 }
 
-func (e *HashJoinExec) returnRecord() (ret *Row, ok bool) {
-	if e.cursor >= len(e.matchedRows) {
-		return nil, false
-	}
-	e.cursor++
-	return e.matchedRows[e.cursor-1], true
-}
-
 // Next implements Executor Next interface.
 func (e *HashJoinExec) Next() (*Row, error) {
 	if !e.prepared {
@@ -220,46 +356,23 @@ func (e *HashJoinExec) Next() (*Row, error) {
 			return nil, errors.Trace(err)
 		}
 	}
-
-	row, ok := e.returnRecord()
-	if ok {
-		return row, nil
+	var (
+		row *Row
+		err error
+		ok  bool
+	)
+	select {
+	case row, ok = <-e.resultRows:
+	case err, ok = <-e.resultErr:
 	}
-
-	for {
-		bigRow, err := e.bigExec.Next()
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-		if bigRow == nil {
-			e.bigExec.Close()
-			return nil, nil
-		}
-
-		var matchedRows []*Row
-		bigMatched := true
-		if e.bigFilter != nil {
-			bigMatched, err = expression.EvalBool(e.bigFilter, bigRow.Data, e.ctx)
-			if err != nil {
-				return nil, errors.Trace(err)
-			}
-		}
-		if bigMatched {
-			matchedRows, err = e.constructMatchedRows(bigRow)
-			if err != nil {
-				return nil, errors.Trace(err)
-			}
-		}
-		e.matchedRows = matchedRows
-		e.cursor = 0
-		row, ok := e.returnRecord()
-		if ok {
-			return row, nil
-		} else if e.outer {
-			row = e.fillNullRow(bigRow)
-			return row, nil
-		}
+	if err != nil {
+		e.finished = true // tells the fetch and join workers to stop at their next check
+		return nil, errors.Trace(err)
 	}
+	if !ok {
+		return nil, nil
+	}
+	return row, nil
 }
 
 // HashSemiJoinExec implements the hash join algorithm for semi join.
@@ -326,7 +439,7 @@ func (e *HashSemiJoinExec) prepare() error {
 				continue
 			}
 		}
-		hasNull, hashcode, err := getHashKey(e.smallHashKey, row, e.targetTypes)
+		hasNull, hashcode, err := getHashKey(e.smallHashKey, row, e.targetTypes, make([]types.Datum, len(e.smallHashKey)), nil)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -346,7 +459,7 @@ func (e *HashSemiJoinExec) prepare() error {
 }
 
 func (e *HashSemiJoinExec) rowIsMatched(bigRow *Row) (matched bool, hasNull bool, err error) {
-	hasNull, hashcode, err := getHashKey(e.bigHashKey, bigRow, e.targetTypes)
+	hasNull, hashcode, err := getHashKey(e.bigHashKey, bigRow, e.targetTypes, make([]types.Datum, len(e.bigHashKey)), nil) // scratch slice sized from the key columns actually evaluated
 	if err != nil {
 		return false, false, errors.Trace(err)
 	}
diff --git a/executor/new_executor_xapi.go b/executor/new_executor_xapi.go
index cee4cb463a..b67069cfd0 100644
--- a/executor/new_executor_xapi.go
+++ b/executor/new_executor_xapi.go
@@ -79,6 +79,7 @@ type NewXSelectIndexExec struct {
 	result        xapi.SelectResult
 	partialResult xapi.PartialResult
 	where         *tipb.Expr
+	txn           kv.Transaction // transaction supplied by the executor builder; used for StartTS and the kv client
 
 	tasks      []*lookupTableTask
 	taskCursor int
@@ -109,12 +110,7 @@ func (e *NewXSelectIndexExec) AddAggregate(funcs []*tipb.Expr, byItems []*tipb.B
 	e.byItems = byItems
 	e.aggFields = fields
 	e.aggregate = true
-	txn, err := e.ctx.GetTxn(false)
-	if err != nil {
-		e.indexPlan.DoubleRead = true
-		return
-	}
-	client := txn.GetClient()
+	client := e.txn.GetClient()
 	if !client.SupportRequestType(kv.ReqTypeIndex, kv.ReqSubTypeGroupBy) {
 		e.indexPlan.DoubleRead = true
 	}
@@ -299,12 +295,8 @@ func (e *NewXSelectIndexExec) fetchHandles() ([]int64, error) {
 }
 
 func (e *NewXSelectIndexExec) doIndexRequest() (xapi.SelectResult, error) {
-	txn, err := e.ctx.GetTxn(false)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
 	selIdxReq := new(tipb.SelectRequest)
-	selIdxReq.StartTs = txn.StartTS()
+	selIdxReq.StartTs = e.txn.StartTS()
 	selIdxReq.IndexInfo = xapi.IndexToProto(e.table.Meta(), e.indexPlan.Index)
 	selIdxReq.Limit = e.indexPlan.LimitCount
 	if e.indexPlan.Desc {
@@ -314,6 +306,7 @@ func (e *NewXSelectIndexExec) doIndexRequest() (xapi.SelectResult, error) {
 	for i, v := range e.indexPlan.Index.Columns {
 		fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType)
 	}
+	var err error
 	selIdxReq.Ranges, err = indexRangesToPBRanges(e.indexPlan.Ranges, fieldTypes)
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -327,7 +320,7 @@ func (e *NewXSelectIndexExec) doIndexRequest() (xapi.SelectResult, error) {
 	} else if e.indexPlan.OutOfOrder {
 		concurrency = defaultConcurrency
 	}
-	return xapi.Select(txn.GetClient(), selIdxReq, concurrency, !e.indexPlan.OutOfOrder)
+	return xapi.Select(e.txn.GetClient(), selIdxReq, concurrency, !e.indexPlan.OutOfOrder)
 }
 
 func (e *NewXSelectIndexExec) buildTableTasks(handles []int64) {
@@ -460,13 +453,9 @@ func (e *NewXSelectIndexExec) extractRowsFromPartialResult(t table.Table, partia
 }
 
 func (e *NewXSelectIndexExec) doTableRequest(handles []int64) (xapi.SelectResult, error) {
-	txn, err := e.ctx.GetTxn(false)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
 	// The handles are not in original index order, so we can't push limit here.
 	selTableReq := new(tipb.SelectRequest)
-	selTableReq.StartTs = txn.StartTS()
+	selTableReq.StartTs = e.txn.StartTS()
 	selTableReq.TableInfo = &tipb.TableInfo{
 		TableId: e.table.Meta().ID,
 	}
@@ -486,7 +475,7 @@ func (e *NewXSelectIndexExec) doTableRequest(handles []int64) (xapi.SelectResult
 	selTableReq.Aggregates = e.aggFuncs
 	selTableReq.GroupBy = e.byItems
 	// Aggregate Info
-	resp, err := xapi.Select(txn.GetClient(), selTableReq, defaultConcurrency, false)
+	resp, err := xapi.Select(e.txn.GetClient(), selTableReq, defaultConcurrency, false)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -516,6 +505,7 @@ type NewXSelectTableExec struct {
 	limitCount   *int64
 	returnedRows uint64 // returned rowCount
 	keepOrder    bool
+	txn          kv.Transaction // transaction supplied by the executor builder; used for StartTS and the kv client
 
 	/*
 	    The following attributes are used for aggregation push down.
@@ -547,12 +537,9 @@ func (e *NewXSelectTableExec) Schema() expression.Schema {
 }
 
 func (e *NewXSelectTableExec) doRequest() error {
-	txn, err := e.ctx.GetTxn(false)
-	if err != nil {
-		return errors.Trace(err)
-	}
+	var err error
 	selReq := new(tipb.SelectRequest)
-	selReq.StartTs = txn.StartTS()
+	selReq.StartTs = e.txn.StartTS()
 	selReq.Where = e.where
 	selReq.Ranges = tableRangesToPBRanges(e.ranges)
 	columns := e.Columns
@@ -567,7 +554,7 @@ func (e *NewXSelectTableExec) doRequest() error {
 	// Aggregate Info
 	selReq.Aggregates = e.aggFuncs
 	selReq.GroupBy = e.byItems
-	e.result, err = xapi.Select(txn.GetClient(), selReq, defaultConcurrency, e.keepOrder)
+	e.result, err = xapi.Select(e.txn.GetClient(), selReq, defaultConcurrency, e.keepOrder)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -614,7 +601,7 @@ func (e *NewXSelectTableExec) Next() (*Row, error) {
 			if e.partialResult == nil {
 				return nil, nil
 			}
-			log.Debugf("[TIME_TABLE_SCAN] %v", time.Now().Sub(startTs))
+			log.Infof("[TIME_TABLE_SCAN] %v", time.Since(startTs))
 		}
 		h, rowData, err := e.partialResult.Next()
 		if err != nil {
diff --git a/plan/physical_plan_builder.go b/plan/physical_plan_builder.go
index 6f868e98f7..f8fc24999f 100644
--- a/plan/physical_plan_builder.go
+++ b/plan/physical_plan_builder.go
@@ -301,6 +301,8 @@ func (p *Join) handleLeftJoin(prop requiredProperty, innerJoin bool) (*physicalP
 		RightConditions: p.RightConditions,
 		OtherConditions: p.OtherConditions,
 		SmallTable:      1,
+		// TODO: choose the concurrency from the data size instead of a fixed value.
+		Concurrency: 5,
 	}
 	join.SetSchema(p.schema)
 	if innerJoin {
@@ -341,6 +343,8 @@ func (p *Join) handleRightJoin(prop requiredProperty, innerJoin bool) (*physical
 		LeftConditions:  p.LeftConditions,
 		RightConditions: p.RightConditions,
 		OtherConditions: p.OtherConditions,
+		// TODO: choose the concurrency from the data size instead of a fixed value.
+		Concurrency: 5,
 	}
 	join.SetSchema(p.schema)
 	if innerJoin {
diff --git a/plan/physical_plans.go b/plan/physical_plans.go
index cc18f40b9a..48e55131ed 100644
--- a/plan/physical_plans.go
+++ b/plan/physical_plans.go
@@ -87,6 +87,7 @@ type PhysicalHashJoin struct {
 	RightConditions []expression.Expression
 	OtherConditions []expression.Expression
 	SmallTable      int
+	Concurrency     int // copied into HashJoinExec.concurrency, which sets how many join workers the executor starts
 }
 
 // PhysicalHashSemiJoin represents hash join for semi join.