lightning: make OpLevelOptional suppress the error of DoChecksum (#45486)
close pingcap/tidb#45382
This commit is contained in:
@ -91,6 +91,8 @@ go_library(
|
||||
"@com_github_tikv_pd_client//:client",
|
||||
"@io_etcd_go_etcd_client_v3//:client",
|
||||
"@org_golang_google_grpc//:grpc",
|
||||
"@org_golang_google_grpc//codes",
|
||||
"@org_golang_google_grpc//status",
|
||||
"@org_golang_x_exp//maps",
|
||||
"@org_golang_x_exp//slices",
|
||||
"@org_golang_x_sync//errgroup",
|
||||
|
||||
@ -54,6 +54,8 @@ import (
|
||||
"go.uber.org/multierr"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// TableImporter is a helper struct to import a table.
|
||||
@ -1034,15 +1036,26 @@ func (tr *TableImporter) postProcess(
|
||||
|
||||
var remoteChecksum *local.RemoteChecksum
|
||||
remoteChecksum, err = DoChecksum(ctx, tr.tableInfo)
|
||||
failpoint.Inject("checksum-error", func() {
|
||||
tr.logger.Info("failpoint checksum-error injected.")
|
||||
remoteChecksum = nil
|
||||
err = status.Error(codes.Unknown, "Checksum meets error.")
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
if rc.cfg.PostRestore.Checksum != config.OpLevelOptional {
|
||||
return false, err
|
||||
}
|
||||
tr.logger.Warn("do checksum failed, will skip this error and go on", log.ShortError(err))
|
||||
err = nil
|
||||
}
|
||||
err = tr.compareChecksum(remoteChecksum, localChecksum)
|
||||
// with post restore level 'optional', we will skip checksum error
|
||||
if rc.cfg.PostRestore.Checksum == config.OpLevelOptional {
|
||||
if err != nil {
|
||||
tr.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err))
|
||||
err = nil
|
||||
if remoteChecksum != nil {
|
||||
err = tr.compareChecksum(remoteChecksum, localChecksum)
|
||||
// with post restore level 'optional', we will skip checksum error
|
||||
if rc.cfg.PostRestore.Checksum == config.OpLevelOptional {
|
||||
if err != nil {
|
||||
tr.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err))
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -8,3 +8,6 @@ schema-pattern = "routes_a*"
|
||||
table-pattern = "t*"
|
||||
target-schema = "routes_b"
|
||||
target-table = "u"
|
||||
|
||||
[post-restore]
|
||||
checksum = "optional"
|
||||
|
||||
@ -4,12 +4,17 @@
|
||||
|
||||
set -eux
|
||||
|
||||
echo "testing checksum-error..."
|
||||
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/importer/checksum-error=1*return()"
|
||||
|
||||
run_sql 'DROP DATABASE IF EXISTS routes_a0;'
|
||||
run_sql 'DROP DATABASE IF EXISTS routes_a1;'
|
||||
run_sql 'DROP DATABASE IF EXISTS routes_b;'
|
||||
|
||||
run_lightning
|
||||
|
||||
echo "test checksum-error success!"
|
||||
|
||||
run_sql 'SELECT count(1), sum(x) FROM routes_b.u;'
|
||||
check_contains 'count(1): 4'
|
||||
check_contains 'sum(x): 259'
|
||||
|
||||
@ -120,21 +120,26 @@ func verifyChecksum(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostPr
|
||||
}
|
||||
remoteChecksum, err := checksumTable(ctx, globalTaskManager, taskMeta, logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !remoteChecksum.IsEqual(&localChecksum) {
|
||||
err2 := common.ErrChecksumMismatch.GenWithStackByArgs(
|
||||
remoteChecksum.Checksum, localChecksum.Sum(),
|
||||
remoteChecksum.TotalKVs, localChecksum.SumKVS(),
|
||||
remoteChecksum.TotalBytes, localChecksum.SumSize(),
|
||||
)
|
||||
if taskMeta.Plan.Checksum == config.OpLevelOptional {
|
||||
logger.Warn("verify checksum failed, but checksum is optional, will skip it", zap.Error(err2))
|
||||
err2 = nil
|
||||
if taskMeta.Plan.Checksum != config.OpLevelOptional {
|
||||
return err
|
||||
}
|
||||
return err2
|
||||
logger.Warn("checksumTable failed, will skip this error and go on", zap.Error(err))
|
||||
}
|
||||
if remoteChecksum != nil {
|
||||
if !remoteChecksum.IsEqual(&localChecksum) {
|
||||
err2 := common.ErrChecksumMismatch.GenWithStackByArgs(
|
||||
remoteChecksum.Checksum, localChecksum.Sum(),
|
||||
remoteChecksum.TotalKVs, localChecksum.SumKVS(),
|
||||
remoteChecksum.TotalBytes, localChecksum.SumSize(),
|
||||
)
|
||||
if taskMeta.Plan.Checksum == config.OpLevelOptional {
|
||||
logger.Warn("verify checksum failed, but checksum is optional, will skip it", zap.Error(err2))
|
||||
err2 = nil
|
||||
}
|
||||
return err2
|
||||
}
|
||||
logger.Info("checksum pass", zap.Object("local", &localChecksum))
|
||||
}
|
||||
logger.Info("checksum pass", zap.Object("local", &localChecksum))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user