From 80fc9f03cabc59176f515ce2e35df427f1c45eb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 12 Sep 2018 20:40:51 +0100 Subject: [PATCH 01/11] migrate from bespoke TTL manager to db-managed TTLs. --- pstoreds/addr_book.go | 183 ++++++++++++++++++++++++++++++---------- test/addr_book_suite.go | 4 +- 2 files changed, 142 insertions(+), 45 deletions(-) diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index 9bb5a45..7ee3de9 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -1,24 +1,33 @@ package pstoreds import ( + "bytes" "context" + "encoding/binary" + "errors" "sync" "time" - lru "github.com/hashicorp/golang-lru" + "github.com/hashicorp/golang-lru" ds "github.com/ipfs/go-datastore" - query "github.com/ipfs/go-datastore/query" + "github.com/ipfs/go-datastore/query" logging "github.com/ipfs/go-log" - peer "github.com/libp2p/go-libp2p-peer" + "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" mh "github.com/multiformats/go-multihash" pstore "github.com/libp2p/go-libp2p-peerstore" - pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem" + "github.com/libp2p/go-libp2p-peerstore/pstoremem" ) var ( log = logging.Logger("peerstore/ds") + // The maximum representable value in time.Time is time.Unix(1<<63-62135596801, 999999999). + // But it's too brittle and implementation-dependent, so we prefer to use 1<<62, which is in the + // year 146138514283. We're safe. + maxTime = time.Unix(1<<62, 0) + + ErrTTLDatastore = errors.New("datastore must provide TTL support") ) var _ pstore.AddrBook = (*dsAddrBook)(nil) @@ -28,15 +37,30 @@ var _ pstore.AddrBook = (*dsAddrBook)(nil) type dsAddrBook struct { cache cache ds ds.TxnDatastore - ttlManager *ttlManager subsManager *pstoremem.AddrSubManager writeRetries int } +type ttlWriteMode int + +const ( + ttlOverride ttlWriteMode = iota + ttlExtend +) + +type cacheEntry struct { + expiration time.Time + addrs []ma.Multiaddr +} + // NewAddrBook initializes a new address book given a // Datastore instance, a context for managing the TTL manager, // and the interval at which the TTL manager should sweep the Datastore. -func NewAddrBook(ctx context.Context, ds ds.TxnDatastore, opts PeerstoreOpts) (*dsAddrBook, error) { +func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts PeerstoreOpts) (*dsAddrBook, error) { + if _, ok := store.(ds.TTLDatastore); !ok { + return nil, ErrTTLDatastore + } + var ( cache cache = &noopCache{} err error @@ -50,8 +74,7 @@ func NewAddrBook(ctx context.Context, ds ds.TxnDatastore, opts PeerstoreOpts) (* mgr := &dsAddrBook{ cache: cache, - ds: ds, - ttlManager: newTTLManager(ctx, ds, &cache, opts.TTLInterval), + ds: store, subsManager: pstoremem.NewAddrSubManager(), writeRetries: int(opts.WriteRetries), } @@ -60,7 +83,7 @@ func NewAddrBook(ctx context.Context, ds ds.TxnDatastore, opts PeerstoreOpts) (* // Stop will signal the TTL manager to stop and block until it returns. func (mgr *dsAddrBook) Stop() { - mgr.ttlManager.cancel() + // noop } func keysAndAddrs(p peer.ID, addrs []ma.Multiaddr) ([]ds.Key, []ma.Multiaddr, error) { @@ -98,7 +121,7 @@ func (mgr *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durati if ttl <= 0 { return } - mgr.setAddrs(p, addrs, ttl, false) + mgr.setAddrs(p, addrs, ttl, ttlExtend) } // SetAddr will add or update the TTL of an address in the AddrBook. @@ -113,7 +136,7 @@ func (mgr *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durati mgr.deleteAddrs(p, addrs) return } - mgr.setAddrs(p, addrs, ttl, true) + mgr.setAddrs(p, addrs, ttl, ttlOverride) } func (mgr *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) error { @@ -137,11 +160,10 @@ func (mgr *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) error { return err } - mgr.ttlManager.deleteTTLs(keys) return nil } -func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, ttlReset bool) error { +func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) error { // Keys and cleaned up addresses. keys, addrs, err := keysAndAddrs(p, addrs) if err != nil { @@ -149,10 +171,11 @@ func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durati } mgr.cache.Remove(p.Pretty()) + // Attempt transactional KV insertion. var existed []bool for i := 0; i < mgr.writeRetries; i++ { - if existed, err = mgr.dbInsert(keys, addrs); err == nil { + if existed, err = mgr.dbInsert(keys, addrs, ttl, mode); err == nil { break } log.Errorf("failed to write addresses for peer %s: %s\n", p.Pretty(), err) @@ -170,41 +193,54 @@ func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durati } } - // Force update TTLs only if TTL reset was requested; otherwise - // insert the appropriate TTL entries if they don't already exist. - if ttlReset { - mgr.ttlManager.setTTLs(keys, ttl) - } else { - mgr.ttlManager.insertTTLs(keys, ttl) - } - return nil } // dbInsert performs a transactional insert of the provided keys and values. -func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr) ([]bool, error) { +func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) ([]bool, error) { var ( err error existed = make([]bool, len(keys)) + exp = time.Now().Add(ttl) + ttlB = make([]byte, 8) ) + binary.LittleEndian.PutUint64(ttlB, uint64(ttl)) + txn := mgr.ds.NewTransaction(false) defer txn.Discard() + ttltxn := txn.(ds.TTLDatastore) for i, key := range keys { // Check if the key existed previously. - if existed[i], err = txn.Has(key); err != nil { + if existed[i], err = ttltxn.Has(key); err != nil { log.Errorf("transaction failed and aborted while checking key existence: %s, cause: %v", key.String(), err) return nil, err } - // The key embeds a hash of the value, so if it existed, we can safely skip the insert. + // The key embeds a hash of the value, so if it existed, we can safely skip the insert and + // just update the TTL. if existed[i] { + switch mode { + case ttlOverride: + err = ttltxn.SetTTL(key, ttl) + case ttlExtend: + var curr time.Time + if curr, err = ttltxn.GetExpiration(key); err != nil && exp.After(curr) { + err = ttltxn.SetTTL(key, ttl) + } + } + if err != nil { + // mode will be printed as an int + log.Errorf("failed while updating the ttl for key: %s, mode: %v, cause: %v", key.String(), mode, err) + return nil, err + } continue } - // Attempt to add the key. - if err = txn.Put(key, addrs[i].Bytes()); err != nil { + // format: bytes(ttl) || bytes(multiaddr) + value := append(ttlB, addrs[i].Bytes()...) + if err = ttltxn.PutWithTTL(key, value, ttl); err != nil { log.Errorf("transaction failed and aborted while setting key: %s, cause: %v", key.String(), err) return nil, err } @@ -221,25 +257,81 @@ func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr) ([]bool, er // UpdateAddrs will update any addresses for a given peer and TTL combination to // have a new TTL. func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { - prefix := ds.NewKey(p.Pretty()) - mgr.ttlManager.adjustTTLs(prefix, oldTTL, newTTL) + mgr.cache.Remove(p.Pretty()) + + var err error + for i := 0; i < mgr.writeRetries; i++ { + if err = mgr.dbUpdateTTL(p, oldTTL, newTTL); err == nil { + break + } + log.Errorf("failed to update ttlsfor peer %s: %s\n", p.Pretty(), err) + } + + if err != nil { + log.Errorf("failed to avoid write conflict when updating ttls for peer %s after %d retries: %v\n", + p.Pretty(), mgr.writeRetries, err) + } +} + +func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time.Duration) error { + var ( + prefix = ds.NewKey(p.Pretty()) + q = query.Query{Prefix: prefix.String(), KeysOnly: false} + oldb, newb = make([]byte, 8), make([]byte, 8) + results query.Results + err error + ) + + binary.LittleEndian.PutUint64(oldb, uint64(oldTTL)) + binary.LittleEndian.PutUint64(newb, uint64(newTTL)) + + txn := mgr.ds.NewTransaction(false) + defer txn.Discard() + + if results, err = txn.Query(q); err != nil { + return err + } + defer results.Close() + + ttltxn := txn.(ds.TTLDatastore) + for result := range results.Next() { + // format: bytes(ttl) || bytes(multiaddr) + if curr := result.Value[:8]; !bytes.Equal(curr, oldb) { + continue + } + newVal := append(newb, result.Value[8:]...) + if err = ttltxn.PutWithTTL(ds.RawKey(result.Key), newVal, newTTL); err != nil { + return err + } + } + + if err := txn.Commit(); err != nil { + log.Errorf("failed to commit transaction when updating ttls, cause: %v", err) + return err + } + + return nil } // Addrs returns all of the non-expired addresses for a given peer. func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { var ( prefix = ds.NewKey(p.Pretty()) - q = query.Query{Prefix: prefix.String(), KeysOnly: false} + q = query.Query{Prefix: prefix.String(), KeysOnly: false, ReturnExpirations: true} results query.Results err error ) - // Check the cache. - if entry, ok := mgr.cache.Get(p.Pretty()); ok { - e := entry.([]ma.Multiaddr) - addrs := make([]ma.Multiaddr, len(e)) - copy(addrs, e) - return addrs + // Check the cache and return the entry only if it hasn't expired; if expired, remove. + if e, ok := mgr.cache.Get(p.Pretty()); ok { + entry := e.(cacheEntry) + if entry.expiration.After(time.Now()) { + addrs := make([]ma.Multiaddr, len(entry.addrs)) + copy(addrs, entry.addrs) + return addrs + } else { + mgr.cache.Remove(p.Pretty()) + } } txn := mgr.ds.NewTransaction(true) @@ -252,16 +344,24 @@ func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { defer results.Close() var addrs []ma.Multiaddr + // used to set the expiration for the entire cache entry + earliestExp := maxTime for result := range results.Next() { - if addr, err := ma.NewMultiaddrBytes(result.Value); err == nil { + // extract multiaddr from value: bytes(ttl) || bytes(multiaddr) + if addr, err := ma.NewMultiaddrBytes(result.Value[8:]); err == nil { addrs = append(addrs, addr) } + + if exp := result.Expiration; !exp.IsZero() && exp.Before(earliestExp) { + earliestExp = exp + } } // Store a copy in the cache. addrsCpy := make([]ma.Multiaddr, len(addrs)) copy(addrsCpy, addrs) - mgr.cache.Add(p.Pretty(), addrsCpy) + entry := cacheEntry{addrs: addrsCpy, expiration: earliestExp} + mgr.cache.Add(p.Pretty(), entry) return addrs } @@ -320,7 +420,7 @@ func (mgr *dsAddrBook) ClearAddrs(p peer.ID) { if e, ok := mgr.cache.Peek(p.Pretty()); ok { mgr.cache.Remove(p.Pretty()) - keys, _, _ := keysAndAddrs(p, e.([]ma.Multiaddr)) + keys, _, _ := keysAndAddrs(p, e.(cacheEntry).addrs) deleteFn = func() error { return mgr.dbDelete(keys) } @@ -342,9 +442,6 @@ func (mgr *dsAddrBook) ClearAddrs(p peer.ID) { if err != nil { log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), mgr.writeRetries) } - - // Perform housekeeping. - mgr.ttlManager.clear(prefix) } // dbDelete transactionally deletes the provided keys. @@ -495,7 +592,7 @@ func (mgr *ttlManager) deleteTTLs(keys []ds.Key) { } } -func (mgr *ttlManager) insertTTLs(keys []ds.Key, ttl time.Duration) { +func (mgr *ttlManager) bumpTTLs(keys []ds.Key, ttl time.Duration) { mgr.Lock() defer mgr.Unlock() diff --git a/test/addr_book_suite.go b/test/addr_book_suite.go index dbff04a..c80f3f0 100644 --- a/test/addr_book_suite.go +++ b/test/addr_book_suite.go @@ -161,7 +161,7 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) { testHas(t, addrs2, m.Addrs(ids[1])) // After a wait, addrs[0] is gone. - time.Sleep(1200 * time.Millisecond) + time.Sleep(3000 * time.Millisecond) testHas(t, addrs1[1:2], m.Addrs(ids[0])) testHas(t, addrs2, m.Addrs(ids[1])) @@ -172,7 +172,7 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) { testHas(t, addrs1[1:2], m.Addrs(ids[0])) testHas(t, addrs2, m.Addrs(ids[1])) - time.Sleep(1200 * time.Millisecond) + time.Sleep(3000 * time.Millisecond) // First addrs is gone in both. testHas(t, addrs1[1:], m.Addrs(ids[0])) From 9277207b35b9614ebaea179ebbf257c037321c28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 13 Sep 2018 12:48:34 +0100 Subject: [PATCH 02/11] remove deprecated ttlManager. --- pstoreds/addr_book.go | 139 ------------------------------------------ 1 file changed, 139 deletions(-) diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index 7ee3de9..ef3309d 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -5,7 +5,6 @@ import ( "context" "encoding/binary" "errors" - "sync" "time" "github.com/hashicorp/golang-lru" @@ -501,141 +500,3 @@ func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) ([]ds.Key, error) { return keys, nil } - -type ttlEntry struct { - TTL time.Duration - ExpiresAt time.Time -} - -type ttlManager struct { - sync.RWMutex - entries map[ds.Key]*ttlEntry - - ctx context.Context - cancel context.CancelFunc - ticker *time.Ticker - ds ds.TxnDatastore - cache cache -} - -func newTTLManager(parent context.Context, d ds.Datastore, c *cache, tick time.Duration) *ttlManager { - ctx, cancel := context.WithCancel(parent) - txnDs, ok := d.(ds.TxnDatastore) - if !ok { - panic("must construct ttlManager with transactional datastore") - } - mgr := &ttlManager{ - entries: make(map[ds.Key]*ttlEntry), - ctx: ctx, - cancel: cancel, - ticker: time.NewTicker(tick), - ds: txnDs, - cache: *c, - } - - go func() { - for { - select { - case <-mgr.ctx.Done(): - mgr.ticker.Stop() - return - case <-mgr.ticker.C: - mgr.tick() - } - } - }() - - return mgr -} - -// To be called by TTL manager's coroutine only. -func (mgr *ttlManager) tick() { - mgr.Lock() - defer mgr.Unlock() - - now := time.Now() - var toDel []ds.Key - for key, entry := range mgr.entries { - if entry.ExpiresAt.After(now) { - continue - } - toDel = append(toDel, key) - } - - if len(toDel) == 0 { - return - } - - txn := mgr.ds.NewTransaction(false) - defer txn.Discard() - - for _, key := range toDel { - if err := txn.Delete(key); err != nil { - log.Error("failed to delete TTL key: %v, cause: %v", key.String(), err) - break - } - mgr.cache.Remove(key.Parent().Name()) - delete(mgr.entries, key) - } - - if err := txn.Commit(); err != nil { - log.Error("failed to commit TTL deletion, cause: %v", err) - } -} - -func (mgr *ttlManager) deleteTTLs(keys []ds.Key) { - mgr.Lock() - defer mgr.Unlock() - - for _, key := range keys { - delete(mgr.entries, key) - } -} - -func (mgr *ttlManager) bumpTTLs(keys []ds.Key, ttl time.Duration) { - mgr.Lock() - defer mgr.Unlock() - - expiration := time.Now().Add(ttl) - for _, key := range keys { - if entry, ok := mgr.entries[key]; !ok || (ok && entry.ExpiresAt.Before(expiration)) { - mgr.entries[key] = &ttlEntry{TTL: ttl, ExpiresAt: expiration} - } - } -} - -func (mgr *ttlManager) setTTLs(keys []ds.Key, ttl time.Duration) { - mgr.Lock() - defer mgr.Unlock() - - expiration := time.Now().Add(ttl) - for _, key := range keys { - mgr.entries[key] = &ttlEntry{TTL: ttl, ExpiresAt: expiration} - } -} - -func (mgr *ttlManager) adjustTTLs(prefix ds.Key, oldTTL, newTTL time.Duration) { - mgr.Lock() - defer mgr.Unlock() - - now := time.Now() - var keys []ds.Key - for key, entry := range mgr.entries { - if key.IsDescendantOf(prefix) && entry.TTL == oldTTL { - keys = append(keys, key) - entry.TTL = newTTL - entry.ExpiresAt = now.Add(newTTL) - } - } -} - -func (mgr *ttlManager) clear(prefix ds.Key) { - mgr.Lock() - defer mgr.Unlock() - - for key := range mgr.entries { - if key.IsDescendantOf(prefix) { - delete(mgr.entries, key) - } - } -} From afc9ee92b0f50c5fcb83a1e7aae29b1aee9d4fe5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 13 Sep 2018 12:49:29 +0100 Subject: [PATCH 03/11] performance optimisation round. --- pstoreds/addr_book.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index ef3309d..5da4d4d 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -445,17 +445,19 @@ func (mgr *dsAddrBook) ClearAddrs(p peer.ID) { // dbDelete transactionally deletes the provided keys. func (mgr *dsAddrBook) dbDelete(keys []ds.Key) error { + var err error + txn := mgr.ds.NewTransaction(false) defer txn.Discard() for _, key := range keys { - if err := txn.Delete(key); err != nil { + if err = txn.Delete(key); err != nil { log.Errorf("failed to delete key: %s, cause: %v", key.String(), err) return err } } - if err := txn.Commit(); err != nil { + if err = txn.Commit(); err != nil { log.Errorf("failed to commit transaction when deleting keys, cause: %v", err) return err } @@ -477,9 +479,10 @@ func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) ([]ds.Key, error) { return nil, err } - var keys []ds.Key + var keys = make([]ds.Key, 0, 4) // cap: 4 to reduce allocs + var key ds.Key for result := range results.Next() { - key := ds.RawKey(result.Key) + key = ds.RawKey(result.Key) keys = append(keys, key) if err = txn.Delete(key); err != nil { @@ -488,7 +491,7 @@ func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) ([]ds.Key, error) { } } - if err := results.Close(); err != nil { + if err = results.Close(); err != nil { log.Errorf("failed to close cursor, cause: %v", err) return nil, err } From 7e645fa115cefb310d3a97141ef03aa7f32407ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 13 Sep 2018 13:28:27 +0100 Subject: [PATCH 04/11] do not prettify cache keys (peer ids). --- pstoreds/addr_book.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index 5da4d4d..c9a0cd7 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -145,7 +145,7 @@ func (mgr *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) error { return err } - mgr.cache.Remove(p.Pretty()) + mgr.cache.Remove(p) // Attempt transactional KV deletion. for i := 0; i < mgr.writeRetries; i++ { if err = mgr.dbDelete(keys); err == nil { @@ -169,8 +169,7 @@ func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durati return err } - mgr.cache.Remove(p.Pretty()) - + mgr.cache.Remove(p) // Attempt transactional KV insertion. var existed []bool for i := 0; i < mgr.writeRetries; i++ { @@ -256,7 +255,7 @@ func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr, ttl time.Du // UpdateAddrs will update any addresses for a given peer and TTL combination to // have a new TTL. func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { - mgr.cache.Remove(p.Pretty()) + mgr.cache.Remove(p) var err error for i := 0; i < mgr.writeRetries; i++ { @@ -322,14 +321,14 @@ func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { ) // Check the cache and return the entry only if it hasn't expired; if expired, remove. - if e, ok := mgr.cache.Get(p.Pretty()); ok { + if e, ok := mgr.cache.Get(p); ok { entry := e.(cacheEntry) if entry.expiration.After(time.Now()) { addrs := make([]ma.Multiaddr, len(entry.addrs)) copy(addrs, entry.addrs) return addrs } else { - mgr.cache.Remove(p.Pretty()) + mgr.cache.Remove(p) } } @@ -360,7 +359,7 @@ func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { addrsCpy := make([]ma.Multiaddr, len(addrs)) copy(addrsCpy, addrs) entry := cacheEntry{addrs: addrsCpy, expiration: earliestExp} - mgr.cache.Add(p.Pretty(), entry) + mgr.cache.Add(p, entry) return addrs } @@ -417,8 +416,8 @@ func (mgr *dsAddrBook) ClearAddrs(p peer.ID) { deleteFn func() error ) - if e, ok := mgr.cache.Peek(p.Pretty()); ok { - mgr.cache.Remove(p.Pretty()) + if e, ok := mgr.cache.Peek(p); ok { + mgr.cache.Remove(p) keys, _, _ := keysAndAddrs(p, e.(cacheEntry).addrs) deleteFn = func() error { return mgr.dbDelete(keys) From 7c5cf50d9bfbb5132ff2ee008ce5cf2d6bb84c46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 13 Sep 2018 16:09:32 +0100 Subject: [PATCH 05/11] adjust ttl tests as badger doesn't support sub-second ttl. --- test/addr_book_suite.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/test/addr_book_suite.go b/test/addr_book_suite.go index d78e7cc..92bc1fa 100644 --- a/test/addr_book_suite.go +++ b/test/addr_book_suite.go @@ -154,25 +154,27 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) { testHas(t, addrs2, m.Addrs(ids[1])) // Will only affect addrs1[0]. - m.UpdateAddrs(ids[0], time.Hour, 100*time.Microsecond) + // Badger does not support subsecond TTLs. + // https://github.com/dgraph-io/badger/issues/339 + m.UpdateAddrs(ids[0], time.Hour, 1*time.Second) // No immediate effect. testHas(t, addrs1, m.Addrs(ids[0])) testHas(t, addrs2, m.Addrs(ids[1])) // After a wait, addrs[0] is gone. - time.Sleep(100 * time.Millisecond) + time.Sleep(1500 * time.Millisecond) testHas(t, addrs1[1:2], m.Addrs(ids[0])) testHas(t, addrs2, m.Addrs(ids[1])) // Will only affect addrs2[0]. - m.UpdateAddrs(ids[1], time.Hour, 100*time.Microsecond) + m.UpdateAddrs(ids[1], time.Hour, 1*time.Second) // No immediate effect. testHas(t, addrs1[1:2], m.Addrs(ids[0])) testHas(t, addrs2, m.Addrs(ids[1])) - time.Sleep(100 * time.Millisecond) + time.Sleep(1500 * time.Millisecond) // First addrs is gone in both. testHas(t, addrs1[1:], m.Addrs(ids[0])) From b75ed37f2b1921a5874c283ea584d702c9a734dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 13 Sep 2018 16:09:47 +0100 Subject: [PATCH 06/11] reintroduce import prefixes. --- pstoreds/addr_book.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index f8fafa8..9946243 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -7,16 +7,16 @@ import ( "errors" "time" - "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru" ds "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/query" + query "github.com/ipfs/go-datastore/query" logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-peer" + peer "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" mh "github.com/multiformats/go-multihash" pstore "github.com/libp2p/go-libp2p-peerstore" - "github.com/libp2p/go-libp2p-peerstore/pstoremem" + pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem" ) var ( From 7976022e14d9797e70ea2098c4bdb25a72a8eea2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 14 Sep 2018 12:35:17 +0100 Subject: [PATCH 07/11] gx bubble up go-datastore and go-ds-badger. --- package.json | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index fa7404c..37053f3 100644 --- a/package.json +++ b/package.json @@ -57,14 +57,14 @@ }, { "author": "magik6k", - "hash": "QmUCfrikzKVGAfpE31RPwPd32fu1DYxSG7HTGCadba5Wza", + "hash": "QmaiEBFgkgB1wjrPRxru5PyXPEkx58WuMjXNaR1Q9QNRjn", "name": "go-ds-badger", - "version": "1.6.1" + "version": "1.7.0" }, { - "hash": "QmSpg1CvpXQQow5ernt1gNBXaXV6yxyNqi7XoeerWfzB5w", + "hash": "QmUyz7JTJzgegC6tiJrfby3mPhzcdswVtG4x58TQ6pq8jV", "name": "go-datastore", - "version": "3.1.0" + "version": "3.2.0" }, { "hash": "QmQjMHF8ptRgx4E57UFMiT4YM6kqaJeYxZ1MCDX23aw4rK", From 3a4d8096cfec583162e6d374d08dc8b7d517045d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 14 Sep 2018 15:21:53 +0100 Subject: [PATCH 08/11] remove unnecessary return values. --- pstoreds/addr_book.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index 9946243..684007d 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -424,8 +424,7 @@ func (mgr *dsAddrBook) ClearAddrs(p peer.ID) { } } else { deleteFn = func() error { - _, err := mgr.dbDeleteIter(prefix) - return err + return mgr.dbDeleteIter(prefix) } } @@ -466,7 +465,7 @@ func (mgr *dsAddrBook) dbDelete(keys []ds.Key) error { // dbDeleteIter removes all entries whose keys are prefixed with the argument. // it returns a slice of the removed keys in case it's needed -func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) ([]ds.Key, error) { +func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) error { q := query.Query{Prefix: prefix.String(), KeysOnly: true} txn := mgr.ds.NewTransaction(false) @@ -475,7 +474,7 @@ func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) ([]ds.Key, error) { results, err := txn.Query(q) if err != nil { log.Errorf("failed to fetch all keys prefixed with: %s, cause: %v", prefix.String(), err) - return nil, err + return err } var keys = make([]ds.Key, 0, 4) // cap: 4 to reduce allocs @@ -486,19 +485,19 @@ func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) ([]ds.Key, error) { if err = txn.Delete(key); err != nil { log.Errorf("failed to delete key: %s, cause: %v", key.String(), err) - return nil, err + return err } } if err = results.Close(); err != nil { log.Errorf("failed to close cursor, cause: %v", err) - return nil, err + return err } if err = txn.Commit(); err != nil { log.Errorf("failed to commit transaction when deleting keys, cause: %v", err) - return nil, err + return err } - return keys, nil + return nil } From 4b2c1212e7a99d06a2bfefeb62e9e769a7ddf165 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 14 Sep 2018 18:27:23 +0100 Subject: [PATCH 09/11] add a test and fix broken logic. --- pstoreds/addr_book.go | 2 +- test/addr_book_suite.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index 684007d..d9d762c 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -223,7 +223,7 @@ func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr, ttl time.Du err = ttltxn.SetTTL(key, ttl) case ttlExtend: var curr time.Time - if curr, err = ttltxn.GetExpiration(key); err != nil && exp.After(curr) { + if curr, err = ttltxn.GetExpiration(key); err == nil && exp.After(curr) { err = ttltxn.SetTTL(key, ttl) } } diff --git a/test/addr_book_suite.go b/test/addr_book_suite.go index 92bc1fa..7cb4cf8 100644 --- a/test/addr_book_suite.go +++ b/test/addr_book_suite.go @@ -93,6 +93,20 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) { testHas(t, addrs, ab.Addrs(id)) }) + + t.Run("adding an existing address with a later expiration extends its ttl", func(t *testing.T) { + id := generatePeerIds(1)[0] + addrs := generateAddrs(3) + + ab.AddAddrs(id, addrs, time.Second) + + // same address as before but with a higher TTL + ab.AddAddrs(id, addrs[2:], time.Hour) + + // after the initial TTL has expired, check that only the third address is present. + time.Sleep(1200 * time.Millisecond) + testHas(t, addrs[2:], ab.Addrs(id)) + }) } } From 5b9ad98cc230b09461eb7ca13c8ffaece17fb97b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 18 Sep 2018 18:01:10 +0100 Subject: [PATCH 10/11] remove noop Stop() in address book. --- pstoreds/addr_book.go | 5 ----- pstoreds/ds_test.go | 14 +++++--------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index d9d762c..493ba8a 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -80,11 +80,6 @@ func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts Options) (*dsA return mgr, nil } -// Stop will signal the TTL manager to stop and block until it returns. -func (mgr *dsAddrBook) Stop() { - // noop -} - func keysAndAddrs(p peer.ID, addrs []ma.Multiaddr) ([]ds.Key, []ma.Multiaddr, error) { var ( keys = make([]ds.Key, len(addrs)) diff --git a/pstoreds/ds_test.go b/pstoreds/ds_test.go index 20bef6a..fbfdf8a 100644 --- a/pstoreds/ds_test.go +++ b/pstoreds/ds_test.go @@ -172,9 +172,9 @@ func badgerStore(t testing.TB) (ds.TxnDatastore, func()) { func peerstoreFactory(tb testing.TB, opts Options) pt.PeerstoreFactory { return func() (pstore.Peerstore, func()) { - ds, closeFunc := badgerStore(tb) + store, closeFunc := badgerStore(tb) - ps, err := NewPeerstore(context.Background(), ds, opts) + ps, err := NewPeerstore(context.Background(), store, opts) if err != nil { tb.Fatal(err) } @@ -185,17 +185,13 @@ func peerstoreFactory(tb testing.TB, opts Options) pt.PeerstoreFactory { func addressBookFactory(tb testing.TB, opts Options) pt.AddrBookFactory { return func() (pstore.AddrBook, func()) { - ds, closeDB := badgerStore(tb) + store, closeFunc := badgerStore(tb) - mgr, err := NewAddrBook(context.Background(), ds, opts) + ab, err := NewAddrBook(context.Background(), store, opts) if err != nil { tb.Fatal(err) } - closeFunc := func() { - mgr.Stop() - closeDB() - } - return mgr, closeFunc + return ab, closeFunc } } From 0f81bdf419af041b5c876aa89b6dac60b54c82de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 18 Sep 2018 19:01:24 +0100 Subject: [PATCH 11/11] introduce struct for persisted value. --- pstoreds/addr_book.go | 58 +++++++++++++++++++++++++++---------------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index 493ba8a..35fb314 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -1,7 +1,6 @@ package pstoreds import ( - "bytes" "context" "encoding/binary" "errors" @@ -52,6 +51,24 @@ type cacheEntry struct { addrs []ma.Multiaddr } +type addrRecord struct { + ttl time.Duration + addr ma.Multiaddr +} + +func (ar *addrRecord) MarshalBinary() ([]byte, error) { + ttlB := make([]byte, 8) + binary.LittleEndian.PutUint64(ttlB, uint64(ar.ttl)) + return append(ttlB, ar.addr.Bytes()...), nil +} + +func (ar *addrRecord) UnmarshalBinary(b []byte) error { + ar.ttl = time.Duration(binary.LittleEndian.Uint64(b)) + // this had been serialized by us, no need to check for errors + ar.addr, _ = ma.NewMultiaddrBytes(b[8:]) + return nil +} + // NewAddrBook initializes a new address book given a // Datastore instance, a context for managing the TTL manager, // and the interval at which the TTL manager should sweep the Datastore. @@ -194,11 +211,8 @@ func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr, ttl time.Du err error existed = make([]bool, len(keys)) exp = time.Now().Add(ttl) - ttlB = make([]byte, 8) ) - binary.LittleEndian.PutUint64(ttlB, uint64(ttl)) - txn := mgr.ds.NewTransaction(false) defer txn.Discard() @@ -230,8 +244,11 @@ func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr, ttl time.Du continue } - // format: bytes(ttl) || bytes(multiaddr) - value := append(ttlB, addrs[i].Bytes()...) + r := &addrRecord{ + ttl: ttl, + addr: addrs[i], + } + value, _ := r.MarshalBinary() if err = ttltxn.PutWithTTL(key, value, ttl); err != nil { log.Errorf("transaction failed and aborted while setting key: %s, cause: %v", key.String(), err) return nil, err @@ -267,16 +284,12 @@ func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time. func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time.Duration) error { var ( - prefix = ds.NewKey(p.Pretty()) - q = query.Query{Prefix: prefix.String(), KeysOnly: false} - oldb, newb = make([]byte, 8), make([]byte, 8) - results query.Results - err error + prefix = ds.NewKey(p.Pretty()) + q = query.Query{Prefix: prefix.String(), KeysOnly: false} + results query.Results + err error ) - binary.LittleEndian.PutUint64(oldb, uint64(oldTTL)) - binary.LittleEndian.PutUint64(newb, uint64(newTTL)) - txn := mgr.ds.NewTransaction(false) defer txn.Discard() @@ -286,13 +299,16 @@ func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time. defer results.Close() ttltxn := txn.(ds.TTLDatastore) + r := &addrRecord{} for result := range results.Next() { - // format: bytes(ttl) || bytes(multiaddr) - if curr := result.Value[:8]; !bytes.Equal(curr, oldb) { + r.UnmarshalBinary(result.Value) + if r.ttl != oldTTL { continue } - newVal := append(newb, result.Value[8:]...) - if err = ttltxn.PutWithTTL(ds.RawKey(result.Key), newVal, newTTL); err != nil { + + r.ttl = newTTL + value, _ := r.MarshalBinary() + if err = ttltxn.PutWithTTL(ds.RawKey(result.Key), value, newTTL); err != nil { return err } } @@ -336,12 +352,12 @@ func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { defer results.Close() var addrs []ma.Multiaddr + var r addrRecord // used to set the expiration for the entire cache entry earliestExp := maxTime for result := range results.Next() { - // extract multiaddr from value: bytes(ttl) || bytes(multiaddr) - if addr, err := ma.NewMultiaddrBytes(result.Value[8:]); err == nil { - addrs = append(addrs, addr) + if err = r.UnmarshalBinary(result.Value); err == nil { + addrs = append(addrs, r.addr) } if exp := result.Expiration; !exp.IsZero() && exp.Before(earliestExp) {