diff --git a/addr_manager.go b/addr_manager.go index 09319af..d03ad77 100644 --- a/addr_manager.go +++ b/addr_manager.go @@ -240,7 +240,7 @@ func (mgr *AddrManager) Addrs(p peer.ID) []ma.Multiaddr { return good } -// ClearAddresses removes all previously stored addresses +// ClearAddrs removes all previously stored addresses func (mgr *AddrManager) ClearAddrs(p peer.ID) { mgr.addrmu.Lock() defer mgr.addrmu.Unlock() @@ -249,6 +249,8 @@ func (mgr *AddrManager) ClearAddrs(p peer.ID) { delete(mgr.addrs, p) } +// AddrStream returns a channel on which all new addresses discovered for a +// given peer ID will be published. func (mgr *AddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr { mgr.addrmu.Lock() defer mgr.addrmu.Unlock() @@ -263,18 +265,22 @@ func (mgr *AddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Mul return mgr.subManager.AddrStream(ctx, p, initial) } +// An abstracted, pub-sub manager for address streams. Extracted from +// AddrManager in order to support additional implementations. type AddrSubManager struct { mu sync.RWMutex subs map[peer.ID][]*addrSub } +// NewAddrSubManager initializes an AddrSubManager. func NewAddrSubManager() *AddrSubManager { return &AddrSubManager{ - mu: sync.RWMutex{}, subs: make(map[peer.ID][]*addrSub), } } +// Used internally by the address stream coroutine to remove a subscription +// from the manager. func (mgr *AddrSubManager) removeSub(p peer.ID, s *addrSub) { mgr.mu.Lock() defer mgr.mu.Unlock() @@ -296,20 +302,20 @@ func (mgr *AddrSubManager) removeSub(p peer.ID, s *addrSub) { } } +// BroadcastAddr broadcasts a new address to all subscribed streams. func (mgr *AddrSubManager) BroadcastAddr(p peer.ID, addr ma.Multiaddr) { - mgr.mu.Lock() - defer mgr.mu.Unlock() + mgr.mu.RLock() + defer mgr.mu.RUnlock() - subs, ok := mgr.subs[p] - if !ok { - return - } - - for _, sub := range subs { - sub.pubAddr(addr) + if subs, ok := mgr.subs[p]; ok { + for _, sub := range subs { + sub.pubAddr(addr) + } } } +// AddrStream creates a new subscription for a given peer ID, pre-populating the +// channel with any addresses we might already have on file. func (mgr *AddrSubManager) AddrStream(ctx context.Context, p peer.ID, initial []ma.Multiaddr) <-chan ma.Multiaddr { mgr.mu.Lock() defer mgr.mu.Unlock()