*: optimize DDL history http API, reduce memory (#36859)
close pingcap/tidb#35838
This commit is contained in:
20
ddl/ddl.go
20
ddl/ddl.go
@ -1685,6 +1685,26 @@ func GetAllHistoryDDLJobs(m *meta.Meta) ([]*model.Job, error) {
|
||||
return allJobs, nil
|
||||
}
|
||||
|
||||
// ScanHistoryDDLJobs get some of the done DDL jobs.
|
||||
// When the DDL history is quite large, GetAllHistoryDDLJobs() API can't work well, because it makes the server OOM.
|
||||
// The result is in descending order by job ID.
|
||||
func ScanHistoryDDLJobs(m *meta.Meta, startJobID int64, limit int) ([]*model.Job, error) {
|
||||
var iter meta.LastJobIterator
|
||||
var err error
|
||||
if startJobID == 0 {
|
||||
iter, err = m.GetLastHistoryDDLJobsIterator()
|
||||
} else {
|
||||
if limit == 0 {
|
||||
return nil, errors.New("when 'start_job_id' is specified, it must work with a 'limit'")
|
||||
}
|
||||
iter, err = m.GetHistoryDDLJobsIterator(startJobID)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return iter.GetLastJobs(limit, nil)
|
||||
}
|
||||
|
||||
// GetHistoryJobByID return history DDL job by ID.
|
||||
func GetHistoryJobByID(sess sessionctx.Context, id int64) (*model.Job, error) {
|
||||
err := sessiontxn.NewTxn(context.Background(), sess)
|
||||
|
||||
@ -463,6 +463,7 @@ timezone.*
|
||||
```shell
|
||||
curl http://{TiDBIP}:10080/ddl/history
|
||||
```
|
||||
**Note**: When the DDL history is very very long, it may consume a lot memory and even cause OOM. Consider adding `start_job_id` and `limit`.
|
||||
|
||||
1. Get count {number} TiDB DDL job history information.
|
||||
|
||||
@ -470,6 +471,12 @@ timezone.*
|
||||
curl http://{TiDBIP}:10080/ddl/history?limit={number}
|
||||
```
|
||||
|
||||
1. Get count {number} TiDB DDL job history information, start with job {id}
|
||||
|
||||
```shell
|
||||
curl http://{TIDBIP}:10080/ddl/history?start_job_id={id}&limit={number}
|
||||
```
|
||||
|
||||
1. Download TiDB debug info
|
||||
|
||||
```shell
|
||||
|
||||
12
meta/meta.go
12
meta/meta.go
@ -1165,6 +1165,18 @@ func (m *Meta) GetLastHistoryDDLJobsIterator() (LastJobIterator, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetHistoryDDLJobsIterator gets the jobs iterator begin with startJobID.
|
||||
func (m *Meta) GetHistoryDDLJobsIterator(startJobID int64) (LastJobIterator, error) {
|
||||
field := m.jobIDKey(startJobID)
|
||||
iter, err := structure.NewHashReverseIterBeginWithField(m.txn, mDDLJobHistoryKey, field)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &HLastJobIterator{
|
||||
iter: iter,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// HLastJobIterator is the iterator for gets the latest history.
|
||||
type HLastJobIterator struct {
|
||||
iter *structure.ReverseHashIterator
|
||||
|
||||
@ -93,6 +93,7 @@ const (
|
||||
const (
|
||||
qTableID = "table_id"
|
||||
qLimit = "limit"
|
||||
qJobID = "start_job_id"
|
||||
qOperation = "op"
|
||||
qSeconds = "seconds"
|
||||
)
|
||||
@ -1251,50 +1252,51 @@ func (h tableHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
|
||||
// ServeHTTP handles request of ddl jobs history.
|
||||
func (h ddlHistoryJobHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
if limitID := req.FormValue(qLimit); len(limitID) > 0 {
|
||||
lid, err := strconv.Atoi(limitID)
|
||||
|
||||
var jobID, limitID int
|
||||
var err error
|
||||
if jobValue := req.FormValue(qJobID); len(jobValue) > 0 {
|
||||
jobID, err = strconv.Atoi(jobValue)
|
||||
if err != nil {
|
||||
writeError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
if lid < 1 {
|
||||
writeError(w, errors.New("ddl history limit must be greater than 1"))
|
||||
if jobID < 1 {
|
||||
writeError(w, errors.New("ddl history start_job_id must be greater than 0"))
|
||||
return
|
||||
}
|
||||
|
||||
jobs, err := h.getAllHistoryDDL()
|
||||
if err != nil {
|
||||
writeError(w, errors.New("ddl history not found"))
|
||||
return
|
||||
}
|
||||
|
||||
jobsLen := len(jobs)
|
||||
if jobsLen > lid {
|
||||
start := jobsLen - lid
|
||||
jobs = jobs[start:]
|
||||
}
|
||||
|
||||
writeData(w, jobs)
|
||||
return
|
||||
}
|
||||
jobs, err := h.getAllHistoryDDL()
|
||||
if limitValue := req.FormValue(qLimit); len(limitValue) > 0 {
|
||||
limitID, err = strconv.Atoi(limitValue)
|
||||
if err != nil {
|
||||
writeError(w, err)
|
||||
return
|
||||
}
|
||||
if limitID < 1 {
|
||||
writeError(w, errors.New("ddl history limit must be greater than 0"))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
jobs, err := h.getHistoryDDL(jobID, limitID)
|
||||
if err != nil {
|
||||
writeError(w, errors.New("ddl history not found"))
|
||||
writeError(w, err)
|
||||
return
|
||||
}
|
||||
writeData(w, jobs)
|
||||
}
|
||||
|
||||
func (h ddlHistoryJobHandler) getAllHistoryDDL() ([]*model.Job, error) {
|
||||
func (h ddlHistoryJobHandler) getHistoryDDL(jobID, limit int) (jobs []*model.Job, err error) {
|
||||
txn, err := h.Store.Begin()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
txnMeta := meta.NewMeta(txn)
|
||||
|
||||
jobs, err := ddl.GetAllHistoryDDLJobs(txnMeta)
|
||||
if jobID == 0 && limit == 0 {
|
||||
jobs, err = ddl.GetAllHistoryDDLJobs(txnMeta)
|
||||
} else {
|
||||
jobs, err = ddl.ScanHistoryDDLJobs(txnMeta, int64(jobID), limit)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -977,6 +977,25 @@ func TestAllHistory(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, resp.Body.Close())
|
||||
require.Equal(t, data, jobs)
|
||||
|
||||
// Cover the start_job_id parameter.
|
||||
resp, err = ts.fetchStatus("/ddl/history?start_job_id=41")
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, resp.Body.Close())
|
||||
|
||||
resp, err = ts.fetchStatus("/ddl/history?start_job_id=41&limit=3")
|
||||
require.NoError(t, err)
|
||||
decoder = json.NewDecoder(resp.Body)
|
||||
err = decoder.Decode(&jobs)
|
||||
require.NoError(t, err)
|
||||
|
||||
// The result is in descending order
|
||||
lastID := int64(42)
|
||||
for _, job := range jobs {
|
||||
require.Less(t, job.ID, lastID)
|
||||
lastID = job.ID
|
||||
}
|
||||
require.NoError(t, resp.Body.Close())
|
||||
}
|
||||
|
||||
func dummyRecord() *deadlockhistory.DeadlockRecord {
|
||||
|
||||
@ -288,8 +288,24 @@ func (*ReverseHashIterator) Close() {}
|
||||
|
||||
// NewHashReverseIter creates a reverse hash iterator.
|
||||
func NewHashReverseIter(t *TxStructure, key []byte) (*ReverseHashIterator, error) {
|
||||
return newHashReverseIter(t, key, nil)
|
||||
}
|
||||
|
||||
// NewHashReverseIterBeginWithField creates a reverse hash iterator, begin with field.
|
||||
func NewHashReverseIterBeginWithField(t *TxStructure, key []byte, field []byte) (*ReverseHashIterator, error) {
|
||||
return newHashReverseIter(t, key, field)
|
||||
}
|
||||
|
||||
func newHashReverseIter(t *TxStructure, key []byte, field []byte) (*ReverseHashIterator, error) {
|
||||
var iterStart kv.Key
|
||||
dataPrefix := t.hashDataKeyPrefix(key)
|
||||
it, err := t.reader.IterReverse(dataPrefix.PrefixNext())
|
||||
if len(field) == 0 {
|
||||
iterStart = dataPrefix.PrefixNext()
|
||||
} else {
|
||||
iterStart = t.encodeHashDataKey(key, field).PrefixNext()
|
||||
}
|
||||
|
||||
it, err := t.reader.IterReverse(iterStart)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user