mirror of
https://github.com/libp2p/go-libp2p-peerstore.git
synced 2025-02-13 07:20:11 +08:00
Merge pull request #34 from libp2p/ttldatastore
Store-native TTL management
This commit is contained in:
commit
6f9a3c21c8
@ -57,14 +57,14 @@
|
||||
},
|
||||
{
|
||||
"author": "magik6k",
|
||||
"hash": "QmUCfrikzKVGAfpE31RPwPd32fu1DYxSG7HTGCadba5Wza",
|
||||
"hash": "QmaiEBFgkgB1wjrPRxru5PyXPEkx58WuMjXNaR1Q9QNRjn",
|
||||
"name": "go-ds-badger",
|
||||
"version": "1.6.1"
|
||||
"version": "1.7.0"
|
||||
},
|
||||
{
|
||||
"hash": "QmSpg1CvpXQQow5ernt1gNBXaXV6yxyNqi7XoeerWfzB5w",
|
||||
"hash": "QmUyz7JTJzgegC6tiJrfby3mPhzcdswVtG4x58TQ6pq8jV",
|
||||
"name": "go-datastore",
|
||||
"version": "3.1.0"
|
||||
"version": "3.2.0"
|
||||
},
|
||||
{
|
||||
"hash": "QmQjMHF8ptRgx4E57UFMiT4YM6kqaJeYxZ1MCDX23aw4rK",
|
||||
|
@ -2,7 +2,8 @@ package pstoreds
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
@ -19,6 +20,12 @@ import (
|
||||
|
||||
var (
|
||||
log = logging.Logger("peerstore/ds")
|
||||
// The maximum representable value in time.Time is time.Unix(1<<63-62135596801, 999999999).
|
||||
// But it's too brittle and implementation-dependent, so we prefer to use 1<<62, which is in the
|
||||
// year 146138514283. We're safe.
|
||||
maxTime = time.Unix(1<<62, 0)
|
||||
|
||||
ErrTTLDatastore = errors.New("datastore must provide TTL support")
|
||||
)
|
||||
|
||||
var _ pstore.AddrBook = (*dsAddrBook)(nil)
|
||||
@ -28,15 +35,48 @@ var _ pstore.AddrBook = (*dsAddrBook)(nil)
|
||||
type dsAddrBook struct {
|
||||
cache cache
|
||||
ds ds.TxnDatastore
|
||||
ttlManager *ttlManager
|
||||
subsManager *pstoremem.AddrSubManager
|
||||
writeRetries int
|
||||
}
|
||||
|
||||
type ttlWriteMode int
|
||||
|
||||
const (
|
||||
ttlOverride ttlWriteMode = iota
|
||||
ttlExtend
|
||||
)
|
||||
|
||||
type cacheEntry struct {
|
||||
expiration time.Time
|
||||
addrs []ma.Multiaddr
|
||||
}
|
||||
|
||||
type addrRecord struct {
|
||||
ttl time.Duration
|
||||
addr ma.Multiaddr
|
||||
}
|
||||
|
||||
func (ar *addrRecord) MarshalBinary() ([]byte, error) {
|
||||
ttlB := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(ttlB, uint64(ar.ttl))
|
||||
return append(ttlB, ar.addr.Bytes()...), nil
|
||||
}
|
||||
|
||||
func (ar *addrRecord) UnmarshalBinary(b []byte) error {
|
||||
ar.ttl = time.Duration(binary.LittleEndian.Uint64(b))
|
||||
// this had been serialized by us, no need to check for errors
|
||||
ar.addr, _ = ma.NewMultiaddrBytes(b[8:])
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewAddrBook initializes a new address book given a
|
||||
// Datastore instance, a context for managing the TTL manager,
|
||||
// and the interval at which the TTL manager should sweep the Datastore.
|
||||
func NewAddrBook(ctx context.Context, ds ds.TxnDatastore, opts Options) (*dsAddrBook, error) {
|
||||
func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts Options) (*dsAddrBook, error) {
|
||||
if _, ok := store.(ds.TTLDatastore); !ok {
|
||||
return nil, ErrTTLDatastore
|
||||
}
|
||||
|
||||
var (
|
||||
cache cache = &noopCache{}
|
||||
err error
|
||||
@ -50,19 +90,13 @@ func NewAddrBook(ctx context.Context, ds ds.TxnDatastore, opts Options) (*dsAddr
|
||||
|
||||
mgr := &dsAddrBook{
|
||||
cache: cache,
|
||||
ds: ds,
|
||||
ttlManager: newTTLManager(ctx, ds, &cache, opts.TTLInterval),
|
||||
ds: store,
|
||||
subsManager: pstoremem.NewAddrSubManager(),
|
||||
writeRetries: int(opts.WriteRetries),
|
||||
}
|
||||
return mgr, nil
|
||||
}
|
||||
|
||||
// Stop will signal the TTL manager to stop and block until it returns.
|
||||
func (mgr *dsAddrBook) Stop() {
|
||||
mgr.ttlManager.cancel()
|
||||
}
|
||||
|
||||
func keysAndAddrs(p peer.ID, addrs []ma.Multiaddr) ([]ds.Key, []ma.Multiaddr, error) {
|
||||
var (
|
||||
keys = make([]ds.Key, len(addrs))
|
||||
@ -98,7 +132,7 @@ func (mgr *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durati
|
||||
if ttl <= 0 {
|
||||
return
|
||||
}
|
||||
mgr.setAddrs(p, addrs, ttl, false)
|
||||
mgr.setAddrs(p, addrs, ttl, ttlExtend)
|
||||
}
|
||||
|
||||
// SetAddr will add or update the TTL of an address in the AddrBook.
|
||||
@ -113,7 +147,7 @@ func (mgr *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durati
|
||||
mgr.deleteAddrs(p, addrs)
|
||||
return
|
||||
}
|
||||
mgr.setAddrs(p, addrs, ttl, true)
|
||||
mgr.setAddrs(p, addrs, ttl, ttlOverride)
|
||||
}
|
||||
|
||||
func (mgr *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) error {
|
||||
@ -123,7 +157,7 @@ func (mgr *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) error {
|
||||
return err
|
||||
}
|
||||
|
||||
mgr.cache.Remove(p.Pretty())
|
||||
mgr.cache.Remove(p)
|
||||
// Attempt transactional KV deletion.
|
||||
for i := 0; i < mgr.writeRetries; i++ {
|
||||
if err = mgr.dbDelete(keys); err == nil {
|
||||
@ -137,22 +171,21 @@ func (mgr *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) error {
|
||||
return err
|
||||
}
|
||||
|
||||
mgr.ttlManager.deleteTTLs(keys)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, ttlReset bool) error {
|
||||
func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) error {
|
||||
// Keys and cleaned up addresses.
|
||||
keys, addrs, err := keysAndAddrs(p, addrs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mgr.cache.Remove(p.Pretty())
|
||||
mgr.cache.Remove(p)
|
||||
// Attempt transactional KV insertion.
|
||||
var existed []bool
|
||||
for i := 0; i < mgr.writeRetries; i++ {
|
||||
if existed, err = mgr.dbInsert(keys, addrs); err == nil {
|
||||
if existed, err = mgr.dbInsert(keys, addrs, ttl, mode); err == nil {
|
||||
break
|
||||
}
|
||||
log.Errorf("failed to write addresses for peer %s: %s\n", p.Pretty(), err)
|
||||
@ -169,42 +202,54 @@ func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durati
|
||||
mgr.subsManager.BroadcastAddr(p, addrs[i])
|
||||
}
|
||||
}
|
||||
|
||||
// Force update TTLs only if TTL reset was requested; otherwise
|
||||
// insert the appropriate TTL entries if they don't already exist.
|
||||
if ttlReset {
|
||||
mgr.ttlManager.setTTLs(keys, ttl)
|
||||
} else {
|
||||
mgr.ttlManager.insertOrExtendTTLs(keys, ttl)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// dbInsert performs a transactional insert of the provided keys and values.
|
||||
func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr) ([]bool, error) {
|
||||
func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) ([]bool, error) {
|
||||
var (
|
||||
err error
|
||||
existed = make([]bool, len(keys))
|
||||
exp = time.Now().Add(ttl)
|
||||
)
|
||||
|
||||
txn := mgr.ds.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
ttltxn := txn.(ds.TTLDatastore)
|
||||
for i, key := range keys {
|
||||
// Check if the key existed previously.
|
||||
if existed[i], err = txn.Has(key); err != nil {
|
||||
if existed[i], err = ttltxn.Has(key); err != nil {
|
||||
log.Errorf("transaction failed and aborted while checking key existence: %s, cause: %v", key.String(), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// The key embeds a hash of the value, so if it existed, we can safely skip the insert.
|
||||
// The key embeds a hash of the value, so if it existed, we can safely skip the insert and
|
||||
// just update the TTL.
|
||||
if existed[i] {
|
||||
switch mode {
|
||||
case ttlOverride:
|
||||
err = ttltxn.SetTTL(key, ttl)
|
||||
case ttlExtend:
|
||||
var curr time.Time
|
||||
if curr, err = ttltxn.GetExpiration(key); err == nil && exp.After(curr) {
|
||||
err = ttltxn.SetTTL(key, ttl)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
// mode will be printed as an int
|
||||
log.Errorf("failed while updating the ttl for key: %s, mode: %v, cause: %v", key.String(), mode, err)
|
||||
return nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Attempt to add the key.
|
||||
if err = txn.Put(key, addrs[i].Bytes()); err != nil {
|
||||
r := &addrRecord{
|
||||
ttl: ttl,
|
||||
addr: addrs[i],
|
||||
}
|
||||
value, _ := r.MarshalBinary()
|
||||
if err = ttltxn.PutWithTTL(key, value, ttl); err != nil {
|
||||
log.Errorf("transaction failed and aborted while setting key: %s, cause: %v", key.String(), err)
|
||||
return nil, err
|
||||
}
|
||||
@ -221,12 +266,23 @@ func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr) ([]bool, er
|
||||
// UpdateAddrs will update any addresses for a given peer and TTL combination to
|
||||
// have a new TTL.
|
||||
func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
|
||||
prefix := ds.NewKey(p.Pretty())
|
||||
mgr.ttlManager.adjustTTLs(prefix, oldTTL, newTTL)
|
||||
mgr.cache.Remove(p)
|
||||
|
||||
var err error
|
||||
for i := 0; i < mgr.writeRetries; i++ {
|
||||
if err = mgr.dbUpdateTTL(p, oldTTL, newTTL); err == nil {
|
||||
break
|
||||
}
|
||||
log.Errorf("failed to update ttlsfor peer %s: %s\n", p.Pretty(), err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("failed to avoid write conflict when updating ttls for peer %s after %d retries: %v\n",
|
||||
p.Pretty(), mgr.writeRetries, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Addrs returns all of the non-expired addresses for a given peer.
|
||||
func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
|
||||
func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time.Duration) error {
|
||||
var (
|
||||
prefix = ds.NewKey(p.Pretty())
|
||||
q = query.Query{Prefix: prefix.String(), KeysOnly: false}
|
||||
@ -234,12 +290,56 @@ func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
|
||||
err error
|
||||
)
|
||||
|
||||
// Check the cache.
|
||||
if entry, ok := mgr.cache.Get(p.Pretty()); ok {
|
||||
e := entry.([]ma.Multiaddr)
|
||||
addrs := make([]ma.Multiaddr, len(e))
|
||||
copy(addrs, e)
|
||||
return addrs
|
||||
txn := mgr.ds.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
if results, err = txn.Query(q); err != nil {
|
||||
return err
|
||||
}
|
||||
defer results.Close()
|
||||
|
||||
ttltxn := txn.(ds.TTLDatastore)
|
||||
r := &addrRecord{}
|
||||
for result := range results.Next() {
|
||||
r.UnmarshalBinary(result.Value)
|
||||
if r.ttl != oldTTL {
|
||||
continue
|
||||
}
|
||||
|
||||
r.ttl = newTTL
|
||||
value, _ := r.MarshalBinary()
|
||||
if err = ttltxn.PutWithTTL(ds.RawKey(result.Key), value, newTTL); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := txn.Commit(); err != nil {
|
||||
log.Errorf("failed to commit transaction when updating ttls, cause: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Addrs returns all of the non-expired addresses for a given peer.
|
||||
func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
|
||||
var (
|
||||
prefix = ds.NewKey(p.Pretty())
|
||||
q = query.Query{Prefix: prefix.String(), KeysOnly: false, ReturnExpirations: true}
|
||||
results query.Results
|
||||
err error
|
||||
)
|
||||
|
||||
// Check the cache and return the entry only if it hasn't expired; if expired, remove.
|
||||
if e, ok := mgr.cache.Get(p); ok {
|
||||
entry := e.(cacheEntry)
|
||||
if entry.expiration.After(time.Now()) {
|
||||
addrs := make([]ma.Multiaddr, len(entry.addrs))
|
||||
copy(addrs, entry.addrs)
|
||||
return addrs
|
||||
} else {
|
||||
mgr.cache.Remove(p)
|
||||
}
|
||||
}
|
||||
|
||||
txn := mgr.ds.NewTransaction(true)
|
||||
@ -252,16 +352,24 @@ func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
|
||||
defer results.Close()
|
||||
|
||||
var addrs []ma.Multiaddr
|
||||
var r addrRecord
|
||||
// used to set the expiration for the entire cache entry
|
||||
earliestExp := maxTime
|
||||
for result := range results.Next() {
|
||||
if addr, err := ma.NewMultiaddrBytes(result.Value); err == nil {
|
||||
addrs = append(addrs, addr)
|
||||
if err = r.UnmarshalBinary(result.Value); err == nil {
|
||||
addrs = append(addrs, r.addr)
|
||||
}
|
||||
|
||||
if exp := result.Expiration; !exp.IsZero() && exp.Before(earliestExp) {
|
||||
earliestExp = exp
|
||||
}
|
||||
}
|
||||
|
||||
// Store a copy in the cache.
|
||||
addrsCpy := make([]ma.Multiaddr, len(addrs))
|
||||
copy(addrsCpy, addrs)
|
||||
mgr.cache.Add(p.Pretty(), addrsCpy)
|
||||
entry := cacheEntry{addrs: addrsCpy, expiration: earliestExp}
|
||||
mgr.cache.Add(p, entry)
|
||||
|
||||
return addrs
|
||||
}
|
||||
@ -319,16 +427,15 @@ func (mgr *dsAddrBook) ClearAddrs(p peer.ID) {
|
||||
deleteFn func() error
|
||||
)
|
||||
|
||||
if e, ok := mgr.cache.Peek(p.Pretty()); ok {
|
||||
mgr.cache.Remove(p.Pretty())
|
||||
keys, _, _ := keysAndAddrs(p, e.([]ma.Multiaddr))
|
||||
if e, ok := mgr.cache.Peek(p); ok {
|
||||
mgr.cache.Remove(p)
|
||||
keys, _, _ := keysAndAddrs(p, e.(cacheEntry).addrs)
|
||||
deleteFn = func() error {
|
||||
return mgr.dbDelete(keys)
|
||||
}
|
||||
} else {
|
||||
deleteFn = func() error {
|
||||
_, err := mgr.dbDeleteIter(prefix)
|
||||
return err
|
||||
return mgr.dbDeleteIter(prefix)
|
||||
}
|
||||
}
|
||||
|
||||
@ -343,24 +450,23 @@ func (mgr *dsAddrBook) ClearAddrs(p peer.ID) {
|
||||
if err != nil {
|
||||
log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), mgr.writeRetries)
|
||||
}
|
||||
|
||||
// Perform housekeeping.
|
||||
mgr.ttlManager.clear(prefix)
|
||||
}
|
||||
|
||||
// dbDelete transactionally deletes the provided keys.
|
||||
func (mgr *dsAddrBook) dbDelete(keys []ds.Key) error {
|
||||
var err error
|
||||
|
||||
txn := mgr.ds.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
for _, key := range keys {
|
||||
if err := txn.Delete(key); err != nil {
|
||||
if err = txn.Delete(key); err != nil {
|
||||
log.Errorf("failed to delete key: %s, cause: %v", key.String(), err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := txn.Commit(); err != nil {
|
||||
if err = txn.Commit(); err != nil {
|
||||
log.Errorf("failed to commit transaction when deleting keys, cause: %v", err)
|
||||
return err
|
||||
}
|
||||
@ -370,7 +476,7 @@ func (mgr *dsAddrBook) dbDelete(keys []ds.Key) error {
|
||||
|
||||
// dbDeleteIter removes all entries whose keys are prefixed with the argument.
|
||||
// it returns a slice of the removed keys in case it's needed
|
||||
func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) ([]ds.Key, error) {
|
||||
func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) error {
|
||||
q := query.Query{Prefix: prefix.String(), KeysOnly: true}
|
||||
|
||||
txn := mgr.ds.NewTransaction(false)
|
||||
@ -379,167 +485,30 @@ func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) ([]ds.Key, error) {
|
||||
results, err := txn.Query(q)
|
||||
if err != nil {
|
||||
log.Errorf("failed to fetch all keys prefixed with: %s, cause: %v", prefix.String(), err)
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
var keys []ds.Key
|
||||
var keys = make([]ds.Key, 0, 4) // cap: 4 to reduce allocs
|
||||
var key ds.Key
|
||||
for result := range results.Next() {
|
||||
key := ds.RawKey(result.Key)
|
||||
key = ds.RawKey(result.Key)
|
||||
keys = append(keys, key)
|
||||
|
||||
if err = txn.Delete(key); err != nil {
|
||||
log.Errorf("failed to delete key: %s, cause: %v", key.String(), err)
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := results.Close(); err != nil {
|
||||
if err = results.Close(); err != nil {
|
||||
log.Errorf("failed to close cursor, cause: %v", err)
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
if err = txn.Commit(); err != nil {
|
||||
log.Errorf("failed to commit transaction when deleting keys, cause: %v", err)
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
type ttlEntry struct {
|
||||
TTL time.Duration
|
||||
ExpiresAt time.Time
|
||||
}
|
||||
|
||||
type ttlManager struct {
|
||||
sync.RWMutex
|
||||
entries map[ds.Key]*ttlEntry
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
ticker *time.Ticker
|
||||
ds ds.TxnDatastore
|
||||
cache cache
|
||||
}
|
||||
|
||||
func newTTLManager(parent context.Context, d ds.Datastore, c *cache, tick time.Duration) *ttlManager {
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
txnDs, ok := d.(ds.TxnDatastore)
|
||||
if !ok {
|
||||
panic("must construct ttlManager with transactional datastore")
|
||||
}
|
||||
mgr := &ttlManager{
|
||||
entries: make(map[ds.Key]*ttlEntry),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
ticker: time.NewTicker(tick),
|
||||
ds: txnDs,
|
||||
cache: *c,
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-mgr.ctx.Done():
|
||||
mgr.ticker.Stop()
|
||||
return
|
||||
case <-mgr.ticker.C:
|
||||
mgr.tick()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return mgr
|
||||
}
|
||||
|
||||
// To be called by TTL manager's coroutine only.
|
||||
func (mgr *ttlManager) tick() {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
var toDel []ds.Key
|
||||
for key, entry := range mgr.entries {
|
||||
if entry.ExpiresAt.After(now) {
|
||||
continue
|
||||
}
|
||||
toDel = append(toDel, key)
|
||||
}
|
||||
|
||||
if len(toDel) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
txn := mgr.ds.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
for _, key := range toDel {
|
||||
if err := txn.Delete(key); err != nil {
|
||||
log.Error("failed to delete TTL key: %v, cause: %v", key.String(), err)
|
||||
break
|
||||
}
|
||||
mgr.cache.Remove(key.Parent().Name())
|
||||
delete(mgr.entries, key)
|
||||
}
|
||||
|
||||
if err := txn.Commit(); err != nil {
|
||||
log.Error("failed to commit TTL deletion, cause: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *ttlManager) deleteTTLs(keys []ds.Key) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
for _, key := range keys {
|
||||
delete(mgr.entries, key)
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *ttlManager) insertOrExtendTTLs(keys []ds.Key, ttl time.Duration) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
expiration := time.Now().Add(ttl)
|
||||
for _, key := range keys {
|
||||
if entry, ok := mgr.entries[key]; !ok || (ok && entry.ExpiresAt.Before(expiration)) {
|
||||
mgr.entries[key] = &ttlEntry{TTL: ttl, ExpiresAt: expiration}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *ttlManager) setTTLs(keys []ds.Key, ttl time.Duration) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
expiration := time.Now().Add(ttl)
|
||||
for _, key := range keys {
|
||||
mgr.entries[key] = &ttlEntry{TTL: ttl, ExpiresAt: expiration}
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *ttlManager) adjustTTLs(prefix ds.Key, oldTTL, newTTL time.Duration) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
var keys []ds.Key
|
||||
for key, entry := range mgr.entries {
|
||||
if key.IsDescendantOf(prefix) && entry.TTL == oldTTL {
|
||||
keys = append(keys, key)
|
||||
entry.TTL = newTTL
|
||||
entry.ExpiresAt = now.Add(newTTL)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *ttlManager) clear(prefix ds.Key) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
for key := range mgr.entries {
|
||||
if key.IsDescendantOf(prefix) {
|
||||
delete(mgr.entries, key)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -172,9 +172,9 @@ func badgerStore(t testing.TB) (ds.TxnDatastore, func()) {
|
||||
|
||||
func peerstoreFactory(tb testing.TB, opts Options) pt.PeerstoreFactory {
|
||||
return func() (pstore.Peerstore, func()) {
|
||||
ds, closeFunc := badgerStore(tb)
|
||||
store, closeFunc := badgerStore(tb)
|
||||
|
||||
ps, err := NewPeerstore(context.Background(), ds, opts)
|
||||
ps, err := NewPeerstore(context.Background(), store, opts)
|
||||
if err != nil {
|
||||
tb.Fatal(err)
|
||||
}
|
||||
@ -185,17 +185,13 @@ func peerstoreFactory(tb testing.TB, opts Options) pt.PeerstoreFactory {
|
||||
|
||||
func addressBookFactory(tb testing.TB, opts Options) pt.AddrBookFactory {
|
||||
return func() (pstore.AddrBook, func()) {
|
||||
ds, closeDB := badgerStore(tb)
|
||||
store, closeFunc := badgerStore(tb)
|
||||
|
||||
mgr, err := NewAddrBook(context.Background(), ds, opts)
|
||||
ab, err := NewAddrBook(context.Background(), store, opts)
|
||||
if err != nil {
|
||||
tb.Fatal(err)
|
||||
}
|
||||
|
||||
closeFunc := func() {
|
||||
mgr.Stop()
|
||||
closeDB()
|
||||
}
|
||||
return mgr, closeFunc
|
||||
return ab, closeFunc
|
||||
}
|
||||
}
|
||||
|
@ -93,6 +93,20 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
|
||||
|
||||
testHas(t, addrs, ab.Addrs(id))
|
||||
})
|
||||
|
||||
t.Run("adding an existing address with a later expiration extends its ttl", func(t *testing.T) {
|
||||
id := generatePeerIds(1)[0]
|
||||
addrs := generateAddrs(3)
|
||||
|
||||
ab.AddAddrs(id, addrs, time.Second)
|
||||
|
||||
// same address as before but with a higher TTL
|
||||
ab.AddAddrs(id, addrs[2:], time.Hour)
|
||||
|
||||
// after the initial TTL has expired, check that only the third address is present.
|
||||
time.Sleep(1200 * time.Millisecond)
|
||||
testHas(t, addrs[2:], ab.Addrs(id))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -154,25 +168,27 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) {
|
||||
testHas(t, addrs2, m.Addrs(ids[1]))
|
||||
|
||||
// Will only affect addrs1[0].
|
||||
m.UpdateAddrs(ids[0], time.Hour, 100*time.Microsecond)
|
||||
// Badger does not support subsecond TTLs.
|
||||
// https://github.com/dgraph-io/badger/issues/339
|
||||
m.UpdateAddrs(ids[0], time.Hour, 1*time.Second)
|
||||
|
||||
// No immediate effect.
|
||||
testHas(t, addrs1, m.Addrs(ids[0]))
|
||||
testHas(t, addrs2, m.Addrs(ids[1]))
|
||||
|
||||
// After a wait, addrs[0] is gone.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
|
||||
testHas(t, addrs2, m.Addrs(ids[1]))
|
||||
|
||||
// Will only affect addrs2[0].
|
||||
m.UpdateAddrs(ids[1], time.Hour, 100*time.Microsecond)
|
||||
m.UpdateAddrs(ids[1], time.Hour, 1*time.Second)
|
||||
|
||||
// No immediate effect.
|
||||
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
|
||||
testHas(t, addrs2, m.Addrs(ids[1]))
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
|
||||
// First addrs is gone in both.
|
||||
testHas(t, addrs1[1:], m.Addrs(ids[0]))
|
||||
|
Loading…
Reference in New Issue
Block a user