s3: remove the file existence check after write file (#65566)
ref pingcap/tidb#65461
This commit is contained in:
@ -795,15 +795,8 @@ func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) er
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// Use the proper waiter pattern in AWS SDK v2
|
||||
waiter := s3.NewObjectExistsWaiter(rs.svc)
|
||||
hinput := &s3.HeadObjectInput{
|
||||
Bucket: aws.String(rs.options.Bucket),
|
||||
Key: aws.String(rs.options.Prefix + file),
|
||||
}
|
||||
err = waiter.Wait(ctx, hinput, 30*time.Second)
|
||||
rs.accessRec.RecWrite(len(data))
|
||||
return errors.Trace(err)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadFile implements Storage.ReadFile.
|
||||
|
||||
@ -505,7 +505,7 @@ func TestWriteNoError(t *testing.T) {
|
||||
s := createS3SuiteWithRec(t, accessRec)
|
||||
ctx := context.Background()
|
||||
|
||||
putCall := s.s3.EXPECT().
|
||||
s.s3.EXPECT().
|
||||
PutObject(gomock.Any(), gomock.Any(), gomock.Any()).
|
||||
DoAndReturn(func(_ context.Context, input *s3.PutObjectInput, _ ...func(*s3.Options)) (*s3.PutObjectOutput, error) {
|
||||
require.Equal(t, "bucket", aws.ToString(input.Bucket))
|
||||
@ -518,14 +518,6 @@ func TestWriteNoError(t *testing.T) {
|
||||
require.Equal(t, []byte("test"), body)
|
||||
return &s3.PutObjectOutput{}, nil
|
||||
})
|
||||
s.s3.EXPECT().
|
||||
HeadObject(gomock.Any(), gomock.Any(), gomock.Any()).
|
||||
DoAndReturn(func(_ context.Context, input *s3.HeadObjectInput, _ ...func(*s3.Options)) (*s3.HeadObjectOutput, error) {
|
||||
require.Equal(t, "bucket", aws.ToString(input.Bucket))
|
||||
require.Equal(t, "prefix/file", aws.ToString(input.Key))
|
||||
return &s3.HeadObjectOutput{}, nil
|
||||
}).
|
||||
After(putCall)
|
||||
|
||||
err := s.storage.WriteFile(ctx, "file", []byte("test"))
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -992,7 +992,7 @@ func TestNextGenMetering(t *testing.T) {
|
||||
return gotMeterData.Load() != ""
|
||||
}, 30*time.Second, 300*time.Millisecond)
|
||||
require.Contains(t, gotMeterData.Load(), fmt.Sprintf("id: %d, ", task.ID))
|
||||
require.Contains(t, gotMeterData.Load(), "requests{get: 7, put: 6}")
|
||||
require.Contains(t, gotMeterData.Load(), "requests{get: 5, put: 6}")
|
||||
// the read bytes is not stable, but it's more than 100B.
|
||||
// the write bytes is also not stable, due to retry, but mostly 100B to a few KB.
|
||||
require.Regexp(t, `cluster{r: 1\d\dB, w: (\d{3}|.*Ki)B}`, gotMeterData.Load())
|
||||
@ -1004,13 +1004,13 @@ func TestNextGenMetering(t *testing.T) {
|
||||
readIndexSum := getStepSummary(t, taskManager, task.ID, proto.BackfillStepReadIndex)
|
||||
mergeSum := getStepSummary(t, taskManager, task.ID, proto.BackfillStepMergeSort)
|
||||
ingestSum := getStepSummary(t, taskManager, task.ID, proto.BackfillStepWriteAndIngest)
|
||||
require.EqualValues(t, 1, readIndexSum.GetReqCnt.Load())
|
||||
require.EqualValues(t, 0, readIndexSum.GetReqCnt.Load())
|
||||
require.EqualValues(t, 3, readIndexSum.PutReqCnt.Load())
|
||||
require.Greater(t, readIndexSum.ReadBytes.Load(), int64(0))
|
||||
require.EqualValues(t, 153, readIndexSum.Bytes.Load())
|
||||
require.EqualValues(t, 3, readIndexSum.RowCnt.Load())
|
||||
|
||||
require.EqualValues(t, 3, mergeSum.GetReqCnt.Load())
|
||||
require.EqualValues(t, 2, mergeSum.GetReqCnt.Load())
|
||||
require.EqualValues(t, 3, mergeSum.PutReqCnt.Load())
|
||||
require.EqualValues(t, 0, mergeSum.ReadBytes.Load())
|
||||
require.EqualValues(t, 0, mergeSum.Bytes.Load())
|
||||
|
||||
@ -549,7 +549,7 @@ func TestNextGenMetering(t *testing.T) {
|
||||
}, 30*time.Second, 300*time.Millisecond)
|
||||
|
||||
s.Contains(gotMeterData.Load(), fmt.Sprintf("id: %d, ", task.ID))
|
||||
s.Contains(gotMeterData.Load(), "requests{get: 16, put: 13}")
|
||||
s.Contains(gotMeterData.Load(), "requests{get: 11, put: 13}")
|
||||
// note: the read/write of subtask meta file is also counted in obj_store part,
|
||||
// but meta file contains file name which contains task and subtask ID, so
|
||||
// the length may vary, we just use regexp to match here.
|
||||
@ -560,18 +560,18 @@ func TestNextGenMetering(t *testing.T) {
|
||||
sum := s.getStepSummary(ctx, taskManager, task.ID, proto.ImportStepEncodeAndSort)
|
||||
s.EqualValues(3, sum.RowCnt.Load())
|
||||
s.EqualValues(27, sum.Bytes.Load())
|
||||
s.EqualValues(2, sum.GetReqCnt.Load())
|
||||
s.EqualValues(1, sum.GetReqCnt.Load())
|
||||
s.EqualValues(5, sum.PutReqCnt.Load())
|
||||
|
||||
sum = s.getStepSummary(ctx, taskManager, task.ID, proto.ImportStepMergeSort)
|
||||
s.EqualValues(288, sum.Bytes.Load())
|
||||
s.EqualValues(6, sum.GetReqCnt.Load())
|
||||
s.EqualValues(4, sum.GetReqCnt.Load())
|
||||
s.EqualValues(6, sum.PutReqCnt.Load())
|
||||
|
||||
sum = s.getStepSummary(ctx, taskManager, task.ID, proto.ImportStepWriteAndIngest)
|
||||
// if we retry write, the bytes may be larger than 288
|
||||
s.GreaterOrEqual(sum.Bytes.Load(), int64(288))
|
||||
s.EqualValues(8, sum.GetReqCnt.Load())
|
||||
s.EqualValues(6, sum.GetReqCnt.Load())
|
||||
s.EqualValues(2, sum.PutReqCnt.Load())
|
||||
|
||||
s.Eventually(func() bool {
|
||||
|
||||
Reference in New Issue
Block a user