Merge pull request #199 from libp2p/marco/with-clock

feat: use a clock interface to better support testing for pstoremem
This commit is contained in:
Marco Munizaga 2022-05-26 14:43:13 -07:00 committed by GitHub
commit 3bf1f8cee7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 10 deletions

View File

@ -49,6 +49,16 @@ func (segments *addrSegments) get(p peer.ID) *addrSegment {
return segments[byte(p[len(p)-1])] return segments[byte(p[len(p)-1])]
} }
type clock interface {
Now() time.Time
}
type realclock struct{}
func (rc realclock) Now() time.Time {
return time.Now()
}
// memoryAddrBook manages addresses. // memoryAddrBook manages addresses.
type memoryAddrBook struct { type memoryAddrBook struct {
segments addrSegments segments addrSegments
@ -57,6 +67,7 @@ type memoryAddrBook struct {
cancel func() cancel func()
subManager *AddrSubManager subManager *AddrSubManager
clock clock
} }
var _ pstore.AddrBook = (*memoryAddrBook)(nil) var _ pstore.AddrBook = (*memoryAddrBook)(nil)
@ -76,12 +87,22 @@ func NewAddrBook() *memoryAddrBook {
}(), }(),
subManager: NewAddrSubManager(), subManager: NewAddrSubManager(),
cancel: cancel, cancel: cancel,
clock: realclock{},
} }
ab.refCount.Add(1) ab.refCount.Add(1)
go ab.background(ctx) go ab.background(ctx)
return ab return ab
} }
type AddrBookOption func(book *memoryAddrBook) error
func WithClock(clock clock) AddrBookOption {
return func(book *memoryAddrBook) error {
book.clock = clock
return nil
}
}
// background periodically schedules a gc // background periodically schedules a gc
func (mab *memoryAddrBook) background(ctx context.Context) { func (mab *memoryAddrBook) background(ctx context.Context) {
defer mab.refCount.Done() defer mab.refCount.Done()
@ -106,7 +127,7 @@ func (mab *memoryAddrBook) Close() error {
// gc garbage collects the in-memory address book. // gc garbage collects the in-memory address book.
func (mab *memoryAddrBook) gc() { func (mab *memoryAddrBook) gc() {
now := time.Now() now := mab.clock.Now()
for _, s := range mab.segments { for _, s := range mab.segments {
s.Lock() s.Lock()
for p, amap := range s.addrs { for p, amap := range s.addrs {
@ -208,7 +229,7 @@ func (mab *memoryAddrBook) addAddrsUnlocked(s *addrSegment, p peer.ID, addrs []m
s.addrs[p] = amap s.addrs[p] = amap
} }
exp := time.Now().Add(ttl) exp := mab.clock.Now().Add(ttl)
for _, addr := range addrs { for _, addr := range addrs {
if addr == nil { if addr == nil {
log.Warnw("was passed nil multiaddr", "peer", p) log.Warnw("was passed nil multiaddr", "peer", p)
@ -253,7 +274,7 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
s.addrs[p] = amap s.addrs[p] = amap
} }
exp := time.Now().Add(ttl) exp := mab.clock.Now().Add(ttl)
for _, addr := range addrs { for _, addr := range addrs {
if addr == nil { if addr == nil {
log.Warnw("was passed nil multiaddr", "peer", p) log.Warnw("was passed nil multiaddr", "peer", p)
@ -278,7 +299,7 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t
s := mab.segments.get(p) s := mab.segments.get(p)
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
exp := time.Now().Add(newTTL) exp := mab.clock.Now().Add(newTTL)
amap, found := s.addrs[p] amap, found := s.addrs[p]
if !found { if !found {
return return
@ -303,11 +324,10 @@ func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
return validAddrs(s.addrs[p]) return validAddrs(mab.clock.Now(), s.addrs[p])
} }
func validAddrs(amap map[string]*expiringAddr) []ma.Multiaddr { func validAddrs(now time.Time, amap map[string]*expiringAddr) []ma.Multiaddr {
now := time.Now()
good := make([]ma.Multiaddr, 0, len(amap)) good := make([]ma.Multiaddr, 0, len(amap))
if amap == nil { if amap == nil {
return good return good
@ -332,7 +352,7 @@ func (mab *memoryAddrBook) GetPeerRecord(p peer.ID) *record.Envelope {
// although the signed record gets garbage collected when all addrs inside it are expired, // although the signed record gets garbage collected when all addrs inside it are expired,
// we may be in between the expiration time and the GC interval // we may be in between the expiration time and the GC interval
// so, we check to see if we have any valid signed addrs before returning the record // so, we check to see if we have any valid signed addrs before returning the record
if len(validAddrs(s.addrs[p])) == 0 { if len(validAddrs(mab.clock.Now(), s.addrs[p])) == 0 {
return nil return nil
} }

View File

@ -26,12 +26,21 @@ type Option interface{}
// NewPeerstore creates an in-memory threadsafe collection of peers. // NewPeerstore creates an in-memory threadsafe collection of peers.
// It's the caller's responsibility to call RemovePeer to ensure // It's the caller's responsibility to call RemovePeer to ensure
// that memory consumption of the peerstore doesn't grow unboundedly. // that memory consumption of the peerstore doesn't grow unboundedly.
func NewPeerstore(opts ...Option) (*pstoremem, error) { func NewPeerstore(opts ...Option) (ps *pstoremem, err error) {
ab := NewAddrBook()
defer func() {
if err != nil {
ab.Close()
}
}()
var protoBookOpts []ProtoBookOption var protoBookOpts []ProtoBookOption
for _, opt := range opts { for _, opt := range opts {
switch o := opt.(type) { switch o := opt.(type) {
case ProtoBookOption: case ProtoBookOption:
protoBookOpts = append(protoBookOpts, o) protoBookOpts = append(protoBookOpts, o)
case AddrBookOption:
o(ab)
default: default:
return nil, fmt.Errorf("unexpected peer store option: %v", o) return nil, fmt.Errorf("unexpected peer store option: %v", o)
} }
@ -43,7 +52,7 @@ func NewPeerstore(opts ...Option) (*pstoremem, error) {
return &pstoremem{ return &pstoremem{
Metrics: pstore.NewMetrics(), Metrics: pstore.NewMetrics(),
memoryKeyBook: NewKeyBook(), memoryKeyBook: NewKeyBook(),
memoryAddrBook: NewAddrBook(), memoryAddrBook: ab,
memoryProtoBook: pb, memoryProtoBook: pb,
memoryPeerMetadata: NewPeerMetadata(), memoryPeerMetadata: NewPeerMetadata(),
}, nil }, nil