diff --git a/ddl/backfilling_scheduler.go b/ddl/backfilling_scheduler.go index 377f926c8c..a83db4cf9c 100644 --- a/ddl/backfilling_scheduler.go +++ b/ddl/backfilling_scheduler.go @@ -16,6 +16,7 @@ package ddl import ( "context" + "fmt" "sync" "time" @@ -32,6 +33,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" + "github.com/pingcap/tidb/util/intest" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mathutil" decoder "github.com/pingcap/tidb/util/rowDecoder" @@ -343,6 +345,9 @@ func (b *ingestBackfillScheduler) close(force bool) { } } close(b.resultCh) + if intest.InTest && len(b.copReqSenderPool.srcChkPool) != copReadChunkPoolSize() { + panic(fmt.Sprintf("unexpected chunk size %d", len(b.copReqSenderPool.srcChkPool))) + } if !force { jobID := b.reorgInfo.ID indexID := b.reorgInfo.currElement.ID @@ -450,13 +455,13 @@ func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) { defer util.Recover(metrics.LabelDDL, "ingestWorker.HandleTask", func() { w.resultCh <- &backfillResult{taskID: rs.id, err: dbterror.ErrReorgPanic} }, false) - + defer w.copReqSenderPool.recycleChunk(rs.chunk) result := &backfillResult{ taskID: rs.id, err: rs.err, } if result.err != nil { - logutil.BgLogger().Error("[ddl-ingest] finish a cop-request task with error", + logutil.BgLogger().Error("[ddl-ingest] encounter error when handle index chunk", zap.Int("id", rs.id), zap.Error(rs.err)) w.resultCh <- result return diff --git a/ddl/index.go b/ddl/index.go index df4914ebb9..d0153c848b 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1611,10 +1611,8 @@ func (w *addIndexIngestWorker) WriteLocal(rs *idxRecResult) (count int, nextKey vars := w.sessCtx.GetSessionVars() cnt, lastHandle, err := writeChunkToLocal(w.writer, w.index, copCtx, vars, rs.chunk) if err != nil || cnt == 0 { - w.copReqSenderPool.recycleChunk(rs.chunk) return 0, nil, err } - w.copReqSenderPool.recycleChunk(rs.chunk) w.metricCounter.Add(float64(cnt)) logSlowOperations(time.Since(oprStartTime), "writeChunkToLocal", 3000) nextKey = tablecodec.EncodeRecordKey(w.tbl.RecordPrefix(), lastHandle) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 9640304e6b..b02f23bfa2 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -144,7 +144,11 @@ func (c *copReqSender) run() { if p.checkpointMgr != nil { p.checkpointMgr.UpdateTotal(task.id, srcChk.NumRows(), done) } - p.chunkSender.AddTask(idxRecResult{id: task.id, chunk: srcChk, done: done}) + idxRs := idxRecResult{id: task.id, chunk: srcChk, done: done} + failpoint.Inject("MockCopSenderError", func() { + idxRs.err = errors.New("mock cop error") + }) + p.chunkSender.AddTask(idxRs) } terror.Call(rs.Close) } diff --git a/ddl/ingest/BUILD.bazel b/ddl/ingest/BUILD.bazel index 8821c971e0..318a20c2cc 100644 --- a/ddl/ingest/BUILD.bazel +++ b/ddl/ingest/BUILD.bazel @@ -59,7 +59,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 6, + shard_count = 7, deps = [ ":ingest", "//config", @@ -68,6 +68,7 @@ go_test( "//sessionctx", "//testkit", "@com_github_ngaut_pools//:pools", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", ], ) diff --git a/ddl/ingest/integration_test.go b/ddl/ingest/integration_test.go index 3e51d52593..d30eaabc39 100644 --- a/ddl/ingest/integration_test.go +++ b/ddl/ingest/integration_test.go @@ -19,6 +19,7 @@ import ( "strings" "testing" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" @@ -47,10 +48,7 @@ func injectMockBackendMgr(t *testing.T, store kv.Storage) (restore func()) { func TestAddIndexIngestGeneratedColumns(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) - tk.MustExec("drop database if exists addindexlit;") - tk.MustExec("create database addindexlit;") - tk.MustExec("use addindexlit;") - tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec("use test;") defer injectMockBackendMgr(t, store)() assertLastNDDLUseIngest := func(n int) { @@ -93,3 +91,26 @@ func TestAddIndexIngestGeneratedColumns(t *testing.T) { tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1 2 1", "2 2 2 4 2", "3 3 3 6 3")) assertLastNDDLUseIngest(4) } + +func TestIngestCopSenderErr(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + defer injectMockBackendMgr(t, store)() + + tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1;") + tk.MustExec("create table t (a int primary key, b int);") + for i := 0; i < 4; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000)) + } + tk.MustQuery("split table t between (0) and (50000) regions 5;").Check(testkit.Rows("4 1")) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockCopSenderError", "return")) + tk.MustExec("alter table t add index idx(a);") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/MockCopSenderError")) + tk.MustExec("admin check table t;") + rows := tk.MustQuery("admin show ddl jobs 1;").Rows() + //nolint: forcetypeassert + jobTp := rows[0][3].(string) + require.True(t, strings.Contains(jobTp, "txn-merge"), jobTp) +}