diff --git a/ddl/column.go b/ddl/column.go index 933cd9eff5..4808121db6 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -336,7 +336,7 @@ func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, handles []int64, reorgInfo *reorgInfo) error { for _, handle := range handles { - log.Info("backfill column...", handle) + log.Info("[ddl] backfill column...", handle) err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { if err := d.isReorgRunnable(txn); err != nil { diff --git a/ddl/ddl.go b/ddl/ddl.go index a2dcacfb8f..e97733ead7 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -193,7 +193,7 @@ func (d *ddl) SetLease(lease time.Duration) { return } - log.Warnf("change schema lease %s -> %s", d.lease, lease) + log.Warnf("[ddl] change schema lease %s -> %s", d.lease, lease) if d.isClosed() { // if already closed, just set lease and return diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index f2885b4d39..85b785731a 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -336,7 +336,7 @@ func (ts *testSuite) TestAlterTableColumn(c *C) { } func statement(ctx context.Context, sql string) stmt.Statement { - log.Debug("Compile", sql) + log.Debug("[ddl] Compile", sql) lexer := parser.NewLexer(sql) parser.YYParse(lexer) compiler := &executor.Compiler{} diff --git a/ddl/index.go b/ddl/index.go index 98d3b608e3..446ee00156 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -426,7 +426,7 @@ func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, hand kvX := kv.NewKVIndex(t.IndexPrefix(), indexInfo.Name.L, indexInfo.ID, indexInfo.Unique) for _, handle := range handles { - log.Debug("building index...", handle) + log.Debug("[ddl] building index...", handle) err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { if err := d.isReorgRunnable(txn); err != nil { diff --git a/ddl/worker.go b/ddl/worker.go index c53101123d..d914f68a88 100644 --- a/ddl/worker.go +++ b/ddl/worker.go @@ -51,7 +51,7 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error { // notice worker that we push a new job and wait the job done. asyncNotify(d.jobCh) - log.Warnf("start DDL job %v", job) + log.Warnf("[ddl] start DDL job %v", job) jobID := job.ID @@ -69,10 +69,10 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error { historyJob, err = d.getHistoryJob(jobID) if err != nil { - log.Errorf("get history job err %v, check again", err) + log.Errorf("[ddl] get history job err %v, check again", err) continue } else if historyJob == nil { - log.Warnf("job %d is not in history, maybe not run", jobID) + log.Warnf("[ddl] job %d is not in history, maybe not run", jobID) continue } @@ -129,11 +129,11 @@ func (d *ddl) checkOwner(t *meta.Meta) (*model.Owner, error) { if err = t.SetDDLOwner(owner); err != nil { return nil, errors.Trace(err) } - log.Debugf("become owner %s", owner.OwnerID) + log.Debugf("[ddl] become owner %s", owner.OwnerID) } if owner.OwnerID != d.uuid { - log.Debugf("not owner, owner is %s", owner.OwnerID) + log.Debugf("[ddl] not owner, owner is %s", owner.OwnerID) return nil, errors.Trace(ErrNotOwner) } @@ -152,7 +152,7 @@ func (d *ddl) updateJob(t *meta.Meta, job *model.Job) error { } func (d *ddl) finishJob(t *meta.Meta, job *model.Job) error { - log.Warnf("finish DDL job %v", job) + log.Warnf("[ddl] finish DDL job %v", job) // done, notice and run next job. _, err := t.DeQueueDDLJob() if err != nil { @@ -203,13 +203,13 @@ func (d *ddl) handleJobQueue() error { // wait again. elapsed := time.Duration(time.Now().UnixNano() - job.LastUpdateTS) if elapsed > 0 && elapsed < waitTime { - log.Warnf("the elapsed time from last update is %s < %s, wait again", elapsed, waitTime) + log.Warnf("[ddl] the elapsed time from last update is %s < %s, wait again", elapsed, waitTime) waitTime -= elapsed return nil } } - log.Warnf("run DDL job %v", job) + log.Warnf("[ddl] run DDL job %v", job) d.hook.OnJobRunBefore(job) @@ -281,7 +281,7 @@ func (d *ddl) onWorker() { for { select { case <-ticker.C: - log.Debugf("wait %s to check DDL status again", checkTime) + log.Debugf("[ddl] wait %s to check DDL status again", checkTime) case <-d.jobCh: case <-d.quitCh: return @@ -289,7 +289,7 @@ func (d *ddl) onWorker() { err := d.handleJobQueue() if err != nil { - log.Errorf("handle job err %v", errors.ErrorStack(err)) + log.Errorf("[ddl] handle job err %v", errors.ErrorStack(err)) } } } diff --git a/domain/domain.go b/domain/domain.go index 773969b4d1..1ac14b6fac 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -55,7 +55,7 @@ func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) { if info != nil && schemaMetaVersion <= info.SchemaMetaVersion() { // info may be changed by other txn, so here its version may be bigger than schema version, // so we don't need to reload. - log.Debugf("schema version is still %d, no need reload", schemaMetaVersion) + log.Debugf("[ddl] schema version is still %d, no need reload", schemaMetaVersion) return nil } @@ -85,7 +85,7 @@ func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) { } } - log.Infof("loadInfoSchema %d", schemaMetaVersion) + log.Infof("[ddl] loadInfoSchema %d", schemaMetaVersion) err = do.infoHandle.Set(schemas, schemaMetaVersion) return errors.Trace(err) } @@ -167,7 +167,7 @@ func (do *Domain) reload() error { } if err != nil { - log.Errorf("load schema err %v, retry again", errors.ErrorStack(err)) + log.Errorf("[ddl] load schema err %v, retry again", errors.ErrorStack(err)) // TODO: use a backoff algorithm. time.Sleep(500 * time.Millisecond) continue @@ -192,7 +192,7 @@ func (do *Domain) mustReload() { // if reload error, we will terminate whole program to guarantee data safe. err := do.reload() if err != nil { - log.Fatalf("reload schema err %v", errors.ErrorStack(err)) + log.Fatalf("[ddl] reload schema err %v", errors.ErrorStack(err)) } } @@ -216,7 +216,7 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) { if terror.ErrorEqual(err, localstore.ErrDBClosed) { return } else if err != nil { - log.Fatalf("reload schema err %v", errors.ErrorStack(err)) + log.Fatalf("[ddl] reload schema err %v", errors.ErrorStack(err)) } case newLease := <-do.leaseCh: if newLease <= 0 { @@ -245,7 +245,7 @@ func (c *ddlCallback) OnChanged(err error) error { if err != nil { return err } - log.Warnf("on DDL change") + log.Warnf("[ddl] on DDL change") c.do.mustReload() return nil diff --git a/kv/txn.go b/kv/txn.go index aa582ae337..2bf483970d 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -48,13 +48,13 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e for i := 0; i < maxRetryCnt; i++ { txn, err := store.Begin() if err != nil { - log.Errorf("RunInNewTxn error - %v", err) + log.Errorf("[kv] RunInNewTxn error - %v", err) return errors.Trace(err) } err = f(txn) if retryable && IsRetryableError(err) { - log.Warnf("Retry txn %v", txn) + log.Warnf("[kv] Retry txn %v", txn) txn.Rollback() continue } @@ -64,7 +64,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e err = txn.Commit() if retryable && IsRetryableError(err) { - log.Warnf("Retry txn %v", txn) + log.Warnf("[kv] Retry txn %v", txn) txn.Rollback() BackOff(i) continue diff --git a/kv/union_iter.go b/kv/union_iter.go index f6d119db52..0ef4069b5a 100644 --- a/kv/union_iter.go +++ b/kv/union_iter.go @@ -102,7 +102,7 @@ func (iter *UnionIter) updateCur() { } else { // record from dirty comes first if len(iter.dirtyIt.Value()) == 0 { - log.Warnf("delete a record not exists? k = %q", iter.dirtyIt.Key()) + log.Warnf("[kv] delete a record not exists? k = %q", iter.dirtyIt.Key()) // jump over this deletion iter.dirtyNext() continue diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index 132be1edfc..45aeb3ff86 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -68,7 +68,7 @@ func (alloc *allocator) Alloc(tableID int64) (int64, error) { } alloc.base++ - log.Infof("Alloc id %d, table ID:%d, from %p, database ID:%d", alloc.base, tableID, alloc, alloc.dbID) + log.Debugf("[kv] Alloc id %d, table ID:%d, from %p, database ID:%d", alloc.base, tableID, alloc, alloc.dbID) return alloc.base, nil } diff --git a/store/hbase/txn.go b/store/hbase/txn.go index 69dac89ba3..4910959789 100644 --- a/store/hbase/txn.go +++ b/store/hbase/txn.go @@ -65,22 +65,22 @@ func newHbaseTxn(t themis.Txn, storeName string) *hbaseTxn { // Implement transaction interface func (txn *hbaseTxn) Get(k kv.Key) ([]byte, error) { - log.Debugf("get key:%q, txn:%d", k, txn.tid) + log.Debugf("[kv] get key:%q, txn:%d", k, txn.tid) return txn.UnionStore.Get(k) } func (txn *hbaseTxn) Set(k kv.Key, v []byte) error { - log.Debugf("seek %q txn:%d", k, txn.tid) + log.Debugf("[kv] seek %q txn:%d", k, txn.tid) return txn.UnionStore.Set(k, v) } func (txn *hbaseTxn) Inc(k kv.Key, step int64) (int64, error) { - log.Debugf("Inc %q, step %d txn:%d", k, step, txn.tid) + log.Debugf("[kv] Inc %q, step %d txn:%d", k, step, txn.tid) return txn.UnionStore.Inc(k, step) } func (txn *hbaseTxn) GetInt64(k kv.Key) (int64, error) { - log.Debugf("GetInt64 %q, txn:%d", k, txn.tid) + log.Debugf("[kv] GetInt64 %q, txn:%d", k, txn.tid) return txn.UnionStore.GetInt64(k) } @@ -89,7 +89,7 @@ func (txn *hbaseTxn) String() string { } func (txn *hbaseTxn) Seek(k kv.Key) (kv.Iterator, error) { - log.Debugf("seek %q txn:%d", k, txn.tid) + log.Debugf("[kv] seek %q txn:%d", k, txn.tid) iter, err := txn.UnionStore.Seek(k) if err != nil { return nil, errors.Trace(err) @@ -98,7 +98,7 @@ func (txn *hbaseTxn) Seek(k kv.Key) (kv.Iterator, error) { } func (txn *hbaseTxn) Delete(k kv.Key) error { - log.Debugf("delete %q txn:%d", k, txn.tid) + log.Debugf("[kv] delete %q txn:%d", k, txn.tid) err := txn.UnionStore.Delete(k) if err != nil { return errors.Trace(err) @@ -140,7 +140,7 @@ func (txn *hbaseTxn) doCommit() error { } txn.version = kv.NewVersion(txn.txn.GetCommitTS()) - log.Debugf("commit successfully, txn.version:%d", txn.version.Ver) + log.Debugf("[kv] commit successfully, txn.version:%d", txn.version.Ver) return nil } @@ -148,7 +148,7 @@ func (txn *hbaseTxn) Commit() error { if !txn.valid { return kv.ErrInvalidTxn } - log.Debugf("start to commit txn %d", txn.tid) + log.Debugf("[kv] start to commit txn %d", txn.tid) defer func() { txn.close() }() @@ -174,7 +174,7 @@ func (txn *hbaseTxn) Rollback() error { if !txn.valid { return kv.ErrInvalidTxn } - log.Warnf("Rollback txn %d", txn.tid) + log.Warnf("[kv] Rollback txn %d", txn.tid) return txn.close() } diff --git a/store/localstore/compactor.go b/store/localstore/compactor.go index a60eb2086e..2a70276261 100644 --- a/store/localstore/compactor.go +++ b/store/localstore/compactor.go @@ -100,7 +100,7 @@ func (gc *localstoreCompactor) deleteWorker() { batch.Delete(key) // Batch delete. if cnt == gc.policy.BatchDeleteCnt { - log.Debugf("GC delete commit %d keys", batch.Len()) + log.Debugf("[kv] GC delete commit %d keys", batch.Len()) err := gc.db.Commit(batch) if err != nil { log.Error(err) @@ -117,7 +117,7 @@ func (gc *localstoreCompactor) checkExpiredKeysWorker() { for { select { case <-gc.stopCh: - log.Debug("GC stopped") + log.Debug("[kv] GC stopped") return case <-gc.ticker.C: gc.mu.Lock() @@ -169,7 +169,9 @@ func (gc *localstoreCompactor) Compact(k kv.Key) error { return errors.Trace(err) } filteredKeys := gc.filterExpiredKeys(keys) - log.Debugf("GC send %d keys to delete worker", len(filteredKeys)) + if len(filteredKeys) > 0 { + log.Debugf("[kv] GC send %d keys to delete worker", len(filteredKeys)) + } for _, key := range filteredKeys { gc.delCh <- key } diff --git a/store/localstore/kv.go b/store/localstore/kv.go index 10749b5c1e..7750192170 100644 --- a/store/localstore/kv.go +++ b/store/localstore/kv.go @@ -82,7 +82,7 @@ func (d Driver) Open(schema string) (kv.Storage, error) { if store, ok := mc.cache[schema]; ok { // TODO: check the cache store has the same engine with this Driver. - log.Info("cache store", schema) + log.Info("[kv] cache store", schema) return store, nil } @@ -91,7 +91,7 @@ func (d Driver) Open(schema string) (kv.Storage, error) { return nil, errors.Trace(err) } - log.Info("New store", schema) + log.Info("[kv] New store", schema) s := &dbStore{ txns: make(map[uint64]*dbTxn), keysLocked: make(map[string]uint64), @@ -164,7 +164,7 @@ func (s *dbStore) Begin() (kv.Transaction, error) { version: kv.MinVersion, snapshotVals: make(map[string]struct{}), } - log.Debugf("Begin txn:%d", txn.tid) + log.Debugf("[kv] Begin txn:%d", txn.tid) txn.UnionStore = kv.NewUnionStore(newSnapshot(s, s.db, beginVer)) return txn, nil } @@ -247,7 +247,7 @@ func (s *dbStore) tryConditionLockKey(tid uint64, key string) error { // If there's newer version of this key, returns error. if ver > tid { - log.Warnf("txn:%d, tryLockKey condition not match for key %s, currValue:%q", tid, key, currValue) + log.Warnf("[kv] txn:%d, tryLockKey condition not match for key %s, currValue:%q", tid, key, currValue) return errors.Trace(kv.ErrConditionNotMatch) } diff --git a/store/localstore/local_version_provider.go b/store/localstore/local_version_provider.go index a29bda4437..8450647678 100644 --- a/store/localstore/local_version_provider.go +++ b/store/localstore/local_version_provider.go @@ -37,7 +37,7 @@ func (l *LocalVersionProvider) CurrentVersion() (kv.Version, error) { ts = uint64((time.Now().UnixNano() / int64(time.Millisecond)) << timePrecisionOffset) if l.lastTimestamp > ts { - log.Error("invalid physical time stamp") + log.Error("[kv] invalid physical time stamp") continue } diff --git a/store/localstore/txn.go b/store/localstore/txn.go index c448fe21e1..6644a8f0b0 100644 --- a/store/localstore/txn.go +++ b/store/localstore/txn.go @@ -50,7 +50,7 @@ func (txn *dbTxn) markOrigin(k []byte) { // Implement transaction interface func (txn *dbTxn) Get(k kv.Key) ([]byte, error) { - log.Debugf("get key:%q, txn:%d", k, txn.tid) + log.Debugf("[kv] get key:%q, txn:%d", k, txn.tid) val, err := txn.UnionStore.Get(k) if err != nil { return nil, errors.Trace(err) @@ -60,7 +60,7 @@ func (txn *dbTxn) Get(k kv.Key) ([]byte, error) { } func (txn *dbTxn) Set(k kv.Key, data []byte) error { - log.Debugf("set key:%q, txn:%d", k, txn.tid) + log.Debugf("[kv] set key:%q, txn:%d", k, txn.tid) err := txn.UnionStore.Set(k, data) if err != nil { return errors.Trace(err) @@ -71,7 +71,7 @@ func (txn *dbTxn) Set(k kv.Key, data []byte) error { } func (txn *dbTxn) Inc(k kv.Key, step int64) (int64, error) { - log.Debugf("Inc %q, step %d txn:%d", k, step, txn.tid) + log.Debugf("[kv] Inc %q, step %d txn:%d", k, step, txn.tid) txn.markOrigin(k) val, err := txn.UnionStore.Inc(k, step) @@ -83,7 +83,7 @@ func (txn *dbTxn) Inc(k kv.Key, step int64) (int64, error) { } func (txn *dbTxn) GetInt64(k kv.Key) (int64, error) { - log.Debugf("GetInt64 %q, txn:%d", k, txn.tid) + log.Debugf("[kv] GetInt64 %q, txn:%d", k, txn.tid) val, err := txn.UnionStore.GetInt64(k) if err != nil { return 0, errors.Trace(err) @@ -97,7 +97,7 @@ func (txn *dbTxn) String() string { } func (txn *dbTxn) Seek(k kv.Key) (kv.Iterator, error) { - log.Debugf("seek key:%q, txn:%d", k, txn.tid) + log.Debugf("[kv] seek key:%q, txn:%d", k, txn.tid) iter, err := txn.UnionStore.Seek(k) if err != nil { return nil, errors.Trace(err) @@ -110,7 +110,7 @@ func (txn *dbTxn) Seek(k kv.Key) (kv.Iterator, error) { } func (txn *dbTxn) Delete(k kv.Key) error { - log.Debugf("delete key:%q, txn:%d", k, txn.tid) + log.Debugf("[kv] delete key:%q, txn:%d", k, txn.tid) err := txn.UnionStore.Delete(k) if err != nil { return errors.Trace(err) @@ -177,7 +177,7 @@ func (txn *dbTxn) Commit() error { if !txn.valid { return errors.Trace(kv.ErrInvalidTxn) } - log.Infof("commit txn %d", txn.tid) + log.Debugf("[kv] commit txn %d", txn.tid) defer func() { txn.close() }() @@ -204,7 +204,7 @@ func (txn *dbTxn) Rollback() error { if !txn.valid { return errors.Trace(kv.ErrInvalidTxn) } - log.Warnf("Rollback txn %d", txn.tid) + log.Warnf("[kv] Rollback txn %d", txn.tid) return txn.close() }