migrate TTL manager to ds.TxnDatastore.

This commit is contained in:
Raúl Kripalani 2018-09-04 14:30:52 +01:00
parent 1b57c62e3d
commit de573ac1ff

View File

@ -424,22 +424,22 @@ type ttlManager struct {
ctx context.Context
cancel context.CancelFunc
ticker *time.Ticker
ds ds.Batching
ds ds.TxnDatastore
cache cache
}
func newTTLManager(parent context.Context, d ds.Datastore, c *cache, tick time.Duration) *ttlManager {
ctx, cancel := context.WithCancel(parent)
batching, ok := d.(ds.Batching)
txnDs, ok := d.(ds.TxnDatastore)
if !ok {
panic("must construct ttlManager with batching datastore")
panic("must construct ttlManager with transactional datastore")
}
mgr := &ttlManager{
entries: make(map[ds.Key]*ttlEntry),
ctx: ctx,
cancel: cancel,
ticker: time.NewTicker(tick),
ds: batching,
ds: txnDs,
cache: *c,
}
@ -463,25 +463,24 @@ func (mgr *ttlManager) tick() {
mgr.Lock()
defer mgr.Unlock()
txn := mgr.ds.NewTransaction(false)
defer txn.Discard()
now := time.Now()
batch, err := mgr.ds.Batch()
if err != nil {
log.Error(err)
return
}
for key, entry := range mgr.entries {
if entry.ExpiresAt.Before(now) {
if err := batch.Delete(key); err != nil {
log.Error(err)
} else {
mgr.cache.Remove(key)
}
delete(mgr.entries, key)
if entry.ExpiresAt.After(now) {
continue
}
if err := txn.Delete(key); err != nil {
log.Error("failed to delete TTL key: %v, cause: %v", key.String(), err)
break
}
mgr.cache.Remove(key)
delete(mgr.entries, key)
}
err = batch.Commit()
if err != nil {
log.Error(err)
if err := txn.Commit(); err != nil {
log.Error("failed to commit TTL deletion, cause: %v", err)
}
}