diff --git a/addr_manager_ds.go b/addr_manager_ds.go index 125d6f0..22ac30d 100644 --- a/addr_manager_ds.go +++ b/addr_manager_ds.go @@ -12,13 +12,18 @@ import ( mh "github.com/multiformats/go-multihash" ) +// DatastoreAddrManager is an address manager backed by a Datastore with both an +// in-memory TTL manager and an in-memory address stream manager. type DatastoreAddrManager struct { - ds ds.Datastore + ds ds.Batching ttlManager *ttlmanager subsManager *AddrSubManager } -func NewDatastoreAddrManager(ctx context.Context, ds ds.Datastore, ttlInterval time.Duration) *DatastoreAddrManager { +// NewDatastoreAddrManager initializes a new DatastoreAddrManager given a +// Datastore instance, a context for managing the TTL manager, and the interval +// at which the TTL manager should sweep the Datastore. +func NewDatastoreAddrManager(ctx context.Context, ds ds.Batching, ttlInterval time.Duration) *DatastoreAddrManager { mgr := &DatastoreAddrManager{ ds: ds, ttlManager: newTTLManager(ctx, ds, ttlInterval), @@ -27,6 +32,7 @@ func NewDatastoreAddrManager(ctx context.Context, ds ds.Datastore, ttlInterval t return mgr } +// Stop will signal the TTL manager to stop and block until it returns. func (mgr *DatastoreAddrManager) Stop() { mgr.ttlManager.stop() } @@ -44,24 +50,38 @@ func peerIDFromKey(key ds.Key) (peer.ID, error) { return peer.IDB58Decode(idstring) } +// AddAddr will add a new address if it's not already in the AddrBook. func (mgr *DatastoreAddrManager) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl) } +// AddAddrs will add many new addresses if they're not already in the AddrBook. func (mgr *DatastoreAddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { if ttl <= 0 { return } - mgr.SetAddrs(p, addrs, ttl) + mgr.setAddrs(p, addrs, ttl, true) } +// SetAddr will add or update the TTL of an address in the AddrBook. func (mgr *DatastoreAddrManager) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl) } +// SetAddrs will add or update the TTLs of addresses in the AddrBook. func (mgr *DatastoreAddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { + mgr.setAddrs(p, addrs, ttl, false) +} + +func (mgr *DatastoreAddrManager) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, add bool) { var keys []ds.Key + batch, err := mgr.ds.Batch() + if err != nil { + log.Error(err) + return + } + defer batch.Commit() for _, addr := range addrs { if addr == nil { continue @@ -75,24 +95,31 @@ func (mgr *DatastoreAddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl t keys = append(keys, key) if ttl <= 0 { - mgr.ds.Delete(key) + batch.Delete(key) continue } - if has, err := mgr.ds.Has(key); err != nil || !has { + has, err := mgr.ds.Has(key) + if err != nil || !has { mgr.subsManager.BroadcastAddr(p, addr) } - if err := mgr.ds.Put(key, addr.Bytes()); err != nil { - log.Error(err) + + if !has || !add { + if err := batch.Put(key, addr.Bytes()); err != nil { + log.Error(err) + } } } mgr.ttlManager.setTTLs(keys, ttl) } +// UpdateAddrs will update any addresses for a given peer and TTL combination to +// have a new TTL. func (mgr *DatastoreAddrManager) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { prefix := ds.NewKey(p.Pretty()) mgr.ttlManager.updateTTLs(prefix, oldTTL, newTTL) } +// Addrs Returns all of the non-expired addresses for a given peer. func (mgr *DatastoreAddrManager) Addrs(p peer.ID) []ma.Multiaddr { prefix := ds.NewKey(p.Pretty()) q := query.Query{Prefix: prefix.String()} @@ -115,6 +142,7 @@ func (mgr *DatastoreAddrManager) Addrs(p peer.ID) []ma.Multiaddr { return addrs } +// Peers returns all of the peer IDs for which the AddrBook has addresses. func (mgr *DatastoreAddrManager) Peers() []peer.ID { q := query.Query{} results, err := mgr.ds.Query(q) @@ -140,11 +168,14 @@ func (mgr *DatastoreAddrManager) Peers() []peer.ID { return ids } +// AddrStream returns a channel on which all new addresses discovered for a +// given peer ID will be published. func (mgr *DatastoreAddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr { initial := mgr.Addrs(p) return mgr.subsManager.AddrStream(ctx, p, initial) } +// ClearAddrs will delete all known addresses for a peer ID. func (mgr *DatastoreAddrManager) ClearAddrs(p peer.ID) { prefix := ds.NewKey(p.Pretty()) q := query.Query{Prefix: prefix.String()} @@ -153,9 +184,15 @@ func (mgr *DatastoreAddrManager) ClearAddrs(p peer.ID) { log.Error(err) return } + batch, err := mgr.ds.Batch() + if err != nil { + log.Error(err) + return + } + defer batch.Commit() for result := range results.Next() { - mgr.ds.Delete(ds.NewKey(result.Key)) + batch.Delete(ds.NewKey(result.Key)) } mgr.ttlManager.clear(ds.NewKey(p.Pretty())) } @@ -168,7 +205,7 @@ type ttlentry struct { } type ttlmanager struct { - sync.Mutex + sync.RWMutex entries map[ds.Key]*ttlentry ctx context.Context cancel context.CancelFunc @@ -180,7 +217,6 @@ type ttlmanager struct { func newTTLManager(parent context.Context, d ds.Datastore, tick time.Duration) *ttlmanager { ctx, cancel := context.WithCancel(parent) mgr := &ttlmanager{ - Mutex: sync.Mutex{}, entries: make(map[ds.Key]*ttlentry), ctx: ctx, cancel: cancel, @@ -212,8 +248,8 @@ func (mgr *ttlmanager) stop() { // For internal use only func (mgr *ttlmanager) tick() { - mgr.Lock() - defer mgr.Unlock() + mgr.RLock() + defer mgr.RUnlock() now := time.Now() for key, entry := range mgr.entries { diff --git a/addr_manager_test.go b/addr_manager_test.go index 1f49188..f3c2254 100644 --- a/addr_manager_test.go +++ b/addr_manager_test.go @@ -1,13 +1,11 @@ package peerstore import ( - "testing" - "time" - + "context" "io/ioutil" "os" - - "context" + "testing" + "time" "github.com/ipfs/go-datastore" "github.com/ipfs/go-ds-badger" @@ -55,7 +53,7 @@ func testHas(t *testing.T, exp, act []ma.Multiaddr) { } } -func setupBadgerDatastore(t *testing.T) (datastore.Datastore, func()) { +func setupBadgerDatastore(t *testing.T) (datastore.Batching, func()) { dataPath, err := ioutil.TempDir(os.TempDir(), "badger") if err != nil { t.Fatal(err) diff --git a/peerstore.go b/peerstore.go index f964322..38296a3 100644 --- a/peerstore.go +++ b/peerstore.go @@ -204,7 +204,7 @@ func NewPeerstore() Peerstore { // NewPeerstoreDatastore creates a threadsafe collection of peers backed by a // Datastore to prevent excess memory pressure. -func NewPeerstoreDatastore(ctx context.Context, ds datastore.Datastore) Peerstore { +func NewPeerstoreDatastore(ctx context.Context, ds datastore.Batching) Peerstore { return &peerstore{ keybook: newKeybook(), metrics: NewMetrics(),