mirror of
https://github.com/libp2p/go-libp2p-peerstore.git
synced 2025-03-28 13:40:05 +08:00
Merge pull request #200 from libp2p/marco/with-clock-2
feat: Use a clock interface in pstoreds as well
This commit is contained in:
commit
9f3a96b88d
1
go.mod
1
go.mod
@ -5,6 +5,7 @@ go 1.17
|
|||||||
retract v0.2.9 // Contains backwards-incompatible changes. Use v0.3.0 instead.
|
retract v0.2.9 // Contains backwards-incompatible changes. Use v0.3.0 instead.
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/benbjohnson/clock v1.3.0
|
||||||
github.com/gogo/protobuf v1.3.2
|
github.com/gogo/protobuf v1.3.2
|
||||||
github.com/hashicorp/golang-lru v0.5.4
|
github.com/hashicorp/golang-lru v0.5.4
|
||||||
github.com/ipfs/go-datastore v0.5.0
|
github.com/ipfs/go-datastore v0.5.0
|
||||||
|
2
go.sum
2
go.sum
@ -6,6 +6,8 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
|
|||||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||||
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
|
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
|
||||||
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
|
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
|
||||||
|
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
|
||||||
|
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
|
||||||
github.com/btcsuite/btcd v0.20.1-beta h1:Ik4hyJqN8Jfyv3S4AGBOmyouMsYE3EdYODkMbQjwPGw=
|
github.com/btcsuite/btcd v0.20.1-beta h1:Ik4hyJqN8Jfyv3S4AGBOmyouMsYE3EdYODkMbQjwPGw=
|
||||||
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
|
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
|
||||||
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
|
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
|
||||||
|
@ -83,11 +83,11 @@ func (r *addrsRecord) flush(write ds.Write) (err error) {
|
|||||||
// * after an entry has been modified (e.g. addresses have been added or removed, TTLs updated, etc.)
|
// * after an entry has been modified (e.g. addresses have been added or removed, TTLs updated, etc.)
|
||||||
//
|
//
|
||||||
// If the return value is true, the caller should perform a flush immediately to sync the record with the store.
|
// If the return value is true, the caller should perform a flush immediately to sync the record with the store.
|
||||||
func (r *addrsRecord) clean() (chgd bool) {
|
func (r *addrsRecord) clean(now time.Time) (chgd bool) {
|
||||||
now := time.Now().Unix()
|
nowUnix := now.Unix()
|
||||||
addrsLen := len(r.Addrs)
|
addrsLen := len(r.Addrs)
|
||||||
|
|
||||||
if !r.dirty && !r.hasExpiredAddrs(now) {
|
if !r.dirty && !r.hasExpiredAddrs(nowUnix) {
|
||||||
// record is not dirty, and we have no expired entries to purge.
|
// record is not dirty, and we have no expired entries to purge.
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -104,7 +104,7 @@ func (r *addrsRecord) clean() (chgd bool) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
r.Addrs = removeExpired(r.Addrs, now)
|
r.Addrs = removeExpired(r.Addrs, nowUnix)
|
||||||
|
|
||||||
return r.dirty || len(r.Addrs) != addrsLen
|
return r.dirty || len(r.Addrs) != addrsLen
|
||||||
}
|
}
|
||||||
@ -144,6 +144,23 @@ type dsAddrBook struct {
|
|||||||
// controls children goroutine lifetime.
|
// controls children goroutine lifetime.
|
||||||
childrenDone sync.WaitGroup
|
childrenDone sync.WaitGroup
|
||||||
cancelFn func()
|
cancelFn func()
|
||||||
|
|
||||||
|
clock clock
|
||||||
|
}
|
||||||
|
|
||||||
|
type clock interface {
|
||||||
|
Now() time.Time
|
||||||
|
After(d time.Duration) <-chan time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type realclock struct{}
|
||||||
|
|
||||||
|
func (rc realclock) Now() time.Time {
|
||||||
|
return time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc realclock) After(d time.Duration) <-chan time.Time {
|
||||||
|
return time.After(d)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ pstore.AddrBook = (*dsAddrBook)(nil)
|
var _ pstore.AddrBook = (*dsAddrBook)(nil)
|
||||||
@ -176,6 +193,11 @@ func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAd
|
|||||||
opts: opts,
|
opts: opts,
|
||||||
cancelFn: cancelFn,
|
cancelFn: cancelFn,
|
||||||
subsManager: pstoremem.NewAddrSubManager(),
|
subsManager: pstoremem.NewAddrSubManager(),
|
||||||
|
clock: realclock{},
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.Clock != nil {
|
||||||
|
ab.clock = opts.Clock
|
||||||
}
|
}
|
||||||
|
|
||||||
if opts.CacheSize > 0 {
|
if opts.CacheSize > 0 {
|
||||||
@ -212,7 +234,7 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs
|
|||||||
pr.Lock()
|
pr.Lock()
|
||||||
defer pr.Unlock()
|
defer pr.Unlock()
|
||||||
|
|
||||||
if pr.clean() && update {
|
if pr.clean(ab.clock.Now()) && update {
|
||||||
err = pr.flush(ab.ds)
|
err = pr.flush(ab.ds)
|
||||||
}
|
}
|
||||||
return pr, err
|
return pr, err
|
||||||
@ -231,7 +253,7 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// this record is new and local for now (not in cache), so we don't need to lock.
|
// this record is new and local for now (not in cache), so we don't need to lock.
|
||||||
if pr.clean() && update {
|
if pr.clean(ab.clock.Now()) && update {
|
||||||
err = pr.flush(ab.ds)
|
err = pr.flush(ab.ds)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -383,7 +405,7 @@ func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.D
|
|||||||
pr.Lock()
|
pr.Lock()
|
||||||
defer pr.Unlock()
|
defer pr.Unlock()
|
||||||
|
|
||||||
newExp := time.Now().Add(newTTL).Unix()
|
newExp := ab.clock.Now().Add(newTTL).Unix()
|
||||||
for _, entry := range pr.Addrs {
|
for _, entry := range pr.Addrs {
|
||||||
if entry.Ttl != int64(oldTTL) {
|
if entry.Ttl != int64(oldTTL) {
|
||||||
continue
|
continue
|
||||||
@ -392,7 +414,7 @@ func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.D
|
|||||||
pr.dirty = true
|
pr.dirty = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if pr.clean() {
|
if pr.clean(ab.clock.Now()) {
|
||||||
pr.flush(ab.ds)
|
pr.flush(ab.ds)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -461,7 +483,7 @@ func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
|
|||||||
// return nil
|
// return nil
|
||||||
// }
|
// }
|
||||||
|
|
||||||
newExp := time.Now().Add(ttl).Unix()
|
newExp := ab.clock.Now().Add(ttl).Unix()
|
||||||
addrsMap := make(map[string]*pb.AddrBookRecord_AddrEntry, len(pr.Addrs))
|
addrsMap := make(map[string]*pb.AddrBookRecord_AddrEntry, len(pr.Addrs))
|
||||||
for _, addr := range pr.Addrs {
|
for _, addr := range pr.Addrs {
|
||||||
addrsMap[string(addr.Addr.Bytes())] = addr
|
addrsMap[string(addr.Addr.Bytes())] = addr
|
||||||
@ -521,7 +543,7 @@ func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
|
|||||||
// }
|
// }
|
||||||
|
|
||||||
pr.dirty = true
|
pr.dirty = true
|
||||||
pr.clean()
|
pr.clean(ab.clock.Now())
|
||||||
return pr.flush(ab.ds)
|
return pr.flush(ab.ds)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -567,7 +589,7 @@ func (ab *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) (err error) {
|
|||||||
pr.Addrs = deleteInPlace(pr.Addrs, addrs)
|
pr.Addrs = deleteInPlace(pr.Addrs, addrs)
|
||||||
|
|
||||||
pr.dirty = true
|
pr.dirty = true
|
||||||
pr.clean()
|
pr.clean(ab.clock.Now())
|
||||||
return pr.flush(ab.ds)
|
return pr.flush(ab.ds)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,7 +25,7 @@ var (
|
|||||||
// queries
|
// queries
|
||||||
purgeLookaheadQuery = query.Query{
|
purgeLookaheadQuery = query.Query{
|
||||||
Prefix: gcLookaheadBase.String(),
|
Prefix: gcLookaheadBase.String(),
|
||||||
Orders: []query.Order{query.OrderByKey{}},
|
Orders: []query.Order{query.OrderByFunction(orderByTimestampInKey)},
|
||||||
KeysOnly: true,
|
KeysOnly: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,7 +95,7 @@ func (gc *dsAddrBookGc) background() {
|
|||||||
defer gc.ab.childrenDone.Done()
|
defer gc.ab.childrenDone.Done()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(gc.ab.opts.GCInitialDelay):
|
case <-gc.ab.clock.After(gc.ab.opts.GCInitialDelay):
|
||||||
case <-gc.ab.ctx.Done():
|
case <-gc.ab.ctx.Done():
|
||||||
// yield if we have been cancelled/closed before the delay elapses.
|
// yield if we have been cancelled/closed before the delay elapses.
|
||||||
return
|
return
|
||||||
@ -180,7 +180,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
|
|||||||
}
|
}
|
||||||
defer results.Close()
|
defer results.Close()
|
||||||
|
|
||||||
now := time.Now().Unix()
|
now := gc.ab.clock.Now().Unix()
|
||||||
|
|
||||||
// keys: /peers/gc/addrs/<unix timestamp of next visit>/<peer ID b32>
|
// keys: /peers/gc/addrs/<unix timestamp of next visit>/<peer ID b32>
|
||||||
// values: nil
|
// values: nil
|
||||||
@ -214,7 +214,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
|
|||||||
if e, ok := gc.ab.cache.Peek(id); ok {
|
if e, ok := gc.ab.cache.Peek(id); ok {
|
||||||
cached := e.(*addrsRecord)
|
cached := e.(*addrsRecord)
|
||||||
cached.Lock()
|
cached.Lock()
|
||||||
if cached.clean() {
|
if cached.clean(gc.ab.clock.Now()) {
|
||||||
if err = cached.flush(batch); err != nil {
|
if err = cached.flush(batch); err != nil {
|
||||||
log.Warnf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
|
log.Warnf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
|
||||||
}
|
}
|
||||||
@ -239,7 +239,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
|
|||||||
dropInError(gcKey, err, "unmarshalling entry")
|
dropInError(gcKey, err, "unmarshalling entry")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if record.clean() {
|
if record.clean(gc.ab.clock.Now()) {
|
||||||
err = record.flush(batch)
|
err = record.flush(batch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
|
log.Warnf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
|
||||||
@ -284,7 +284,7 @@ func (gc *dsAddrBookGc) purgeStore() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
id := record.Id.ID
|
id := record.Id.ID
|
||||||
if !record.clean() {
|
if !record.clean(gc.ab.clock.Now()) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -317,7 +317,7 @@ func (gc *dsAddrBookGc) populateLookahead() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
until := time.Now().Add(gc.ab.opts.GCLookaheadInterval).Unix()
|
until := gc.ab.clock.Now().Add(gc.ab.opts.GCLookaheadInterval).Unix()
|
||||||
|
|
||||||
var id peer.ID
|
var id peer.ID
|
||||||
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
|
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
|
||||||
@ -386,3 +386,25 @@ func (gc *dsAddrBookGc) populateLookahead() {
|
|||||||
|
|
||||||
gc.currWindowEnd = until
|
gc.currWindowEnd = until
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// orderByTimestampInKey orders the results by comparing the timestamp in the
|
||||||
|
// key. A lexiographic sort by itself is wrong since "10" is less than "2", but
|
||||||
|
// as an int 2 is obviously less than 10.
|
||||||
|
func orderByTimestampInKey(a, b query.Entry) int {
|
||||||
|
aKey := ds.RawKey(a.Key)
|
||||||
|
aInt, err := strconv.ParseInt(aKey.Parent().Name(), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
bKey := ds.RawKey(b.Key)
|
||||||
|
bInt, err := strconv.ParseInt(bKey.Parent().Name(), 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
if aInt < bInt {
|
||||||
|
return -1
|
||||||
|
} else if aInt == bInt {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
mockClock "github.com/benbjohnson/clock"
|
||||||
query "github.com/ipfs/go-datastore/query"
|
query "github.com/ipfs/go-datastore/query"
|
||||||
pstore "github.com/libp2p/go-libp2p-core/peerstore"
|
pstore "github.com/libp2p/go-libp2p-core/peerstore"
|
||||||
test "github.com/libp2p/go-libp2p-peerstore/test"
|
test "github.com/libp2p/go-libp2p-peerstore/test"
|
||||||
@ -90,6 +91,8 @@ func TestGCPurging(t *testing.T) {
|
|||||||
opts.GCInitialDelay = 90 * time.Hour
|
opts.GCInitialDelay = 90 * time.Hour
|
||||||
opts.GCLookaheadInterval = 20 * time.Second
|
opts.GCLookaheadInterval = 20 * time.Second
|
||||||
opts.GCPurgeInterval = 1 * time.Second
|
opts.GCPurgeInterval = 1 * time.Second
|
||||||
|
clk := mockClock.NewMock()
|
||||||
|
opts.Clock = clk
|
||||||
|
|
||||||
factory := addressBookFactory(t, leveldbStore, opts)
|
factory := addressBookFactory(t, leveldbStore, opts)
|
||||||
ab, closeFn := factory()
|
ab, closeFn := factory()
|
||||||
@ -120,7 +123,7 @@ func TestGCPurging(t *testing.T) {
|
|||||||
t.Errorf("expected 4 GC lookahead entries, got: %v", i)
|
t.Errorf("expected 4 GC lookahead entries, got: %v", i)
|
||||||
}
|
}
|
||||||
|
|
||||||
<-time.After(2 * time.Second)
|
clk.Add(2 * time.Second)
|
||||||
gc.purgeLookahead()
|
gc.purgeLookahead()
|
||||||
if i := tp.countLookaheadEntries(); i != 3 {
|
if i := tp.countLookaheadEntries(); i != 3 {
|
||||||
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
|
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
|
||||||
@ -129,13 +132,13 @@ func TestGCPurging(t *testing.T) {
|
|||||||
// Purge the cache, to exercise a different path in the purge cycle.
|
// Purge the cache, to exercise a different path in the purge cycle.
|
||||||
tp.clearCache()
|
tp.clearCache()
|
||||||
|
|
||||||
<-time.After(5 * time.Second)
|
clk.Add(5 * time.Second)
|
||||||
gc.purgeLookahead()
|
gc.purgeLookahead()
|
||||||
if i := tp.countLookaheadEntries(); i != 3 {
|
if i := tp.countLookaheadEntries(); i != 3 {
|
||||||
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
|
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
|
||||||
}
|
}
|
||||||
|
|
||||||
<-time.After(5 * time.Second)
|
clk.Add(5 * time.Second)
|
||||||
gc.purgeLookahead()
|
gc.purgeLookahead()
|
||||||
if i := tp.countLookaheadEntries(); i != 1 {
|
if i := tp.countLookaheadEntries(); i != 1 {
|
||||||
t.Errorf("expected 1 GC lookahead entries, got: %v", i)
|
t.Errorf("expected 1 GC lookahead entries, got: %v", i)
|
||||||
@ -157,6 +160,8 @@ func TestGCDelay(t *testing.T) {
|
|||||||
opts.GCInitialDelay = 2 * time.Second
|
opts.GCInitialDelay = 2 * time.Second
|
||||||
opts.GCLookaheadInterval = 1 * time.Minute
|
opts.GCLookaheadInterval = 1 * time.Minute
|
||||||
opts.GCPurgeInterval = 30 * time.Second
|
opts.GCPurgeInterval = 30 * time.Second
|
||||||
|
clk := mockClock.NewMock()
|
||||||
|
opts.Clock = clk
|
||||||
|
|
||||||
factory := addressBookFactory(t, leveldbStore, opts)
|
factory := addressBookFactory(t, leveldbStore, opts)
|
||||||
ab, closeFn := factory()
|
ab, closeFn := factory()
|
||||||
@ -172,7 +177,7 @@ func TestGCDelay(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// after the initial delay has passed.
|
// after the initial delay has passed.
|
||||||
<-time.After(3 * time.Second)
|
clk.Add(3 * time.Second)
|
||||||
if i := tp.countLookaheadEntries(); i != 1 {
|
if i := tp.countLookaheadEntries(); i != 1 {
|
||||||
t.Errorf("expected 1 lookahead entry, got: %d", i)
|
t.Errorf("expected 1 lookahead entry, got: %d", i)
|
||||||
}
|
}
|
||||||
@ -188,6 +193,8 @@ func TestGCLookaheadDisabled(t *testing.T) {
|
|||||||
opts.GCInitialDelay = 90 * time.Hour
|
opts.GCInitialDelay = 90 * time.Hour
|
||||||
opts.GCLookaheadInterval = 0 // disable lookahead
|
opts.GCLookaheadInterval = 0 // disable lookahead
|
||||||
opts.GCPurgeInterval = 9 * time.Hour
|
opts.GCPurgeInterval = 9 * time.Hour
|
||||||
|
clk := mockClock.NewMock()
|
||||||
|
opts.Clock = clk
|
||||||
|
|
||||||
factory := addressBookFactory(t, leveldbStore, opts)
|
factory := addressBookFactory(t, leveldbStore, opts)
|
||||||
ab, closeFn := factory()
|
ab, closeFn := factory()
|
||||||
@ -206,13 +213,13 @@ func TestGCLookaheadDisabled(t *testing.T) {
|
|||||||
ab.AddAddrs(ids[2], addrs[30:40], 10*time.Hour)
|
ab.AddAddrs(ids[2], addrs[30:40], 10*time.Hour)
|
||||||
ab.AddAddrs(ids[3], addrs[40:], 10*time.Hour)
|
ab.AddAddrs(ids[3], addrs[40:], 10*time.Hour)
|
||||||
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
clk.Add(100 * time.Millisecond)
|
||||||
|
|
||||||
if i := tp.countLookaheadEntries(); i != 0 {
|
if i := tp.countLookaheadEntries(); i != 0 {
|
||||||
t.Errorf("expected no GC lookahead entries, got: %v", i)
|
t.Errorf("expected no GC lookahead entries, got: %v", i)
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(500 * time.Millisecond)
|
clk.Add(500 * time.Millisecond)
|
||||||
gc := ab.(*dsAddrBook).gc
|
gc := ab.(*dsAddrBook).gc
|
||||||
gc.purgeFunc()
|
gc.purgeFunc()
|
||||||
|
|
||||||
|
@ -15,6 +15,8 @@ import (
|
|||||||
|
|
||||||
pstore "github.com/libp2p/go-libp2p-core/peerstore"
|
pstore "github.com/libp2p/go-libp2p-core/peerstore"
|
||||||
pt "github.com/libp2p/go-libp2p-peerstore/test"
|
pt "github.com/libp2p/go-libp2p-peerstore/test"
|
||||||
|
|
||||||
|
mockClock "github.com/benbjohnson/clock"
|
||||||
)
|
)
|
||||||
|
|
||||||
type datastoreFactory func(tb testing.TB) (ds.Batching, func())
|
type datastoreFactory func(tb testing.TB) (ds.Batching, func())
|
||||||
@ -50,16 +52,20 @@ func TestDsAddrBook(t *testing.T) {
|
|||||||
opts := DefaultOpts()
|
opts := DefaultOpts()
|
||||||
opts.GCPurgeInterval = 1 * time.Second
|
opts.GCPurgeInterval = 1 * time.Second
|
||||||
opts.CacheSize = 1024
|
opts.CacheSize = 1024
|
||||||
|
clk := mockClock.NewMock()
|
||||||
|
opts.Clock = clk
|
||||||
|
|
||||||
pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts))
|
pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts), clk)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run(name+" Cacheless", func(t *testing.T) {
|
t.Run(name+" Cacheless", func(t *testing.T) {
|
||||||
opts := DefaultOpts()
|
opts := DefaultOpts()
|
||||||
opts.GCPurgeInterval = 1 * time.Second
|
opts.GCPurgeInterval = 1 * time.Second
|
||||||
opts.CacheSize = 0
|
opts.CacheSize = 0
|
||||||
|
clk := mockClock.NewMock()
|
||||||
|
opts.Clock = clk
|
||||||
|
|
||||||
pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts))
|
pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts), clk)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,6 +34,8 @@ type Options struct {
|
|||||||
// Initial delay before GC processes start. Intended to give the system breathing room to fully boot
|
// Initial delay before GC processes start. Intended to give the system breathing room to fully boot
|
||||||
// before starting GC.
|
// before starting GC.
|
||||||
GCInitialDelay time.Duration
|
GCInitialDelay time.Duration
|
||||||
|
|
||||||
|
Clock clock
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultOpts returns the default options for a persistent peerstore, with the full-purge GC algorithm:
|
// DefaultOpts returns the default options for a persistent peerstore, with the full-purge GC algorithm:
|
||||||
@ -50,6 +52,7 @@ func DefaultOpts() Options {
|
|||||||
GCPurgeInterval: 2 * time.Hour,
|
GCPurgeInterval: 2 * time.Hour,
|
||||||
GCLookaheadInterval: 0,
|
GCLookaheadInterval: 0,
|
||||||
GCInitialDelay: 60 * time.Second,
|
GCInitialDelay: 60 * time.Second,
|
||||||
|
Clock: realclock{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
|
|
||||||
pstore "github.com/libp2p/go-libp2p-core/peerstore"
|
pstore "github.com/libp2p/go-libp2p-core/peerstore"
|
||||||
|
|
||||||
|
mockClock "github.com/benbjohnson/clock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/goleak"
|
"go.uber.org/goleak"
|
||||||
)
|
)
|
||||||
@ -43,11 +44,12 @@ func TestPeerstoreProtoStoreLimits(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestInMemoryAddrBook(t *testing.T) {
|
func TestInMemoryAddrBook(t *testing.T) {
|
||||||
|
clk := mockClock.NewMock()
|
||||||
pt.TestAddrBook(t, func() (pstore.AddrBook, func()) {
|
pt.TestAddrBook(t, func() (pstore.AddrBook, func()) {
|
||||||
ps, err := NewPeerstore()
|
ps, err := NewPeerstore(WithClock(clk))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
return ps, func() { ps.Close() }
|
return ps, func() { ps.Close() }
|
||||||
})
|
}, clk)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInMemoryKeyBook(t *testing.T) {
|
func TestInMemoryKeyBook(t *testing.T) {
|
||||||
|
@ -1,18 +1,20 @@
|
|||||||
package test
|
package test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
mockClock "github.com/benbjohnson/clock"
|
||||||
"github.com/libp2p/go-libp2p-core/crypto"
|
"github.com/libp2p/go-libp2p-core/crypto"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"github.com/libp2p/go-libp2p-core/record"
|
"github.com/libp2p/go-libp2p-core/record"
|
||||||
"github.com/libp2p/go-libp2p-core/test"
|
"github.com/libp2p/go-libp2p-core/test"
|
||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/multiformats/go-multiaddr"
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
pstore "github.com/libp2p/go-libp2p-core/peerstore"
|
pstore "github.com/libp2p/go-libp2p-core/peerstore"
|
||||||
)
|
)
|
||||||
|
|
||||||
var addressBookSuite = map[string]func(book pstore.AddrBook) func(*testing.T){
|
var addressBookSuite = map[string]func(book pstore.AddrBook, clk *mockClock.Mock) func(*testing.T){
|
||||||
"AddAddress": testAddAddress,
|
"AddAddress": testAddAddress,
|
||||||
"Clear": testClearWorks,
|
"Clear": testClearWorks,
|
||||||
"SetNegativeTTLClears": testSetNegativeTTLClears,
|
"SetNegativeTTLClears": testSetNegativeTTLClears,
|
||||||
@ -26,13 +28,13 @@ var addressBookSuite = map[string]func(book pstore.AddrBook) func(*testing.T){
|
|||||||
|
|
||||||
type AddrBookFactory func() (pstore.AddrBook, func())
|
type AddrBookFactory func() (pstore.AddrBook, func())
|
||||||
|
|
||||||
func TestAddrBook(t *testing.T, factory AddrBookFactory) {
|
func TestAddrBook(t *testing.T, factory AddrBookFactory, clk *mockClock.Mock) {
|
||||||
for name, test := range addressBookSuite {
|
for name, test := range addressBookSuite {
|
||||||
// Create a new peerstore.
|
// Create a new peerstore.
|
||||||
ab, closeFunc := factory()
|
ab, closeFunc := factory()
|
||||||
|
|
||||||
// Run the test.
|
// Run the test.
|
||||||
t.Run(name, test(ab))
|
t.Run(name, test(ab, clk))
|
||||||
|
|
||||||
// Cleanup.
|
// Cleanup.
|
||||||
if closeFunc != nil {
|
if closeFunc != nil {
|
||||||
@ -41,7 +43,7 @@ func TestAddrBook(t *testing.T, factory AddrBookFactory) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
|
func testAddAddress(ab pstore.AddrBook, clk *mockClock.Mock) func(*testing.T) {
|
||||||
return func(t *testing.T) {
|
return func(t *testing.T) {
|
||||||
t.Run("add a single address", func(t *testing.T) {
|
t.Run("add a single address", func(t *testing.T) {
|
||||||
id := GeneratePeerIDs(1)[0]
|
id := GeneratePeerIDs(1)[0]
|
||||||
@ -90,7 +92,7 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
|
|||||||
ab.AddAddrs(id, addrs[2:], time.Hour)
|
ab.AddAddrs(id, addrs[2:], time.Hour)
|
||||||
|
|
||||||
// after the initial TTL has expired, check that only the third address is present.
|
// after the initial TTL has expired, check that only the third address is present.
|
||||||
time.Sleep(1200 * time.Millisecond)
|
clk.Add(1200 * time.Millisecond)
|
||||||
AssertAddressesEqual(t, addrs[2:], ab.Addrs(id))
|
AssertAddressesEqual(t, addrs[2:], ab.Addrs(id))
|
||||||
|
|
||||||
// make sure we actually set the TTL
|
// make sure we actually set the TTL
|
||||||
@ -109,7 +111,7 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
|
|||||||
|
|
||||||
// after the initial TTL has expired, check that all three addresses are still present (i.e. the TTL on
|
// after the initial TTL has expired, check that all three addresses are still present (i.e. the TTL on
|
||||||
// the modified one was not shortened).
|
// the modified one was not shortened).
|
||||||
time.Sleep(2100 * time.Millisecond)
|
clk.Add(2100 * time.Millisecond)
|
||||||
AssertAddressesEqual(t, addrs, ab.Addrs(id))
|
AssertAddressesEqual(t, addrs, ab.Addrs(id))
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -119,11 +121,11 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
|
|||||||
|
|
||||||
ab.AddAddrs(id, addrs, 4*time.Second)
|
ab.AddAddrs(id, addrs, 4*time.Second)
|
||||||
// 4 seconds left
|
// 4 seconds left
|
||||||
time.Sleep(2 * time.Second)
|
clk.Add(2 * time.Second)
|
||||||
// 2 second left
|
// 2 second left
|
||||||
ab.AddAddrs(id, addrs, 3*time.Second)
|
ab.AddAddrs(id, addrs, 3*time.Second)
|
||||||
// 3 seconds left
|
// 3 seconds left
|
||||||
time.Sleep(1 * time.Second)
|
clk.Add(1 * time.Second)
|
||||||
// 2 seconds left.
|
// 2 seconds left.
|
||||||
|
|
||||||
// We still have the address.
|
// We still have the address.
|
||||||
@ -136,7 +138,7 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testClearWorks(ab pstore.AddrBook) func(t *testing.T) {
|
func testClearWorks(ab pstore.AddrBook, clk *mockClock.Mock) func(t *testing.T) {
|
||||||
return func(t *testing.T) {
|
return func(t *testing.T) {
|
||||||
ids := GeneratePeerIDs(2)
|
ids := GeneratePeerIDs(2)
|
||||||
addrs := GenerateAddrs(5)
|
addrs := GenerateAddrs(5)
|
||||||
@ -157,7 +159,7 @@ func testClearWorks(ab pstore.AddrBook) func(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testSetNegativeTTLClears(m pstore.AddrBook) func(t *testing.T) {
|
func testSetNegativeTTLClears(m pstore.AddrBook, clk *mockClock.Mock) func(t *testing.T) {
|
||||||
return func(t *testing.T) {
|
return func(t *testing.T) {
|
||||||
id := GeneratePeerIDs(1)[0]
|
id := GeneratePeerIDs(1)[0]
|
||||||
addrs := GenerateAddrs(100)
|
addrs := GenerateAddrs(100)
|
||||||
@ -201,7 +203,7 @@ func testSetNegativeTTLClears(m pstore.AddrBook) func(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) {
|
func testUpdateTTLs(m pstore.AddrBook, clk *mockClock.Mock) func(t *testing.T) {
|
||||||
return func(t *testing.T) {
|
return func(t *testing.T) {
|
||||||
t.Run("update ttl of peer with no addrs", func(t *testing.T) {
|
t.Run("update ttl of peer with no addrs", func(t *testing.T) {
|
||||||
id := GeneratePeerIDs(1)[0]
|
id := GeneratePeerIDs(1)[0]
|
||||||
@ -246,7 +248,7 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) {
|
|||||||
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
|
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
|
||||||
|
|
||||||
// After a wait, addrs[0] is gone.
|
// After a wait, addrs[0] is gone.
|
||||||
time.Sleep(2 * time.Second)
|
clk.Add(2 * time.Second)
|
||||||
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
|
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
|
||||||
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
|
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
|
||||||
|
|
||||||
@ -257,7 +259,7 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) {
|
|||||||
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
|
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
|
||||||
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
|
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
clk.Add(2 * time.Second)
|
||||||
|
|
||||||
// First addrs is gone in both.
|
// First addrs is gone in both.
|
||||||
AssertAddressesEqual(t, addrs1[1:], m.Addrs(ids[0]))
|
AssertAddressesEqual(t, addrs1[1:], m.Addrs(ids[0]))
|
||||||
@ -267,7 +269,7 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testNilAddrsDontBreak(m pstore.AddrBook) func(t *testing.T) {
|
func testNilAddrsDontBreak(m pstore.AddrBook, clk *mockClock.Mock) func(t *testing.T) {
|
||||||
return func(t *testing.T) {
|
return func(t *testing.T) {
|
||||||
id := GeneratePeerIDs(1)[0]
|
id := GeneratePeerIDs(1)[0]
|
||||||
|
|
||||||
@ -276,7 +278,7 @@ func testNilAddrsDontBreak(m pstore.AddrBook) func(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testAddressesExpire(m pstore.AddrBook) func(t *testing.T) {
|
func testAddressesExpire(m pstore.AddrBook, clk *mockClock.Mock) func(t *testing.T) {
|
||||||
return func(t *testing.T) {
|
return func(t *testing.T) {
|
||||||
ids := GeneratePeerIDs(2)
|
ids := GeneratePeerIDs(2)
|
||||||
addrs1 := GenerateAddrs(3)
|
addrs1 := GenerateAddrs(3)
|
||||||
@ -295,33 +297,33 @@ func testAddressesExpire(m pstore.AddrBook) func(t *testing.T) {
|
|||||||
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
|
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
|
||||||
|
|
||||||
m.SetAddr(ids[0], addrs1[0], 100*time.Microsecond)
|
m.SetAddr(ids[0], addrs1[0], 100*time.Microsecond)
|
||||||
<-time.After(100 * time.Millisecond)
|
clk.Add(100 * time.Millisecond)
|
||||||
AssertAddressesEqual(t, addrs1[1:3], m.Addrs(ids[0]))
|
AssertAddressesEqual(t, addrs1[1:3], m.Addrs(ids[0]))
|
||||||
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
|
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
|
||||||
|
|
||||||
m.SetAddr(ids[0], addrs1[2], 100*time.Microsecond)
|
m.SetAddr(ids[0], addrs1[2], 100*time.Microsecond)
|
||||||
<-time.After(100 * time.Millisecond)
|
clk.Add(100 * time.Millisecond)
|
||||||
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
|
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
|
||||||
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
|
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
|
||||||
|
|
||||||
m.SetAddr(ids[1], addrs2[0], 100*time.Microsecond)
|
m.SetAddr(ids[1], addrs2[0], 100*time.Microsecond)
|
||||||
<-time.After(100 * time.Millisecond)
|
clk.Add(100 * time.Millisecond)
|
||||||
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
|
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
|
||||||
AssertAddressesEqual(t, addrs2[1:], m.Addrs(ids[1]))
|
AssertAddressesEqual(t, addrs2[1:], m.Addrs(ids[1]))
|
||||||
|
|
||||||
m.SetAddr(ids[1], addrs2[1], 100*time.Microsecond)
|
m.SetAddr(ids[1], addrs2[1], 100*time.Microsecond)
|
||||||
<-time.After(100 * time.Millisecond)
|
clk.Add(100 * time.Millisecond)
|
||||||
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
|
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
|
||||||
AssertAddressesEqual(t, nil, m.Addrs(ids[1]))
|
AssertAddressesEqual(t, nil, m.Addrs(ids[1]))
|
||||||
|
|
||||||
m.SetAddr(ids[0], addrs1[1], 100*time.Microsecond)
|
m.SetAddr(ids[0], addrs1[1], 100*time.Microsecond)
|
||||||
<-time.After(100 * time.Millisecond)
|
clk.Add(100 * time.Millisecond)
|
||||||
AssertAddressesEqual(t, nil, m.Addrs(ids[0]))
|
AssertAddressesEqual(t, nil, m.Addrs(ids[0]))
|
||||||
AssertAddressesEqual(t, nil, m.Addrs(ids[1]))
|
AssertAddressesEqual(t, nil, m.Addrs(ids[1]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testClearWithIterator(m pstore.AddrBook) func(t *testing.T) {
|
func testClearWithIterator(m pstore.AddrBook, clk *mockClock.Mock) func(t *testing.T) {
|
||||||
return func(t *testing.T) {
|
return func(t *testing.T) {
|
||||||
ids := GeneratePeerIDs(2)
|
ids := GeneratePeerIDs(2)
|
||||||
addrs := GenerateAddrs(100)
|
addrs := GenerateAddrs(100)
|
||||||
@ -348,7 +350,7 @@ func testClearWithIterator(m pstore.AddrBook) func(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testPeersWithAddrs(m pstore.AddrBook) func(t *testing.T) {
|
func testPeersWithAddrs(m pstore.AddrBook, clk *mockClock.Mock) func(t *testing.T) {
|
||||||
return func(t *testing.T) {
|
return func(t *testing.T) {
|
||||||
// cannot run in parallel as the store is modified.
|
// cannot run in parallel as the store is modified.
|
||||||
// go runs sequentially in the specified order
|
// go runs sequentially in the specified order
|
||||||
@ -374,7 +376,7 @@ func testPeersWithAddrs(m pstore.AddrBook) func(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCertifiedAddresses(m pstore.AddrBook) func(*testing.T) {
|
func testCertifiedAddresses(m pstore.AddrBook, clk *mockClock.Mock) func(*testing.T) {
|
||||||
return func(t *testing.T) {
|
return func(t *testing.T) {
|
||||||
cab := m.(pstore.CertifiedAddrBook)
|
cab := m.(pstore.CertifiedAddrBook)
|
||||||
|
|
||||||
@ -485,7 +487,7 @@ func testCertifiedAddresses(m pstore.AddrBook) func(*testing.T) {
|
|||||||
test.AssertNilError(t, err)
|
test.AssertNilError(t, err)
|
||||||
AssertAddressesEqual(t, certifiedAddrs, m.Addrs(id))
|
AssertAddressesEqual(t, certifiedAddrs, m.Addrs(id))
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
clk.Add(2 * time.Second)
|
||||||
if cab.GetPeerRecord(id) != nil {
|
if cab.GetPeerRecord(id) != nil {
|
||||||
t.Error("expected signed peer record to be removed when addresses expire")
|
t.Error("expected signed peer record to be removed when addresses expire")
|
||||||
}
|
}
|
||||||
|
@ -1,3 +1,3 @@
|
|||||||
{
|
{
|
||||||
"version": "v0.6.0"
|
"version": "v0.7.0"
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user