Merge pull request #39 from libp2p/fix/allocations

reduce allocations and garbage collect the in-memory peerstore
This commit is contained in:
Steven Allen 2018-10-03 17:43:17 -07:00 committed by GitHub
commit 2a27e91a67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -26,31 +26,47 @@ func (e *expiringAddr) ExpiredBy(t time.Time) bool {
return t.After(e.Expires) return t.After(e.Expires)
} }
type addrSlice []expiringAddr
var _ pstore.AddrBook = (*memoryAddrBook)(nil) var _ pstore.AddrBook = (*memoryAddrBook)(nil)
// memoryAddrBook manages addresses. // memoryAddrBook manages addresses.
type memoryAddrBook struct { type memoryAddrBook struct {
addrmu sync.Mutex addrmu sync.Mutex
addrs map[peer.ID]addrSlice addrs map[peer.ID]map[string]expiringAddr
nextGC time.Time
subManager *AddrSubManager subManager *AddrSubManager
} }
func NewAddrBook() pstore.AddrBook { func NewAddrBook() pstore.AddrBook {
return &memoryAddrBook{ return &memoryAddrBook{
addrs: make(map[peer.ID]addrSlice), addrs: make(map[peer.ID]map[string]expiringAddr),
subManager: NewAddrSubManager(), subManager: NewAddrSubManager(),
} }
} }
// gc garbage collects the in-memory address book. The caller *must* hold the addrmu lock.
func (mab *memoryAddrBook) gc() {
now := time.Now()
if !now.After(mab.nextGC) {
return
}
for p, amap := range mab.addrs {
for k, addr := range amap {
if addr.ExpiredBy(now) {
delete(amap, k)
}
}
if len(amap) == 0 {
delete(mab.addrs, p)
}
}
mab.nextGC = time.Now().Add(pstore.AddressTTL)
}
func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice { func (mab *memoryAddrBook) PeersWithAddrs() peer.IDSlice {
mab.addrmu.Lock() mab.addrmu.Lock()
defer mab.addrmu.Unlock() defer mab.addrmu.Unlock()
if mab.addrs == nil {
return nil
}
pids := make(peer.IDSlice, 0, len(mab.addrs)) pids := make(peer.IDSlice, 0, len(mab.addrs))
for pid := range mab.addrs { for pid := range mab.addrs {
@ -76,20 +92,17 @@ func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
return return
} }
oldAddrs := mab.addrs[p] amap := mab.addrs[p]
amap := make(map[string]expiringAddr, len(oldAddrs)) if amap == nil {
for _, ea := range oldAddrs { amap = make(map[string]expiringAddr, len(addrs))
amap[string(ea.Addr.Bytes())] = ea mab.addrs[p] = amap
} }
// only expand ttls
exp := time.Now().Add(ttl) exp := time.Now().Add(ttl)
for _, addr := range addrs { for _, addr := range addrs {
if addr == nil { if addr == nil {
log.Warningf("was passed nil multiaddr for %s", p) log.Warningf("was passed nil multiaddr for %s", p)
continue continue
} }
addrstr := string(addr.Bytes()) addrstr := string(addr.Bytes())
a, found := amap[addrstr] a, found := amap[addrstr]
if !found || exp.After(a.Expires) { if !found || exp.After(a.Expires) {
@ -98,11 +111,7 @@ func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
mab.subManager.BroadcastAddr(p, addr) mab.subManager.BroadcastAddr(p, addr)
} }
} }
newAddrs := make([]expiringAddr, 0, len(amap)) mab.gc()
for _, ea := range amap {
newAddrs = append(newAddrs, ea)
}
mab.addrs[p] = newAddrs
} }
// SetAddr calls mgr.SetAddrs(p, addr, ttl) // SetAddr calls mgr.SetAddrs(p, addr, ttl)
@ -116,10 +125,10 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
mab.addrmu.Lock() mab.addrmu.Lock()
defer mab.addrmu.Unlock() defer mab.addrmu.Unlock()
oldAddrs := mab.addrs[p] amap := mab.addrs[p]
amap := make(map[string]expiringAddr, len(oldAddrs)) if amap == nil {
for _, ea := range oldAddrs { amap = make(map[string]expiringAddr, len(addrs))
amap[string(ea.Addr.Bytes())] = ea mab.addrs[p] = amap
} }
exp := time.Now().Add(ttl) exp := time.Now().Add(ttl)
@ -129,21 +138,17 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
continue continue
} }
// re-set all of them for new ttl. // re-set all of them for new ttl.
addrs := string(addr.Bytes()) addrstr := string(addr.Bytes())
if ttl > 0 { if ttl > 0 {
amap[addrs] = expiringAddr{Addr: addr, Expires: exp, TTL: ttl} amap[addrstr] = expiringAddr{Addr: addr, Expires: exp, TTL: ttl}
mab.subManager.BroadcastAddr(p, addr) mab.subManager.BroadcastAddr(p, addr)
} else { } else {
delete(amap, addrs) delete(amap, addrstr)
} }
} }
newAddrs := make([]expiringAddr, 0, len(amap)) mab.gc()
for _, ea := range amap {
newAddrs = append(newAddrs, ea)
}
mab.addrs[p] = newAddrs
} }
// UpdateAddrs updates the addresses associated with the given peer that have // UpdateAddrs updates the addresses associated with the given peer that have
@ -152,22 +157,20 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t
mab.addrmu.Lock() mab.addrmu.Lock()
defer mab.addrmu.Unlock() defer mab.addrmu.Unlock()
if mab.addrs == nil { amap, found := mab.addrs[p]
return
}
addrs, found := mab.addrs[p]
if !found { if !found {
return return
} }
exp := time.Now().Add(newTTL) exp := time.Now().Add(newTTL)
for i := range addrs { for k, addr := range amap {
if aexp := &addrs[i]; oldTTL == aexp.TTL { if oldTTL == addr.TTL {
aexp.TTL = newTTL addr.TTL = newTTL
aexp.Expires = exp addr.Expires = exp
amap[k] = addr
} }
} }
mab.gc()
} }
// Addresses returns all known (and valid) addresses for a given // Addresses returns all known (and valid) addresses for a given
@ -175,32 +178,21 @@ func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
mab.addrmu.Lock() mab.addrmu.Lock()
defer mab.addrmu.Unlock() defer mab.addrmu.Unlock()
// not initialized? nothing to give. amap, found := mab.addrs[p]
if mab.addrs == nil {
return nil
}
maddrs, found := mab.addrs[p]
if !found { if !found {
return nil return nil
} }
now := time.Now() now := time.Now()
good := make([]ma.Multiaddr, 0, len(maddrs)) good := make([]ma.Multiaddr, 0, len(amap))
cleaned := make([]expiringAddr, 0, len(maddrs)) for k, m := range amap {
for _, m := range maddrs {
if !m.ExpiredBy(now) { if !m.ExpiredBy(now) {
cleaned = append(cleaned, m)
good = append(good, m.Addr) good = append(good, m.Addr)
} else {
delete(amap, k)
} }
} }
// clean up the expired ones.
if len(cleaned) == 0 {
delete(mab.addrs, p)
} else {
mab.addrs[p] = cleaned
}
return good return good
} }