@ -67,7 +67,7 @@ func (s *mockGCSSuite) TestSameBehaviourDetachedOrNot() {
|
||||
require.Len(s.T(), rows, 1)
|
||||
require.Eventually(s.T(), func() bool {
|
||||
return executor.TestDetachedTaskFinished.Load()
|
||||
}, 10*time.Second, time.Second)
|
||||
}, maxWaitTime, time.Second)
|
||||
|
||||
r1 := s.tk.MustQuery("SELECT * FROM test_detached.t1").Sort().Rows()
|
||||
s.tk.MustQuery("SELECT * FROM test_detached.t2").Sort().Check(r1)
|
||||
|
||||
@ -965,7 +965,7 @@ func (s *mockGCSSuite) TestRegisterTask() {
|
||||
resp, err2 := client.GetClient().Get(context.Background(), etcdKey)
|
||||
s.NoError(err2)
|
||||
return len(resp.Kvs) == 1
|
||||
}, 5*time.Second, 300*time.Millisecond)
|
||||
}, maxWaitTime, 300*time.Millisecond)
|
||||
// continue the execution
|
||||
importinto.TestSyncChan <- struct{}{}
|
||||
wg.Wait()
|
||||
|
||||
@ -295,7 +295,7 @@ func (s *mockGCSSuite) TestShowDetachedJob() {
|
||||
s.Require().Eventually(func() bool {
|
||||
rows := s.tk.MustQuery(fmt.Sprintf("show import job %d", jobID1)).Rows()
|
||||
return rows[0][5] == "finished"
|
||||
}, 20*time.Second, 500*time.Millisecond)
|
||||
}, maxWaitTime, 500*time.Millisecond)
|
||||
rows := s.tk.MustQuery(fmt.Sprintf("show import job %d", jobID1)).Rows()
|
||||
s.Len(rows, 1)
|
||||
jobInfo.Status = "finished"
|
||||
@ -329,7 +329,7 @@ func (s *mockGCSSuite) TestShowDetachedJob() {
|
||||
s.Require().Eventually(func() bool {
|
||||
rows = s.tk.MustQuery(fmt.Sprintf("show import job %d", jobID2)).Rows()
|
||||
return rows[0][5] == "failed"
|
||||
}, 10*time.Second, 500*time.Millisecond)
|
||||
}, maxWaitTime, 500*time.Millisecond)
|
||||
rows = s.tk.MustQuery(fmt.Sprintf("show import job %d", jobID2)).Rows()
|
||||
s.Len(rows, 1)
|
||||
jobInfo.Status = "failed"
|
||||
@ -362,7 +362,7 @@ func (s *mockGCSSuite) TestShowDetachedJob() {
|
||||
s.Require().Eventually(func() bool {
|
||||
rows = s.tk.MustQuery(fmt.Sprintf("show import job %d", jobID3)).Rows()
|
||||
return rows[0][5] == "failed"
|
||||
}, 10*time.Second, 500*time.Millisecond)
|
||||
}, maxWaitTime, 500*time.Millisecond)
|
||||
rows = s.tk.MustQuery(fmt.Sprintf("show import job %d", jobID3)).Rows()
|
||||
s.Len(rows, 1)
|
||||
jobInfo.Status = "failed"
|
||||
@ -442,7 +442,7 @@ func (s *mockGCSSuite) TestCancelJob() {
|
||||
s.Require().Eventually(func() bool {
|
||||
task := getTask(int64(jobID1))
|
||||
return task.State == proto.TaskStateReverted
|
||||
}, 10*time.Second, 500*time.Millisecond)
|
||||
}, maxWaitTime, 500*time.Millisecond)
|
||||
|
||||
// cancel again, should fail
|
||||
s.ErrorIs(s.tk.ExecToErr(fmt.Sprintf("cancel import job %d", jobID1)), exeerrors.ErrLoadDataInvalidOperation)
|
||||
@ -506,7 +506,7 @@ func (s *mockGCSSuite) TestCancelJob() {
|
||||
}
|
||||
}
|
||||
return globalTask.State == proto.TaskStateReverted && cancelled
|
||||
}, 5*time.Second, 1*time.Second)
|
||||
}, maxWaitTime, 1*time.Second)
|
||||
|
||||
// todo: enable it when https://github.com/pingcap/tidb/issues/44443 fixed
|
||||
//// cancel a pending job created by test_cancel_job2 using root
|
||||
@ -625,5 +625,5 @@ func (s *mockGCSSuite) TestKillBeforeFinish() {
|
||||
globalTask, err2 := globalTaskManager.GetGlobalTaskByKeyWithHistory(taskKey)
|
||||
s.NoError(err2)
|
||||
return globalTask.State == proto.TaskStateReverted
|
||||
}, 5*time.Second, 1*time.Second)
|
||||
}, maxWaitTime, 1*time.Second)
|
||||
}
|
||||
|
||||
@ -67,7 +67,7 @@ func (s *mockGCSSuite) TestDetachedLoadParquet() {
|
||||
require.Len(s.T(), rows, 1)
|
||||
require.Eventually(s.T(), func() bool {
|
||||
return executor.TestDetachedTaskFinished.Load()
|
||||
}, 10*time.Second, time.Second)
|
||||
}, maxWaitTime, time.Second)
|
||||
|
||||
s.tk.MustQuery("SELECT * FROM t;").Check(testkit.Rows(
|
||||
"1 1 0 123 1.23 0.00000001 1234567890 123 1.23000000",
|
||||
|
||||
@ -17,6 +17,7 @@ package importintotest
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fsouza/fake-gcs-server/fakestorage"
|
||||
"github.com/pingcap/failpoint"
|
||||
@ -42,6 +43,8 @@ var (
|
||||
// NOTE: must end with '/'
|
||||
gcsEndpointFormat = "http://%s:%d/storage/v1/"
|
||||
gcsEndpoint = fmt.Sprintf(gcsEndpointFormat, gcsHost, gcsPort)
|
||||
|
||||
maxWaitTime = 30 * time.Second
|
||||
)
|
||||
|
||||
func TestLoadRemote(t *testing.T) {
|
||||
|
||||
@ -115,7 +115,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() {
|
||||
globalTask, err2 = globalTaskManager.GetGlobalTaskByKeyWithHistory(importinto.TaskKey(int64(jobID)))
|
||||
s.NoError(err2)
|
||||
return globalTask.State == "failed"
|
||||
}, 10*time.Second, 300*time.Millisecond)
|
||||
}, 30*time.Second, 300*time.Millisecond)
|
||||
// check all sorted data cleaned up
|
||||
<-dispatcher.WaitCleanUpFinished
|
||||
|
||||
|
||||
Reference in New Issue
Block a user