Merge pull request #745 from pingcap/coocood/txn-log-switch

kv, ddl: add tags to log messages to make them easy to filter.
Ewan Chou
2015-12-18 11:15:24 +08:00
14 changed files with 52 additions and 50 deletions
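
The change is mechanical: every log call in the kv and ddl packages gains a "[kv]" or "[ddl]" prefix, so the emitting subsystem can be recovered from mixed output with a plain substring match (e.g. grep '\[ddl\]' tidb.log). Below is a minimal sketch of the convention, using the standard library logger as a stand-in for the project's log package; the tagged helper is hypothetical and only illustrates the idea — the commit itself simply hard-codes the tag in each format string.

package main

import "log" // stand-in for the logger used in the repo

// tagged returns a printf-style logging function that prefixes every
// message with a fixed subsystem tag such as "[ddl]" or "[kv]".
// Hypothetical helper, for illustration only.
func tagged(tag string) func(format string, args ...interface{}) {
	return func(format string, args ...interface{}) {
		log.Printf(tag+" "+format, args...)
	}
}

func main() {
	ddlLogf := tagged("[ddl]")
	kvLogf := tagged("[kv]")
	ddlLogf("start DDL job %v", 42) // -> "... [ddl] start DDL job 42"
	kvLogf("Begin txn:%d", 7)       // -> "... [kv] Begin txn:7"
}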


@@ -336,7 +336,7 @@ func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
 func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, handles []int64, reorgInfo *reorgInfo) error {
 	for _, handle := range handles {
-		log.Info("backfill column...", handle)
+		log.Info("[ddl] backfill column...", handle)
 		err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
 			if err := d.isReorgRunnable(txn); err != nil {


@@ -193,7 +193,7 @@ func (d *ddl) SetLease(lease time.Duration) {
 		return
 	}
-	log.Warnf("change schema lease %s -> %s", d.lease, lease)
+	log.Warnf("[ddl] change schema lease %s -> %s", d.lease, lease)
 	if d.isClosed() {
 		// if already closed, just set lease and return


@@ -336,7 +336,7 @@ func (ts *testSuite) TestAlterTableColumn(c *C) {
 }
 func statement(ctx context.Context, sql string) stmt.Statement {
-	log.Debug("Compile", sql)
+	log.Debug("[ddl] Compile", sql)
 	lexer := parser.NewLexer(sql)
 	parser.YYParse(lexer)
 	compiler := &executor.Compiler{}


@@ -426,7 +426,7 @@ func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, hand
 	kvX := kv.NewKVIndex(t.IndexPrefix(), indexInfo.Name.L, indexInfo.ID, indexInfo.Unique)
 	for _, handle := range handles {
-		log.Debug("building index...", handle)
+		log.Debug("[ddl] building index...", handle)
 		err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
 			if err := d.isReorgRunnable(txn); err != nil {


@@ -51,7 +51,7 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error {
 	// notice worker that we push a new job and wait the job done.
 	asyncNotify(d.jobCh)
-	log.Warnf("start DDL job %v", job)
+	log.Warnf("[ddl] start DDL job %v", job)
 	jobID := job.ID
@@ -69,10 +69,10 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error {
 		historyJob, err = d.getHistoryJob(jobID)
 		if err != nil {
-			log.Errorf("get history job err %v, check again", err)
+			log.Errorf("[ddl] get history job err %v, check again", err)
 			continue
 		} else if historyJob == nil {
-			log.Warnf("job %d is not in history, maybe not run", jobID)
+			log.Warnf("[ddl] job %d is not in history, maybe not run", jobID)
 			continue
 		}
@@ -129,11 +129,11 @@ func (d *ddl) checkOwner(t *meta.Meta) (*model.Owner, error) {
 		if err = t.SetDDLOwner(owner); err != nil {
 			return nil, errors.Trace(err)
 		}
-		log.Debugf("become owner %s", owner.OwnerID)
+		log.Debugf("[ddl] become owner %s", owner.OwnerID)
 	}
 	if owner.OwnerID != d.uuid {
-		log.Debugf("not owner, owner is %s", owner.OwnerID)
+		log.Debugf("[ddl] not owner, owner is %s", owner.OwnerID)
 		return nil, errors.Trace(ErrNotOwner)
 	}
@@ -152,7 +152,7 @@ func (d *ddl) updateJob(t *meta.Meta, job *model.Job) error {
 }
 func (d *ddl) finishJob(t *meta.Meta, job *model.Job) error {
-	log.Warnf("finish DDL job %v", job)
+	log.Warnf("[ddl] finish DDL job %v", job)
 	// done, notice and run next job.
 	_, err := t.DeQueueDDLJob()
 	if err != nil {
@@ -203,13 +203,13 @@ func (d *ddl) handleJobQueue() error {
 			// wait again.
 			elapsed := time.Duration(time.Now().UnixNano() - job.LastUpdateTS)
 			if elapsed > 0 && elapsed < waitTime {
-				log.Warnf("the elapsed time from last update is %s < %s, wait again", elapsed, waitTime)
+				log.Warnf("[ddl] the elapsed time from last update is %s < %s, wait again", elapsed, waitTime)
 				waitTime -= elapsed
 				return nil
 			}
 		}
-		log.Warnf("run DDL job %v", job)
+		log.Warnf("[ddl] run DDL job %v", job)
 		d.hook.OnJobRunBefore(job)
@@ -281,7 +281,7 @@ func (d *ddl) onWorker() {
 	for {
 		select {
 		case <-ticker.C:
-			log.Debugf("wait %s to check DDL status again", checkTime)
+			log.Debugf("[ddl] wait %s to check DDL status again", checkTime)
 		case <-d.jobCh:
 		case <-d.quitCh:
 			return
@@ -289,7 +289,7 @@ func (d *ddl) onWorker() {
 		err := d.handleJobQueue()
 		if err != nil {
-			log.Errorf("handle job err %v", errors.ErrorStack(err))
+			log.Errorf("[ddl] handle job err %v", errors.ErrorStack(err))
 		}
 	}
 }


@@ -55,7 +55,7 @@ func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) {
 	if info != nil && schemaMetaVersion <= info.SchemaMetaVersion() {
 		// info may be changed by other txn, so here its version may be bigger than schema version,
 		// so we don't need to reload.
-		log.Debugf("schema version is still %d, no need reload", schemaMetaVersion)
+		log.Debugf("[ddl] schema version is still %d, no need reload", schemaMetaVersion)
 		return nil
 	}
@@ -85,7 +85,7 @@ func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) {
 		}
 	}
-	log.Infof("loadInfoSchema %d", schemaMetaVersion)
+	log.Infof("[ddl] loadInfoSchema %d", schemaMetaVersion)
 	err = do.infoHandle.Set(schemas, schemaMetaVersion)
 	return errors.Trace(err)
 }
@@ -167,7 +167,7 @@ func (do *Domain) reload() error {
 		}
 		if err != nil {
-			log.Errorf("load schema err %v, retry again", errors.ErrorStack(err))
+			log.Errorf("[ddl] load schema err %v, retry again", errors.ErrorStack(err))
 			// TODO: use a backoff algorithm.
 			time.Sleep(500 * time.Millisecond)
 			continue
@@ -192,7 +192,7 @@ func (do *Domain) mustReload() {
 	// if reload error, we will terminate whole program to guarantee data safe.
 	err := do.reload()
 	if err != nil {
-		log.Fatalf("reload schema err %v", errors.ErrorStack(err))
+		log.Fatalf("[ddl] reload schema err %v", errors.ErrorStack(err))
 	}
 }
@@ -216,7 +216,7 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
 			if terror.ErrorEqual(err, localstore.ErrDBClosed) {
 				return
 			} else if err != nil {
-				log.Fatalf("reload schema err %v", errors.ErrorStack(err))
+				log.Fatalf("[ddl] reload schema err %v", errors.ErrorStack(err))
 			}
 		case newLease := <-do.leaseCh:
 			if newLease <= 0 {
@@ -245,7 +245,7 @@ func (c *ddlCallback) OnChanged(err error) error {
 	if err != nil {
 		return err
 	}
-	log.Warnf("on DDL change")
+	log.Warnf("[ddl] on DDL change")
 	c.do.mustReload()
 	return nil


@@ -48,13 +48,13 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e
 	for i := 0; i < maxRetryCnt; i++ {
 		txn, err := store.Begin()
 		if err != nil {
-			log.Errorf("RunInNewTxn error - %v", err)
+			log.Errorf("[kv] RunInNewTxn error - %v", err)
 			return errors.Trace(err)
 		}
 		err = f(txn)
 		if retryable && IsRetryableError(err) {
-			log.Warnf("Retry txn %v", txn)
+			log.Warnf("[kv] Retry txn %v", txn)
 			txn.Rollback()
 			continue
 		}
@@ -64,7 +64,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e
 		err = txn.Commit()
 		if retryable && IsRetryableError(err) {
-			log.Warnf("Retry txn %v", txn)
+			log.Warnf("[kv] Retry txn %v", txn)
 			txn.Rollback()
 			BackOff(i)
 			continue


@@ -102,7 +102,7 @@ func (iter *UnionIter) updateCur() {
 		} else {
 			// record from dirty comes first
 			if len(iter.dirtyIt.Value()) == 0 {
-				log.Warnf("delete a record not exists? k = %q", iter.dirtyIt.Key())
+				log.Warnf("[kv] delete a record not exists? k = %q", iter.dirtyIt.Key())
 				// jump over this deletion
 				iter.dirtyNext()
 				continue


@@ -68,7 +68,7 @@ func (alloc *allocator) Alloc(tableID int64) (int64, error) {
 	}
 	alloc.base++
-	log.Infof("Alloc id %d, table ID:%d, from %p, database ID:%d", alloc.base, tableID, alloc, alloc.dbID)
+	log.Debugf("[kv] Alloc id %d, table ID:%d, from %p, database ID:%d", alloc.base, tableID, alloc, alloc.dbID)
 	return alloc.base, nil
 }


@@ -65,22 +65,22 @@ func newHbaseTxn(t themis.Txn, storeName string) *hbaseTxn {
 // Implement transaction interface
 func (txn *hbaseTxn) Get(k kv.Key) ([]byte, error) {
-	log.Debugf("get key:%q, txn:%d", k, txn.tid)
+	log.Debugf("[kv] get key:%q, txn:%d", k, txn.tid)
 	return txn.UnionStore.Get(k)
 }
 func (txn *hbaseTxn) Set(k kv.Key, v []byte) error {
-	log.Debugf("seek %q txn:%d", k, txn.tid)
+	log.Debugf("[kv] seek %q txn:%d", k, txn.tid)
 	return txn.UnionStore.Set(k, v)
 }
 func (txn *hbaseTxn) Inc(k kv.Key, step int64) (int64, error) {
-	log.Debugf("Inc %q, step %d txn:%d", k, step, txn.tid)
+	log.Debugf("[kv] Inc %q, step %d txn:%d", k, step, txn.tid)
 	return txn.UnionStore.Inc(k, step)
 }
 func (txn *hbaseTxn) GetInt64(k kv.Key) (int64, error) {
-	log.Debugf("GetInt64 %q, txn:%d", k, txn.tid)
+	log.Debugf("[kv] GetInt64 %q, txn:%d", k, txn.tid)
 	return txn.UnionStore.GetInt64(k)
 }
@@ -89,7 +89,7 @@ func (txn *hbaseTxn) String() string {
 }
 func (txn *hbaseTxn) Seek(k kv.Key) (kv.Iterator, error) {
-	log.Debugf("seek %q txn:%d", k, txn.tid)
+	log.Debugf("[kv] seek %q txn:%d", k, txn.tid)
 	iter, err := txn.UnionStore.Seek(k)
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -98,7 +98,7 @@ func (txn *hbaseTxn) Seek(k kv.Key) (kv.Iterator, error) {
 }
 func (txn *hbaseTxn) Delete(k kv.Key) error {
-	log.Debugf("delete %q txn:%d", k, txn.tid)
+	log.Debugf("[kv] delete %q txn:%d", k, txn.tid)
 	err := txn.UnionStore.Delete(k)
 	if err != nil {
 		return errors.Trace(err)
@@ -140,7 +140,7 @@ func (txn *hbaseTxn) doCommit() error {
 	}
 	txn.version = kv.NewVersion(txn.txn.GetCommitTS())
-	log.Debugf("commit successfully, txn.version:%d", txn.version.Ver)
+	log.Debugf("[kv] commit successfully, txn.version:%d", txn.version.Ver)
 	return nil
 }
@@ -148,7 +148,7 @@ func (txn *hbaseTxn) Commit() error {
 	if !txn.valid {
 		return kv.ErrInvalidTxn
 	}
-	log.Debugf("start to commit txn %d", txn.tid)
+	log.Debugf("[kv] start to commit txn %d", txn.tid)
 	defer func() {
 		txn.close()
 	}()
@@ -174,7 +174,7 @@ func (txn *hbaseTxn) Rollback() error {
 	if !txn.valid {
 		return kv.ErrInvalidTxn
 	}
-	log.Warnf("Rollback txn %d", txn.tid)
+	log.Warnf("[kv] Rollback txn %d", txn.tid)
 	return txn.close()
 }


@@ -100,7 +100,7 @@ func (gc *localstoreCompactor) deleteWorker() {
 			batch.Delete(key)
 			// Batch delete.
 			if cnt == gc.policy.BatchDeleteCnt {
-				log.Debugf("GC delete commit %d keys", batch.Len())
+				log.Debugf("[kv] GC delete commit %d keys", batch.Len())
 				err := gc.db.Commit(batch)
 				if err != nil {
 					log.Error(err)
@@ -117,7 +117,7 @@ func (gc *localstoreCompactor) checkExpiredKeysWorker() {
 	for {
 		select {
 		case <-gc.stopCh:
-			log.Debug("GC stopped")
+			log.Debug("[kv] GC stopped")
 			return
 		case <-gc.ticker.C:
 			gc.mu.Lock()
@@ -169,7 +169,9 @@ func (gc *localstoreCompactor) Compact(k kv.Key) error {
 		return errors.Trace(err)
 	}
 	filteredKeys := gc.filterExpiredKeys(keys)
-	log.Debugf("GC send %d keys to delete worker", len(filteredKeys))
+	if len(filteredKeys) > 0 {
+		log.Debugf("[kv] GC send %d keys to delete worker", len(filteredKeys))
+	}
 	for _, key := range filteredKeys {
 		gc.delCh <- key
 	}


@@ -82,7 +82,7 @@ func (d Driver) Open(schema string) (kv.Storage, error) {
 	if store, ok := mc.cache[schema]; ok {
 		// TODO: check the cache store has the same engine with this Driver.
-		log.Info("cache store", schema)
+		log.Info("[kv] cache store", schema)
 		return store, nil
 	}
@@ -91,7 +91,7 @@ func (d Driver) Open(schema string) (kv.Storage, error) {
 		return nil, errors.Trace(err)
 	}
-	log.Info("New store", schema)
+	log.Info("[kv] New store", schema)
 	s := &dbStore{
 		txns:       make(map[uint64]*dbTxn),
 		keysLocked: make(map[string]uint64),
@@ -164,7 +164,7 @@ func (s *dbStore) Begin() (kv.Transaction, error) {
 		version:      kv.MinVersion,
 		snapshotVals: make(map[string]struct{}),
 	}
-	log.Debugf("Begin txn:%d", txn.tid)
+	log.Debugf("[kv] Begin txn:%d", txn.tid)
 	txn.UnionStore = kv.NewUnionStore(newSnapshot(s, s.db, beginVer))
 	return txn, nil
 }
@@ -247,7 +247,7 @@ func (s *dbStore) tryConditionLockKey(tid uint64, key string) error {
 	// If there's newer version of this key, returns error.
 	if ver > tid {
-		log.Warnf("txn:%d, tryLockKey condition not match for key %s, currValue:%q", tid, key, currValue)
+		log.Warnf("[kv] txn:%d, tryLockKey condition not match for key %s, currValue:%q", tid, key, currValue)
 		return errors.Trace(kv.ErrConditionNotMatch)
 	}


@@ -37,7 +37,7 @@ func (l *LocalVersionProvider) CurrentVersion() (kv.Version, error) {
 		ts = uint64((time.Now().UnixNano() / int64(time.Millisecond)) << timePrecisionOffset)
 		if l.lastTimestamp > ts {
-			log.Error("invalid physical time stamp")
+			log.Error("[kv] invalid physical time stamp")
 			continue
 		}


@@ -50,7 +50,7 @@ func (txn *dbTxn) markOrigin(k []byte) {
 // Implement transaction interface
 func (txn *dbTxn) Get(k kv.Key) ([]byte, error) {
-	log.Debugf("get key:%q, txn:%d", k, txn.tid)
+	log.Debugf("[kv] get key:%q, txn:%d", k, txn.tid)
 	val, err := txn.UnionStore.Get(k)
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -60,7 +60,7 @@ func (txn *dbTxn) Get(k kv.Key) ([]byte, error) {
 }
 func (txn *dbTxn) Set(k kv.Key, data []byte) error {
-	log.Debugf("set key:%q, txn:%d", k, txn.tid)
+	log.Debugf("[kv] set key:%q, txn:%d", k, txn.tid)
 	err := txn.UnionStore.Set(k, data)
 	if err != nil {
 		return errors.Trace(err)
@@ -71,7 +71,7 @@ func (txn *dbTxn) Set(k kv.Key, data []byte) error {
 }
 func (txn *dbTxn) Inc(k kv.Key, step int64) (int64, error) {
-	log.Debugf("Inc %q, step %d txn:%d", k, step, txn.tid)
+	log.Debugf("[kv] Inc %q, step %d txn:%d", k, step, txn.tid)
 	txn.markOrigin(k)
 	val, err := txn.UnionStore.Inc(k, step)
@@ -83,7 +83,7 @@ func (txn *dbTxn) Inc(k kv.Key, step int64) (int64, error) {
 }
 func (txn *dbTxn) GetInt64(k kv.Key) (int64, error) {
-	log.Debugf("GetInt64 %q, txn:%d", k, txn.tid)
+	log.Debugf("[kv] GetInt64 %q, txn:%d", k, txn.tid)
 	val, err := txn.UnionStore.GetInt64(k)
 	if err != nil {
 		return 0, errors.Trace(err)
@@ -97,7 +97,7 @@ func (txn *dbTxn) String() string {
 }
 func (txn *dbTxn) Seek(k kv.Key) (kv.Iterator, error) {
-	log.Debugf("seek key:%q, txn:%d", k, txn.tid)
+	log.Debugf("[kv] seek key:%q, txn:%d", k, txn.tid)
 	iter, err := txn.UnionStore.Seek(k)
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -110,7 +110,7 @@ func (txn *dbTxn) Seek(k kv.Key) (kv.Iterator, error) {
 }
 func (txn *dbTxn) Delete(k kv.Key) error {
-	log.Debugf("delete key:%q, txn:%d", k, txn.tid)
+	log.Debugf("[kv] delete key:%q, txn:%d", k, txn.tid)
 	err := txn.UnionStore.Delete(k)
 	if err != nil {
 		return errors.Trace(err)
@@ -177,7 +177,7 @@ func (txn *dbTxn) Commit() error {
 	if !txn.valid {
 		return errors.Trace(kv.ErrInvalidTxn)
 	}
-	log.Infof("commit txn %d", txn.tid)
+	log.Debugf("[kv] commit txn %d", txn.tid)
 	defer func() {
 		txn.close()
 	}()
@@ -204,7 +204,7 @@ func (txn *dbTxn) Rollback() error {
 	if !txn.valid {
 		return errors.Trace(kv.ErrInvalidTxn)
 	}
-	log.Warnf("Rollback txn %d", txn.tid)
+	log.Warnf("[kv] Rollback txn %d", txn.tid)
 	return txn.close()
 }