// Copyright 2019 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package executor import ( "bufio" "bytes" "compress/gzip" "context" "fmt" "math" "os" "runtime/pprof" "strings" "testing" "time" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/meta/metadef" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/terror" plannercore "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/vardef" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/mock" "github.com/stretchr/testify/require" ) func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, error) { retriever.taskList = make(chan slowLogTask, 100) ctx := context.Background() retriever.parseSlowLog(ctx, sctx, reader, 64) task, ok := <-retriever.taskList if !ok { return nil, nil } var rows [][]types.Datum var err error result := <-task.resultCh rows, err = result.rows, result.err return rows, err } func newSlowQueryRetriever() (*slowQueryRetriever, error) { data := infoschema.NewData() schemaCacheSize := vardef.SchemaCacheSize.Load() newISBuilder := infoschema.NewBuilder(nil, schemaCacheSize, nil, data, schemaCacheSize > 0) err := newISBuilder.InitWithDBInfos(nil, nil, nil, 0) if err != nil { return nil, err } is := newISBuilder.Build(math.MaxUint64) tbl, err := is.TableByName(context.Background(), metadef.InformationSchemaName, ast.NewCIStr(infoschema.TableSlowQuery)) if err != nil { return nil, err } return &slowQueryRetriever{outputCols: tbl.Meta().Columns}, nil } func parseSlowLog(sctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, error) { retriever, err := newSlowQueryRetriever() if err != nil { return nil, err } // Ignore the error is ok for test. terror.Log(retriever.initialize(context.Background(), sctx)) rows, err := parseLog(retriever, sctx, reader) return rows, err } func TestParseSlowLogPanic(t *testing.T) { slowLogStr := `# Time: 2019-04-28T15:24:04.309074+08:00 # Txn_start_ts: 405888132465033227 # User@Host: root[root] @ localhost [127.0.0.1] # Query_time: 0.216905 # Cop_time: 0.38 Process_time: 0.021 Request_count: 1 Total_keys: 637 Processed_keys: 436 # Is_internal: true # Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 # Stats: t1:1,t2:2 # Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03 Cop_proc_addr: 127.0.0.1:20160 # Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 Cop_wait_addr: 0.0.0.0:20160 # Mem_max: 70724 # Disk_max: 65536 # Plan_from_cache: true # Plan_from_binding: true # Succ: false # Plan_digest: 60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4 # Prev_stmt: update t set i = 1; use test; select * from t;` require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/errorMockParseSlowLogPanic", `return(true)`)) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/errorMockParseSlowLogPanic")) }() reader := bufio.NewReader(bytes.NewBufferString(slowLogStr)) loc, err := time.LoadLocation("Asia/Shanghai") require.NoError(t, err) sctx := mock.NewContext() sctx.ResetSessionAndStmtTimeZone(loc) sctx.GetSessionVars().TimeZone = loc _, err = parseSlowLog(sctx, reader) require.Error(t, err) require.Equal(t, err.Error(), "panic test") } func TestParseSlowLogFile(t *testing.T) { slowLogStr := `# Time: 2019-04-28T15:24:04.309074+08:00 # Txn_start_ts: 405888132465033227 # User@Host: root[root] @ localhost [127.0.0.1] # Session_alias: alias123 # Exec_retry_time: 0.12 Exec_retry_count: 57 # Query_time: 0.216905 # Cop_time: 0.38 Process_time: 0.021 Request_count: 1 Total_keys: 637 Processed_keys: 436 # Rocksdb_delete_skipped_count: 10 Rocksdb_key_skipped_count: 10 Rocksdb_block_cache_hit_count: 10 Rocksdb_block_read_count: 10 Rocksdb_block_read_byte: 100 # Is_internal: true # Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 # Stats: t1:1,t2:2 # Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03 Cop_proc_addr: 127.0.0.1:20160 # Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 Cop_wait_addr: 0.0.0.0:20160 # Cop_backoff_regionMiss_total_times: 200 Cop_backoff_regionMiss_total_time: 0.2 Cop_backoff_regionMiss_max_time: 0.2 Cop_backoff_regionMiss_max_addr: 127.0.0.1 Cop_backoff_regionMiss_avg_time: 0.2 Cop_backoff_regionMiss_p90_time: 0.2 # Cop_backoff_rpcPD_total_times: 200 Cop_backoff_rpcPD_total_time: 0.2 Cop_backoff_rpcPD_max_time: 0.2 Cop_backoff_rpcPD_max_addr: 127.0.0.1 Cop_backoff_rpcPD_avg_time: 0.2 Cop_backoff_rpcPD_p90_time: 0.2 # Cop_backoff_rpcTiKV_total_times: 200 Cop_backoff_rpcTiKV_total_time: 0.2 Cop_backoff_rpcTiKV_max_time: 0.2 Cop_backoff_rpcTiKV_max_addr: 127.0.0.1 Cop_backoff_rpcTiKV_avg_time: 0.2 Cop_backoff_rpcTiKV_p90_time: 0.2 # Mem_max: 70724 # Mem_arbitration: 23333 # Disk_max: 65536 # Plan_from_cache: true # Plan_from_binding: true # Unpacked_bytes_sent_tikv_total: 30000 # Unpacked_bytes_received_tikv_total: 3000 # Unpacked_bytes_sent_tikv_cross_zone: 10000 # Unpacked_bytes_received_tikv_cross_zone: 1000 # Unpacked_bytes_sent_tiflash_total: 500000 # Unpacked_bytes_received_tiflash_total: 500005 # Unpacked_bytes_sent_tiflash_cross_zone: 300000 # Unpacked_bytes_received_tiflash_cross_zone: 300005 # Succ: false # IsExplicitTxn: true # Resource_group: default # Request_unit_read: 2.158 # Request_unit_write: 2.123 # Time_queued_by_rc: 0.05 # Tidb_cpu_time: 0.01 # Tikv_cpu_time: 0.021 # Storage_from_kv: true # Storage_from_mpp: true # Plan_digest: 60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4 # Prev_stmt: update t set i = 1; use test; select * from t;` reader := bufio.NewReader(bytes.NewBufferString(slowLogStr)) loc, err := time.LoadLocation("Asia/Shanghai") require.NoError(t, err) ctx := mock.NewContext() ctx.ResetSessionAndStmtTimeZone(loc) rows, err := parseSlowLog(ctx, reader) require.NoError(t, err) require.Len(t, rows, 1) recordString := "" for i, value := range rows[0] { str, err := value.ToString() require.NoError(t, err) if i > 0 { recordString += "," } recordString += str } expectRecordString := `2019-04-28 15:24:04.309074,` + `405888132465033227,root,localhost,0,alias123,57,0.12,0.216905,` + `0,0,0,0,0,0,0,0,0,0,0,0,,0,0,0,0,0,0,0.38,0.021,0,0,0,1,637,0,10,10,10,10,100,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,` + `0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,23333,65536,0,0,0,30000,3000,10000,1000,500000,500005,300000,300005,0,0,,` + `Cop_backoff_regionMiss_total_times: 200 Cop_backoff_regionMiss_total_time: 0.2 Cop_backoff_regionMiss_max_time: 0.2 Cop_backoff_regionMiss_max_addr: 127.0.0.1 Cop_backoff_regionMiss_avg_time: 0.2 Cop_backoff_regionMiss_p90_time: 0.2 Cop_backoff_rpcPD_total_times: 200 Cop_backoff_rpcPD_total_time: 0.2 Cop_backoff_rpcPD_max_time: 0.2 Cop_backoff_rpcPD_max_addr: 127.0.0.1 Cop_backoff_rpcPD_avg_time: 0.2 Cop_backoff_rpcPD_p90_time: 0.2 Cop_backoff_rpcTiKV_total_times: 200 Cop_backoff_rpcTiKV_total_time: 0.2 Cop_backoff_rpcTiKV_max_time: 0.2 Cop_backoff_rpcTiKV_max_addr: 127.0.0.1 Cop_backoff_rpcTiKV_avg_time: 0.2 Cop_backoff_rpcTiKV_p90_time: 0.2,` + `0,0,1,0,1,1,0,default,2.158,2.123,0.05,0.01,0.021,1,1,,60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4,` + `,update t set i = 1;,select * from t;` require.Equal(t, expectRecordString, recordString) // Issue 20928 reader = bufio.NewReader(bytes.NewBufferString(slowLogStr)) rows, err = parseSlowLog(ctx, reader) require.NoError(t, err) require.Len(t, rows, 1) recordString = "" for i, value := range rows[0] { str, err := value.ToString() require.NoError(t, err) if i > 0 { recordString += "," } recordString += str } expectRecordString = `2019-04-28 15:24:04.309074,` + `405888132465033227,root,localhost,0,alias123,57,0.12,0.216905,` + `0,0,0,0,0,0,0,0,0,0,0,0,,0,0,0,0,0,0,0.38,0.021,0,0,0,1,637,0,10,10,10,10,100,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,` + `0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,23333,65536,0,0,0,30000,3000,10000,1000,500000,500005,300000,300005,0,0,,` + `Cop_backoff_regionMiss_total_times: 200 Cop_backoff_regionMiss_total_time: 0.2 Cop_backoff_regionMiss_max_time: 0.2 Cop_backoff_regionMiss_max_addr: 127.0.0.1 Cop_backoff_regionMiss_avg_time: 0.2 Cop_backoff_regionMiss_p90_time: 0.2 Cop_backoff_rpcPD_total_times: 200 Cop_backoff_rpcPD_total_time: 0.2 Cop_backoff_rpcPD_max_time: 0.2 Cop_backoff_rpcPD_max_addr: 127.0.0.1 Cop_backoff_rpcPD_avg_time: 0.2 Cop_backoff_rpcPD_p90_time: 0.2 Cop_backoff_rpcTiKV_total_times: 200 Cop_backoff_rpcTiKV_total_time: 0.2 Cop_backoff_rpcTiKV_max_time: 0.2 Cop_backoff_rpcTiKV_max_addr: 127.0.0.1 Cop_backoff_rpcTiKV_avg_time: 0.2 Cop_backoff_rpcTiKV_p90_time: 0.2,` + `0,0,1,0,1,1,0,default,2.158,2.123,0.05,0.01,0.021,1,1,,60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4,` + `,update t set i = 1;,select * from t;` require.Equal(t, expectRecordString, recordString) // fix sql contain '# ' bug slowLog := bytes.NewBufferString( `# Time: 2019-04-28T15:24:04.309074+08:00 select a# from t; # Time: 2019-01-24T22:32:29.313255+08:00 # Txn_start_ts: 405888132465033227 # Query_time: 0.216905 # Process_time: 0.021 Request_count: 1 Total_keys: 637 Processed_keys: 436 # Is_internal: true # Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 # Stats: t1:1,t2:2 # Succ: false select * from t; `) reader = bufio.NewReader(slowLog) _, err = parseSlowLog(ctx, reader) require.NoError(t, err) // test for time format compatibility. slowLog = bytes.NewBufferString( `# Time: 2019-04-28T15:24:04.309074+08:00 select * from t; # Time: 2019-04-24-19:41:21.716221 +0800 select * from t; `) reader = bufio.NewReader(slowLog) rows, err = parseSlowLog(ctx, reader) require.NoError(t, err) require.Len(t, rows, 2) t0Str, err := rows[0][0].ToString() require.NoError(t, err) require.Equal(t, t0Str, "2019-04-28 15:24:04.309074") t1Str, err := rows[1][0].ToString() require.NoError(t, err) require.Equal(t, t1Str, "2019-04-24 19:41:21.716221") // Add parse error check. slowLog = bytes.NewBufferString( `# Time: 2019-04-28T15:24:04.309074+08:00 # Succ: abc select * from t; `) reader = bufio.NewReader(slowLog) _, err = parseSlowLog(ctx, reader) require.NoError(t, err) warnings := ctx.GetSessionVars().StmtCtx.GetWarnings() require.Len(t, warnings, 1) require.Equal(t, warnings[0].Err.Error(), "Parse slow log at line 2, failed field is Succ, failed value is abc, error is strconv.ParseBool: parsing \"abc\": invalid syntax") // issue 39940 slowLog = bytes.NewBufferString( `# Time: 2019-04-28T15:24:04.309074+08:00 # DB: a: b # Index_names: [t:i: a] # Succ: true select * from t; `) reader = bufio.NewReader(slowLog) rows, err = parseSlowLog(ctx, reader) require.NoError(t, err) require.Len(t, rows, 1) value, _ := rows[0][41].ToString() require.Equal(t, value, "a: b") value, _ = rows[0][42].ToString() require.Equal(t, value, "[t:i: a]") } // It changes variable.MaxOfMaxAllowedPacket, so must be stayed in SerialSuite. func TestParseSlowLogFileSerial(t *testing.T) { loc, err := time.LoadLocation("Asia/Shanghai") require.NoError(t, err) ctx := mock.NewContext() ctx.ResetSessionAndStmtTimeZone(loc) // test for bufio.Scanner: token too long. slowLog := bytes.NewBufferString( `# Time: 2019-04-28T15:24:04.309074+08:00 select * from t; # Time: 2019-04-24-19:41:21.716221 +0800 `) originValue := vardef.MaxOfMaxAllowedPacket vardef.MaxOfMaxAllowedPacket = 65536 sql := strings.Repeat("x", int(vardef.MaxOfMaxAllowedPacket+1)) slowLog.WriteString(sql) reader := bufio.NewReader(slowLog) _, err = parseSlowLog(ctx, reader) require.Error(t, err) require.EqualError(t, err, "single line length exceeds limit: 65536") vardef.MaxOfMaxAllowedPacket = originValue reader = bufio.NewReader(slowLog) _, err = parseSlowLog(ctx, reader) require.NoError(t, err) } func TestSlowLogParseTime(t *testing.T) { t1Str := "2019-01-24T22:32:29.313255+08:00" t2Str := "2019-01-24T22:32:29.313255" t1, err := ParseTime(t1Str) require.NoError(t, err) loc, err := time.LoadLocation("Asia/Shanghai") require.NoError(t, err) t2, err := time.ParseInLocation("2006-01-02T15:04:05.999999999", t2Str, loc) require.NoError(t, err) require.Equal(t, t1.Unix(), t2.Unix()) t1Format := t1.In(loc).Format(logutil.SlowLogTimeFormat) require.Equal(t, t1Format, t1Str) } // TestFixParseSlowLogFile bugfix // sql select * from INFORMATION_SCHEMA.SLOW_QUERY limit 1; // ERROR 1105 (HY000): string "2019-05-12-11:23:29.61474688" doesn't has a prefix that matches format "2006-01-02-15:04:05.999999999 -0700", err: parsing time "2019-05-12-11:23:29.61474688" as "2006-01-02-15:04:05.999999999 -0700": cannot parse "" as "-0700" func TestFixParseSlowLogFile(t *testing.T) { slowLog := bytes.NewBufferString( `# Time: 2019-05-12-11:23:29.614327491 +0800 # Txn_start_ts: 405888132465033227 # Query_time: 0.216905 # Process_time: 0.021 Request_count: 1 Total_keys: 637 Processed_keys: 436 # Is_internal: true # Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 # Stats: t1:1,t2:2 # Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03 # Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 # Mem_max: 70724 select * from t # Time: 2019-05-12-11:23:29.614327491 +0800 # Txn_start_ts: 405888132465033227 # Query_time: 0.216905 # Process_time: 0.021 Request_count: 1 Total_keys: 637 Processed_keys: 436 # Is_internal: true # Digest: 42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772 # Stats: t1:1,t2:2 # Cop_proc_avg: 0.1 Cop_proc_p90: 0.2 Cop_proc_max: 0.03 # Cop_wait_avg: 0.05 Cop_wait_p90: 0.6 Cop_wait_max: 0.8 # Mem_max: 70724 # Plan_digest: 60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4 select * from t;`) scanner := bufio.NewReader(slowLog) loc, err := time.LoadLocation("Asia/Shanghai") require.NoError(t, err) ctx := mock.NewContext() ctx.ResetSessionAndStmtTimeZone(loc) _, err = parseSlowLog(ctx, scanner) require.NoError(t, err) // Test parser error. slowLog = bytes.NewBufferString( `# Time: 2019-05-12-11:23:29.614327491 +0800 # Txn_start_ts: 405888132465033227# select * from t; `) scanner = bufio.NewReader(slowLog) _, err = parseSlowLog(ctx, scanner) require.NoError(t, err) warnings := ctx.GetSessionVars().StmtCtx.GetWarnings() require.Len(t, warnings, 1) require.Equal(t, warnings[0].Err.Error(), "Parse slow log at line 2, failed field is Txn_start_ts, failed value is 405888132465033227#, error is strconv.ParseUint: parsing \"405888132465033227#\": invalid syntax") } func TestSlowQueryRetriever(t *testing.T) { logData0 := "" logData1 := ` # Time: 2020-02-15T18:00:01.000000+08:00 select 1; # Time: 2020-02-15T19:00:05.000000+08:00 select 2;` logData2 := ` # Time: 2020-02-16T18:00:01.000000+08:00 select 3; # Time: 2020-02-16T18:00:05.000000+08:00 select 4;` logData3 := ` # Time: 2020-02-16T19:00:00.000000+08:00 select 5; # Time: 2020-02-17T18:00:05.000000+08:00 select 6; # Time: 2020-04-15T18:00:05.299063744+08:00 select 7;` logData := []string{logData0, logData1, logData2, logData3} fileName0 := "tidb-slow-retriever-2020-02-14T19-04-05.01.log" fileName1 := "tidb-slow-retriever-2020-02-15T19-04-05.01.log" fileName2 := "tidb-slow-retriever-2020-02-16T19-04-05.01.log" fileName3 := "tidb-slow-retriever.log" defer config.RestoreFunc()() config.UpdateGlobal(func(conf *config.Config) { conf.Log.SlowQueryFile = fileName3 }) for k := range 2 { // k = 0 for normal files // k = 1 for compressed files var fileNames []string if k == 0 { fileNames = []string{fileName0, fileName1, fileName2, fileName3} } else { fileNames = []string{fileName0 + ".gz", fileName1 + ".gz", fileName2 + ".gz", fileName3} } prepareLogs(t, logData, fileNames) cases := []struct { startTime string endTime string files []string querys []string }{ { startTime: "2020-02-15T18:00:00.000000+08:00", endTime: "2020-02-17T20:00:00.000000+08:00", files: []string{fileName1, fileName2, fileName3}, querys: []string{ "select 1;", "select 2;", "select 3;", "select 4;", "select 5;", "select 6;", }, }, { startTime: "2020-02-15T18:00:02.000000+08:00", endTime: "2020-02-16T20:00:00.000000+08:00", files: []string{fileName1, fileName2, fileName3}, querys: []string{ "select 2;", "select 3;", "select 4;", "select 5;", }, }, { startTime: "2020-02-16T18:00:03.000000+08:00", endTime: "2020-02-16T18:59:00.000000+08:00", files: []string{fileName2}, querys: []string{ "select 4;", }, }, { startTime: "2020-02-16T18:00:03.000000+08:00", endTime: "2020-02-16T20:00:00.000000+08:00", files: []string{fileName2, fileName3}, querys: []string{ "select 4;", "select 5;", }, }, { startTime: "2020-02-16T19:00:00.000000+08:00", endTime: "2020-02-17T17:00:00.000000+08:00", files: []string{fileName3}, querys: []string{ "select 5;", }, }, { startTime: "2010-01-01T00:00:00.000000+08:00", endTime: "2010-01-01T01:00:00.000000+08:00", files: []string{}, }, { startTime: "2020-03-01T00:00:00.000000+08:00", endTime: "2010-03-01T01:00:00.000000+08:00", files: []string{}, }, { startTime: "", endTime: "", files: []string{fileName1, fileName2, fileName3}, querys: []string{ "select 1;", "select 2;", "select 3;", "select 4;", "select 5;", "select 6;", "select 7;", }, }, { startTime: "2020-04-15T18:00:05.299063744+08:00", endTime: "2020-04-15T18:00:05.299063744+08:00", files: []string{fileName3}, querys: []string{ "select 7;", }, }, } loc, err := time.LoadLocation("Asia/Shanghai") require.NoError(t, err) sctx := mock.NewContext() sctx.ResetSessionAndStmtTimeZone(loc) sctx.GetSessionVars().SlowQueryFile = fileName3 for i, cas := range cases { extractor := &plannercore.SlowQueryExtractor{Enable: len(cas.startTime) > 0 && len(cas.endTime) > 0} if extractor.Enable { startTime, err := ParseTime(cas.startTime) require.NoError(t, err) endTime, err := ParseTime(cas.endTime) require.NoError(t, err) extractor.TimeRanges = []*plannercore.TimeRange{{StartTime: startTime, EndTime: endTime}} } retriever, err := newSlowQueryRetriever() require.NoError(t, err) retriever.extractor = extractor err = retriever.initialize(context.Background(), sctx) require.NoError(t, err) comment := fmt.Sprintf("compressed: %v, case id: %v", k, i) if len(retriever.files) > 0 { var reader *bufio.Reader reader, err := retriever.getNextReader() require.NoError(t, err, comment) rows, err := parseLog(retriever, sctx, reader) require.NoError(t, err, comment) require.Equal(t, len(rows), len(cas.querys), comment) for i, row := range rows { require.Equal(t, row[len(row)-1].GetString(), cas.querys[i], comment) } } if k == 0 { require.Equal(t, len(retriever.files), len(cas.files), comment) for i, file := range retriever.files { require.Equal(t, file.file.Name(), cas.files[i], comment) } } else { // for compressed file, sometimes it will contains one more file. require.True(t, (len(retriever.files) == len(cas.files)) || (len(retriever.files) == len(cas.files)+1), comment) var fileNames []string for _, file := range retriever.files { fileNames = append(fileNames, strings.TrimSuffix(file.file.Name(), ".gz")) } for _, file := range cas.files { require.Contains(t, fileNames, file, comment) } } require.NoError(t, retriever.close()) } removeFiles(fileNames) } } func TestSplitbyColon(t *testing.T) { cases := []struct { line string fields []string values []string }{ { "", []string{}, []string{}, }, { "123a", []string{"123a"}, []string{""}, }, { "1a: 2b", []string{"1a"}, []string{"2b"}, }, { "1a: [2b 3c] 4d: 5e", []string{"1a", "4d"}, []string{"[2b 3c]", "5e"}, }, { "1a: [2b,3c] 4d: 5e", []string{"1a", "4d"}, []string{"[2b,3c]", "5e"}, }, { "1a: [2b,[3c: 3cc]] 4d: 5e", []string{"1a", "4d"}, []string{"[2b,[3c: 3cc]]", "5e"}, }, { "1a: {2b 3c} 4d: 5e", []string{"1a", "4d"}, []string{"{2b 3c}", "5e"}, }, { "1a: {2b,3c} 4d: 5e", []string{"1a", "4d"}, []string{"{2b,3c}", "5e"}, }, { "1a: {2b,{3c: 3cc}} 4d: 5e", []string{"1a", "4d"}, []string{"{2b,{3c: 3cc}}", "5e"}, }, { "1a: {{{2b,{3c: 3cc}} 4d: 5e", nil, nil, }, { "1a: [2b,[3c: 3cc]]]] 4d: 5e", nil, nil, }, { "Time: 2021-09-08T14:39:54.506967433+08:00", []string{"Time"}, []string{"2021-09-08T14:39:54.506967433+08:00"}, }, { "Cop_proc_avg: 0 Cop_proc_addr: Cop_proc_max: Cop_proc_min: ", []string{"Cop_proc_avg", "Cop_proc_addr", "Cop_proc_max", "Cop_proc_min"}, []string{"0", "", "", ""}, }, } for _, c := range cases { resFields, resValues := splitByColon(c.line) logutil.BgLogger().Info(c.line) require.Equal(t, c.fields, resFields) require.Equal(t, c.values, resValues) } } func TestBatchLogForReversedScan(t *testing.T) { logData0 := "" logData1 := ` # Time: 2020-02-15T18:00:01.000000+08:00 select 1; # Time: 2020-02-15T19:00:05.000000+08:00 select 2; # Time: 2020-02-15T20:00:05.000000+08:00` logData2 := `select 3; # Time: 2020-02-16T18:00:01.000000+08:00 select 4; # Time: 2020-02-16T18:00:05.000000+08:00 select 5;` logData3 := ` # Time: 2020-02-16T19:00:00.000000+08:00 select 6; # Time: 2020-02-17T18:00:05.000000+08:00 select 7; # Time: 2020-04-15T18:00:05.299063744+08:00` logData4 := `select 8; # Time: 2020-04-15T19:00:05.299063744+08:00 select 9;` logData := []string{logData0, logData1, logData2, logData3, logData4} fileName0 := "tidb-slow-reverse-scan-2020-02-14T19-04-05.01.log" fileName1 := "tidb-slow-reverse-scan-2020-02-15T19-04-05.01.log" fileName2 := "tidb-slow-reverse-scan-2020-02-16T19-04-05.01.log" fileName3 := "tidb-slow-reverse-scan-2020-02-17T19-04-05.01.log" fileName4 := "tidb-slow-reverse-scan.log" defer config.RestoreFunc()() config.UpdateGlobal(func(conf *config.Config) { conf.Log.SlowQueryFile = fileName4 }) fileNames := []string{fileName0, fileName1, fileName2, fileName3, fileName4} prepareLogs(t, logData, fileNames) defer func() { removeFiles(fileNames) }() cases := []struct { startTime string endTime string files []string logs [][]string }{ { startTime: "2020-02-15T18:00:00.000000+08:00", endTime: "2020-02-15T19:00:00.000000+08:00", files: []string{fileName1}, logs: [][]string{ {"# Time: 2020-02-15T19:00:05.000000+08:00", "select 2;", "# Time: 2020-02-15T18:00:01.000000+08:00", "select 1;"}, }, }, { startTime: "2020-02-15T20:00:05.000000+08:00", endTime: "2020-02-17T19:00:00.000000+08:00", files: []string{fileName1, fileName2, fileName3}, logs: [][]string{ {"# Time: 2020-02-17T18:00:05.000000+08:00", "select 7;", "# Time: 2020-02-16T19:00:00.000000+08:00", "select 6;", "# Time: 2020-02-16T18:00:05.000000+08:00", "select 5;", "# Time: 2020-02-16T18:00:01.000000+08:00", "select 4;", "# Time: 2020-02-16T18:00:01.000000+08:00", "select 3;"}, }, }, { startTime: "2020-02-16T19:00:00.000000+08:00", endTime: "2020-04-15T20:00:00.000000+08:00", files: []string{fileName3, fileName4}, logs: [][]string{ {"# Time: 2020-04-15T19:00:05.299063744+08:00", "select 9;", "Time: 2020-04-15T18:00:05.299063744+08:00", "select 8;", "# Time: 2020-02-17T18:00:05.000000+08:00", "select 7;", "# Time: 2020-02-16T19:00:00.000000+08:00", "select 6;"}, }, }, } loc, err := time.LoadLocation("Asia/Shanghai") require.NoError(t, err) sctx := mock.NewContext() sctx.ResetSessionAndStmtTimeZone(loc) sctx.GetSessionVars().SlowQueryFile = fileName3 for i, cas := range cases { extractor := &plannercore.SlowQueryExtractor{Enable: len(cas.startTime) > 0 && len(cas.endTime) > 0, Desc: true} if extractor.Enable { startTime, err := ParseTime(cas.startTime) require.NoError(t, err) endTime, err := ParseTime(cas.endTime) require.NoError(t, err) extractor.TimeRanges = []*plannercore.TimeRange{{StartTime: startTime, EndTime: endTime}} } retriever, err := newSlowQueryRetriever() require.NoError(t, err) retriever.extractor = extractor sctx.GetSessionVars().SlowQueryFile = fileName4 err = retriever.initialize(context.Background(), sctx) require.NoError(t, err) comment := fmt.Sprintf("case id: %v", i) if len(retriever.files) > 0 { reader := bufio.NewReader(retriever.files[0].file) offset := &offset{length: 0, offset: 0} rows, err := retriever.getBatchLogForReversedScan(context.Background(), reader, offset, 3) require.NoError(t, err) for _, row := range rows { for j, log := range row { require.Equal(t, log, cas.logs[0][j], comment) } } } require.NoError(t, retriever.close()) } } func TestCancelParseSlowLog(t *testing.T) { fileName := "tidb-slow-2020-02-14T19-04-05.01.log" slowLog := `# Time: 2019-04-28T15:24:04.309074+08:00 select * from t;` prepareLogs(t, []string{slowLog}, []string{fileName}) defer func() { removeFiles([]string{fileName}) }() sctx := mock.NewContext() sctx.GetSessionVars().SlowQueryFile = fileName retriever, err := newSlowQueryRetriever() require.NoError(t, err) var signal1, signal2 = make(chan int, 1), make(chan int, 1) ctx := context.WithValue(context.Background(), signalsKey{}, []chan int{signal1, signal2}) ctx, cancel := context.WithCancel(ctx) err = failpoint.Enable("github.com/pingcap/tidb/pkg/executor/mockReadSlowLogSlow", "return(true)") require.NoError(t, err) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/mockReadSlowLogSlow")) }() go func() { _, err := retriever.retrieve(ctx, sctx) require.Errorf(t, err, "context canceled") }() // Wait for parseSlowLog going to add tasks. <-signal1 // Cancel the retriever and then dataForSlowLog exits. cancel() // Assume that there are already unprocessed tasks. retriever.taskList <- slowLogTask{} // Let parseSlowLog continue. signal2 <- 1 // parseSlowLog should exit immediately. time.Sleep(1 * time.Second) require.False(t, checkGoroutineExists("parseSlowLog")) } func checkGoroutineExists(keyword string) bool { buf := new(bytes.Buffer) profile := pprof.Lookup("goroutine") err := profile.WriteTo(buf, 1) if err != nil { panic(err) } str := buf.String() return strings.Contains(str, keyword) } func prepareLogs(t *testing.T, logData []string, fileNames []string) { writeFile := func(file string, data string) { if strings.HasSuffix(file, ".gz") { f, err := os.Create(file) require.NoError(t, err) gz := gzip.NewWriter(f) _, err = gz.Write([]byte(data)) require.NoError(t, err) require.NoError(t, gz.Close()) require.NoError(t, f.Close()) } else { f, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) require.NoError(t, err) _, err = f.Write([]byte(data)) require.NoError(t, err) require.NoError(t, f.Close()) } } for i, log := range logData { writeFile(fileNames[i], log) } } func removeFiles(fileNames []string) { for _, fileName := range fileNames { os.Remove(fileName) } } func TestIssue54324(t *testing.T) { f, err := os.CreateTemp("", "test-tidb-slow-query-issue54324") require.NoError(t, err) defer os.Remove(f.Name()) // clean up w := bufio.NewWriter(f) for range 8191 { w.WriteByte('x') } w.WriteByte('\n') for range 4096 { w.WriteByte('a') } require.NoError(t, w.Flush()) stat, err := f.Stat() require.NoError(t, err) endCursor := stat.Size() lines, readBytes, err := readLastLines(context.Background(), f, endCursor) require.NoError(t, err) require.Len(t, lines, 2) require.Equal(t, readBytes, 8192+4096) }