ddl,lightning: fix ingest data unexpectedly using zero commit ts (#48797)

close pingcap/tidb#48804
This commit is contained in:
tangenta
2023-11-22 20:56:41 +08:00
committed by GitHub
parent 3ed7732958
commit d2cfbdef67
7 changed files with 104 additions and 8 deletions

View File

@ -1761,6 +1761,12 @@ func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) err
if err = local.allocateTSIfNotExists(ctx, localEngine); err != nil {
return errors.Trace(err)
}
failpoint.Inject("mockAllocateTSErr", func() {
// mock generate timestamp error when reset engine.
localEngine.TS = 0
mockGRPCErr, _ := status.FromError(errors.Errorf("mock generate timestamp error"))
failpoint.Return(errors.Trace(mockGRPCErr.Err()))
})
}
localEngine.pendingFileSize.Store(0)

View File

@ -318,9 +318,10 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
clients = append(clients, wstream)
allPeers = append(allPeers, peer)
}
dataCommitTS := j.ingestData.GetTS()
req.Chunk = &sst.WriteRequest_Batch{
Batch: &sst.WriteBatch{
CommitTs: j.ingestData.GetTS(),
CommitTs: dataCommitTS,
},
}
@ -407,7 +408,8 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
logutil.Key("endKey", j.keyRange.End),
logutil.Key("remainStart", remainingStartKey),
logutil.Region(region),
logutil.Leader(j.region.Leader))
logutil.Leader(j.region.Leader),
zap.Uint64("commitTS", dataCommitTS))
}
break
}

View File

@ -20,6 +20,7 @@ import (
"time"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
lightning "github.com/pingcap/tidb/br/pkg/lightning/config"
@ -214,18 +215,45 @@ func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported
}
}()
}
logutil.Logger(bc.ctx).Info(LitInfoUnsafeImport, zap.Int64("index ID", indexID),
zap.String("usage info", bc.diskRoot.UsageInfo()))
err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(lightning.SplitRegionSize)*int64(lightning.MaxSplitRegionSizeRatio), int64(lightning.SplitRegionKeys))
err = bc.unsafeImportAndReset(ei)
if err != nil {
logutil.Logger(bc.ctx).Error(LitErrIngestDataErr, zap.Int64("index ID", indexID),
zap.String("usage info", bc.diskRoot.UsageInfo()))
return true, false, err
}
return true, true, nil
}
// unsafeImportAndReset imports the data of the engine described by ei through a
// freshly created closed engine, then resets the engine via bc.backend.ResetEngine.
//
// If the import fails, the error is logged and returned as is.
// If the reset fails, the closed engine is cleaned up on a best-effort basis
// (a cleanup failure is only logged), both engine handles on ei are set to nil,
// and the reset error is returned.
func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
	logutil.Logger(bc.ctx).Info(LitInfoUnsafeImport, zap.Int64("index ID", ei.indexID),
		zap.String("usage info", bc.diskRoot.UsageInfo()))
	logger := log.FromContext(bc.ctx).With(
		zap.Stringer("engineUUID", ei.uuid),
	)

	ei.closedEngine = backend.NewClosedEngine(bc.backend, logger, ei.uuid, 0)

	regionSplitSize := int64(lightning.SplitRegionSize) * int64(lightning.MaxSplitRegionSizeRatio)
	regionSplitKeys := int64(lightning.SplitRegionKeys)
	if err := ei.closedEngine.Import(bc.ctx, regionSplitSize, regionSplitKeys); err != nil {
		// Attach the error itself so the log line explains why the ingest failed.
		logutil.Logger(bc.ctx).Error(LitErrIngestDataErr, zap.Int64("index ID", ei.indexID),
			zap.String("usage info", bc.diskRoot.UsageInfo()), zap.Error(err))
		return err
	}

	err := bc.backend.ResetEngine(bc.ctx, ei.uuid)
	if err != nil {
		// Attach the error itself so the log line explains why the reset failed.
		logutil.Logger(bc.ctx).Error(LitErrResetEngineFail, zap.Int64("index ID", ei.indexID),
			zap.Error(err))
		// Best-effort cleanup: a failure here is logged but must not mask the reset error.
		err1 := ei.closedEngine.Cleanup(bc.ctx)
		if err1 != nil {
			logutil.Logger(ei.ctx).Error(LitErrCleanEngineErr, zap.Error(err1),
				zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
		}
		// Drop both handles so the engine that failed to reset is not used again.
		ei.openedEngine = nil
		ei.closedEngine = nil
		return err
	}
	return nil
}
// ForceSyncFlagForTest is a test-only switch used to force sync.
// It is false unless a test sets it.
var ForceSyncFlagForTest bool

View File

@ -42,6 +42,7 @@ const (
LitErrCloseWriterErr string = "close writer error"
LitErrReadSortPath string = "cannot read sort path"
LitErrCleanSortPath string = "cannot cleanup sort path"
LitErrResetEngineFail string = "reset engine failed"
LitWarnEnvInitFail string = "initialize environment failed"
LitWarnConfigError string = "build config for backend failed"
LitInfoEnvInitSucc string = "init global ingest backend environment finished"

View File

@ -15,8 +15,12 @@ go_test(
"//pkg/disttask/framework/dispatcher",
"//pkg/disttask/framework/proto",
"//pkg/errno",
"//pkg/kv",
"//pkg/parser/model",
"//pkg/store/helper",
"//pkg/tablecodec",
"//pkg/testkit",
"//pkg/types",
"//tests/realtikvtest",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",

View File

@ -24,8 +24,12 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/dispatcher"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/require"
)
@ -229,3 +233,53 @@ func TestAddIndexForCurrentTimestampColumn(t *testing.T) {
tk.MustExec("alter table t add index idx(a);")
tk.MustExec("admin check table t;")
}
// TestAddIndexTSErrorWhenResetImportEngine enables the "mockAllocateTSErr"
// failpoint for a single hit, runs ADD INDEX, and then reads the MVCC info of
// one index key to assert that its first write record has a non-zero commit TS.
func TestAddIndexTSErrorWhenResetImportEngine(t *testing.T) {
	store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t)
	var tblInfo *model.TableInfo
	var idxInfo *model.IndexInfo
	cb := &callback.TestDDLCallback{}
	// Capture the table and index metadata from the running job; they are
	// needed later to build the encoded index key.
	interceptFn := func(job *model.Job) {
		if idxInfo != nil {
			return
		}
		tbl, _ := dom.InfoSchema().TableByID(job.TableID)
		if tbl == nil {
			// The table is not visible in this schema snapshot; try again on the next update.
			return
		}
		tblInfo = tbl.Meta()
		if len(tblInfo.Indices) == 0 {
			return
		}
		idxInfo = tblInfo.Indices[0]
	}
	cb.OnJobUpdatedExported.Store(&interceptFn)

	tk := testkit.NewTestKit(t, store)
	tk.MustExec("drop database if exists addindexlit;")
	tk.MustExec("create database addindexlit;")
	tk.MustExec("use addindexlit;")
	t.Cleanup(func() {
		tk.MustExec("set global tidb_enable_dist_task = off;")
	})
	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
	tk.MustExec("set global tidb_enable_dist_task = on;")

	// `1*return` makes the failpoint fire exactly once.
	err := failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockAllocateTSErr", `1*return`)
	require.NoError(t, err)
	tk.MustExec("create table t (a int);")
	tk.MustExec("insert into t values (1), (2), (3);")
	dom.DDL().SetHook(cb)
	tk.MustExec("alter table t add index idx(a);")
	err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/local/mockAllocateTSErr")
	require.NoError(t, err)

	// Fail with a clear message instead of a nil dereference if the hook
	// never observed the new index.
	require.NotNil(t, tblInfo, "DDL hook did not capture the table info")
	require.NotNil(t, idxInfo, "DDL hook did not capture the index info")

	// Build the index key for the row with a = 1 and handle 1.
	dts := []types.Datum{types.NewIntDatum(1)}
	sctx := tk.Session().GetSessionVars().StmtCtx
	idxKey, _, err := tablecodec.GenIndexKey(sctx, tblInfo, idxInfo, tblInfo.ID, dts, kv.IntHandle(1), nil)
	require.NoError(t, err)

	tikvStore := dom.Store().(helper.Storage)
	newHelper := helper.NewHelper(tikvStore)
	mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(idxKey, 0)
	require.NoError(t, err)
	require.NotNil(t, mvccResp)
	require.NotNil(t, mvccResp.Info)
	require.Greater(t, len(mvccResp.Info.Writes), 0)
	require.Greater(t, mvccResp.Info.Writes[0].CommitTs, uint64(0))
}

View File

@ -399,6 +399,7 @@ func TestAddIndexRemoteDuplicateCheck(t *testing.T) {
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk.MustExec("set global tidb_ddl_reorg_worker_cnt=1;")
tk.MustExec("set global tidb_enable_dist_task = 0;")
tk.MustExec("create table t(id int primary key, b int, k int);")
tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1"))