Document and clean AddrSubManager

This commit is contained in:
Cole Brown 2018-06-15 13:54:04 -04:00
parent 4a521e2e3b
commit cfbbb58a31

View File

@ -240,7 +240,7 @@ func (mgr *AddrManager) Addrs(p peer.ID) []ma.Multiaddr {
return good return good
} }
// ClearAddresses removes all previously stored addresses // ClearAddrs removes all previously stored addresses
func (mgr *AddrManager) ClearAddrs(p peer.ID) { func (mgr *AddrManager) ClearAddrs(p peer.ID) {
mgr.addrmu.Lock() mgr.addrmu.Lock()
defer mgr.addrmu.Unlock() defer mgr.addrmu.Unlock()
@ -249,6 +249,8 @@ func (mgr *AddrManager) ClearAddrs(p peer.ID) {
delete(mgr.addrs, p) delete(mgr.addrs, p)
} }
// AddrStream returns a channel on which all new addresses discovered for a
// given peer ID will be published.
func (mgr *AddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr { func (mgr *AddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
mgr.addrmu.Lock() mgr.addrmu.Lock()
defer mgr.addrmu.Unlock() defer mgr.addrmu.Unlock()
@ -263,18 +265,22 @@ func (mgr *AddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Mul
return mgr.subManager.AddrStream(ctx, p, initial) return mgr.subManager.AddrStream(ctx, p, initial)
} }
// An abstracted, pub-sub manager for address streams. Extracted from
// AddrManager in order to support additional implementations.
type AddrSubManager struct { type AddrSubManager struct {
mu sync.RWMutex mu sync.RWMutex
subs map[peer.ID][]*addrSub subs map[peer.ID][]*addrSub
} }
// NewAddrSubManager initializes an AddrSubManager.
func NewAddrSubManager() *AddrSubManager { func NewAddrSubManager() *AddrSubManager {
return &AddrSubManager{ return &AddrSubManager{
mu: sync.RWMutex{},
subs: make(map[peer.ID][]*addrSub), subs: make(map[peer.ID][]*addrSub),
} }
} }
// Used internally by the address stream coroutine to remove a subscription
// from the manager.
func (mgr *AddrSubManager) removeSub(p peer.ID, s *addrSub) { func (mgr *AddrSubManager) removeSub(p peer.ID, s *addrSub) {
mgr.mu.Lock() mgr.mu.Lock()
defer mgr.mu.Unlock() defer mgr.mu.Unlock()
@ -296,20 +302,20 @@ func (mgr *AddrSubManager) removeSub(p peer.ID, s *addrSub) {
} }
} }
// BroadcastAddr broadcasts a new address to all subscribed streams.
func (mgr *AddrSubManager) BroadcastAddr(p peer.ID, addr ma.Multiaddr) { func (mgr *AddrSubManager) BroadcastAddr(p peer.ID, addr ma.Multiaddr) {
mgr.mu.Lock() mgr.mu.RLock()
defer mgr.mu.Unlock() defer mgr.mu.RUnlock()
subs, ok := mgr.subs[p] if subs, ok := mgr.subs[p]; ok {
if !ok { for _, sub := range subs {
return sub.pubAddr(addr)
} }
for _, sub := range subs {
sub.pubAddr(addr)
} }
} }
// AddrStream creates a new subscription for a given peer ID, pre-populating the
// channel with any addresses we might already have on file.
func (mgr *AddrSubManager) AddrStream(ctx context.Context, p peer.ID, initial []ma.Multiaddr) <-chan ma.Multiaddr { func (mgr *AddrSubManager) AddrStream(ctx context.Context, p peer.ID, initial []ma.Multiaddr) <-chan ma.Multiaddr {
mgr.mu.Lock() mgr.mu.Lock()
defer mgr.mu.Unlock() defer mgr.mu.Unlock()