From fec786e9efd72f72357731ef1ccd4d24a3e2d773 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 6 Feb 2019 09:35:10 +0000 Subject: [PATCH] pstore ds: make gc lookahead window optional. --- pstoreds/addr_book.go | 5 +- pstoreds/addr_book_gc.go | 111 +++++++++++++++++++++++++++++----- pstoreds/addr_book_gc_test.go | 60 +++++++++++++++--- pstoreds/peerstore.go | 29 +++++---- test/addr_book_suite.go | 99 ++++++++++++------------------ test/utils.go | 22 +++++++ 6 files changed, 229 insertions(+), 97 deletions(-) diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index ff2dfa4..cf766ba 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -154,11 +154,12 @@ func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAd flushJobCh: make(chan *addrsRecord, 32), } - ab.gc = newAddressBookGc(ctx, ab) + if ab.gc, err = newAddressBookGc(ctx, ab); err != nil { + return nil, err + } // kick off background processes. go ab.flusher() - go ab.gc.background() return ab, nil } diff --git a/pstoreds/addr_book_gc.go b/pstoreds/addr_book_gc.go index 80a7a61..931dad2 100644 --- a/pstoreds/addr_book_gc.go +++ b/pstoreds/addr_book_gc.go @@ -24,12 +24,18 @@ var ( gcOpsPerBatch = 20 // queries - purgeQuery = query.Query{ + purgeLookaheadQuery = query.Query{ Prefix: gcLookaheadBase.String(), Orders: []query.Order{query.OrderByKey{}}, KeysOnly: true, } + purgeStoreQuery = query.Query{ + Prefix: addrBookBase.String(), + Orders: []query.Order{query.OrderByKey{}}, + KeysOnly: false, + } + populateLookaheadQuery = query.Query{ Prefix: addrBookBase.String(), Orders: []query.Order{query.OrderByKey{}}, @@ -39,18 +45,45 @@ var ( // dsAddrBookGc encapsulates the GC behaviour to maintain a datastore-backed address book. type dsAddrBookGc struct { - ctx context.Context - ab *dsAddrBook - running chan struct{} - currWindowEnd int64 + ctx context.Context + ab *dsAddrBook + running chan struct{} + lookaheadEnabled bool + purgeFunc func() + currWindowEnd int64 } -func newAddressBookGc(ctx context.Context, ab *dsAddrBook) *dsAddrBookGc { - return &dsAddrBookGc{ - ctx: ctx, - ab: ab, - running: make(chan struct{}, 1), +func newAddressBookGc(ctx context.Context, ab *dsAddrBook) (*dsAddrBookGc, error) { + if ab.opts.GCPurgeInterval < 0 { + return nil, fmt.Errorf("negative GC purge interval provided: %s", ab.opts.GCPurgeInterval) } + if ab.opts.GCLookaheadInterval < 0 { + return nil, fmt.Errorf("negative GC lookahead interval provided: %s", ab.opts.GCLookaheadInterval) + } + if ab.opts.GCInitialDelay < 0 { + return nil, fmt.Errorf("negative GC initial delay provided: %s", ab.opts.GCInitialDelay) + } + + lookaheadEnabled := ab.opts.GCLookaheadInterval > 0 + gc := &dsAddrBookGc{ + ctx: ctx, + ab: ab, + running: make(chan struct{}, 1), + lookaheadEnabled: lookaheadEnabled, + } + + if lookaheadEnabled { + gc.purgeFunc = gc.purgeLookahead + } else { + gc.purgeFunc = gc.purgeStore + } + + // do not start GC timers if purge is disabled; this GC can only be triggered manually. + if ab.opts.GCPurgeInterval > 0 { + go gc.background() + } + + return gc, nil } // gc prunes expired addresses from the datastore at regular intervals. It should be spawned as a goroutine. 
@@ -69,7 +102,7 @@ func (gc *dsAddrBookGc) background() {
 	defer purgeTimer.Stop()
 
 	var lookaheadCh <-chan time.Time
-	if gc.ab.opts.GCLookaheadInterval > 0 {
+	if gc.lookaheadEnabled {
 		lookaheadTimer := time.NewTicker(gc.ab.opts.GCLookaheadInterval)
 		lookaheadCh = lookaheadTimer.C
 		defer lookaheadTimer.Stop()
@@ -78,7 +111,7 @@ func (gc *dsAddrBookGc) background() {
 	for {
 		select {
 		case <-purgeTimer.C:
-			gc.purgeCycle()
+			gc.purgeFunc()
 
 		case <-lookaheadCh:
 			// will never trigger if lookahead is disabled (nil Duration).
@@ -92,7 +125,7 @@ func (gc *dsAddrBookGc) background() {
 
-// purgeCycle runs a single GC purge cycle. It operates within the lookahead window if lookahead is enabled; else it
-// visits all entries in the datastore, deleting the addresses that have expired.
-func (gc *dsAddrBookGc) purgeCycle() {
+// purgeLookahead runs a single GC purge cycle restricted to the lookahead window, deleting the addresses that have
+// expired. When lookahead is disabled, purgeStore is used instead.
+func (gc *dsAddrBookGc) purgeLookahead() {
 	select {
 	case gc.running <- struct{}{}:
 		defer func() { <-gc.running }()
@@ -136,7 +169,7 @@ func (gc *dsAddrBookGc) purgeCycle() {
 		}
 	}
 
-	results, err := gc.ab.ds.Query(purgeQuery)
+	results, err := gc.ab.ds.Query(purgeLookaheadQuery)
 	if err != nil {
 		log.Warningf("failed while fetching entries to purge: %v", err)
 		return
@@ -216,12 +249,63 @@ func (gc *dsAddrBookGc) purgeCycle() {
 	}
 }
 
+func (gc *dsAddrBookGc) purgeStore() {
+	select {
+	case gc.running <- struct{}{}:
+		defer func() { <-gc.running }()
+	default:
+		// yield if lookahead is running.
+		return
+	}
+
+	record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs.
+	batch, err := newCyclicBatch(gc.ab.ds)
+	if err != nil {
+		log.Warningf("failed while creating batch to purge GC entries: %v", err)
+		return
+	}
+
+	results, err := gc.ab.ds.Query(purgeStoreQuery)
+	if err != nil {
+		log.Warningf("failed while opening iterator: %v", err)
+		return
+	}
+	defer results.Close()
+
+	// keys: /peers/addrs/<peer ID b32>
+	for result := range results.Next() {
+		record.Reset()
+		if err = record.Unmarshal(result.Value); err != nil {
+			log.Warningf("failed to unmarshal record during GC purge: %v", err)
+			continue
+		}
+
+		id := record.Id.ID
+		if !record.Clean() {
+			continue
+		}
+
+		if err := record.flush(batch); err != nil {
+			log.Warningf("failed to flush entry modified by GC for peer: %v, err: %v", id, err)
+		}
+		gc.ab.cache.Remove(id)
+	}
+
+	if err = batch.Commit(); err != nil {
+		log.Warningf("failed to commit GC purge batch: %v", err)
+	}
+}
+
 // populateLookahead populates the lookahead window by scanning the entire store and picking entries whose earliest
 // expiration falls within the window period.
 //
 // Those entries are stored in the lookahead region in the store, indexed by the timestamp when they need to be
 // visited, to facilitate temporal range scans.
func (gc *dsAddrBookGc) populateLookahead() { + if gc.ab.opts.GCLookaheadInterval == 0 { + return + } + select { case gc.running <- struct{}{}: defer func() { <-gc.running }() diff --git a/pstoreds/addr_book_gc_test.go b/pstoreds/addr_book_gc_test.go index 3d43363..9486543 100644 --- a/pstoreds/addr_book_gc_test.go +++ b/pstoreds/addr_book_gc_test.go @@ -4,16 +4,17 @@ import ( "testing" "time" - "github.com/ipfs/go-datastore/query" - "github.com/libp2p/go-libp2p-peerstore" - "github.com/libp2p/go-libp2p-peerstore/test" + query "github.com/ipfs/go-datastore/query" + pstore "github.com/libp2p/go-libp2p-peerstore" + test "github.com/libp2p/go-libp2p-peerstore/test" + ma "github.com/multiformats/go-multiaddr" ) var lookaheadQuery = query.Query{Prefix: gcLookaheadBase.String(), KeysOnly: true} type testProbe struct { t *testing.T - ab peerstore.AddrBook + ab pstore.AddrBook } func (tp *testProbe) countLookaheadEntries() (i int) { @@ -119,7 +120,7 @@ func TestGCPurging(t *testing.T) { } <-time.After(2 * time.Second) - gc.purgeCycle() + gc.purgeLookahead() if i := tp.countLookaheadEntries(); i != 3 { t.Errorf("expected 3 GC lookahead entries, got: %v", i) } @@ -128,13 +129,13 @@ func TestGCPurging(t *testing.T) { tp.clearCache() <-time.After(5 * time.Second) - gc.purgeCycle() + gc.purgeLookahead() if i := tp.countLookaheadEntries(); i != 3 { t.Errorf("expected 3 GC lookahead entries, got: %v", i) } <-time.After(5 * time.Second) - gc.purgeCycle() + gc.purgeLookahead() if i := tp.countLookaheadEntries(); i != 1 { t.Errorf("expected 1 GC lookahead entries, got: %v", i) } @@ -176,6 +177,51 @@ func TestGCDelay(t *testing.T) { } } +func TestGCLookaheadDisabled(t *testing.T) { + ids := test.GeneratePeerIDs(10) + addrs := test.GenerateAddrs(100) + + opts := DefaultOpts() + + // effectively disable automatic GC for this test. + opts.GCInitialDelay = 90 * time.Hour + opts.GCLookaheadInterval = 0 // disable lookahead + opts.GCPurgeInterval = 9 * time.Hour + + factory := addressBookFactory(t, badgerStore, opts) + ab, closeFn := factory() + defer closeFn() + + tp := &testProbe{t, ab} + + // four peers: + // ids[0] has 10 addresses, all of which expire in 500ms. + // ids[1] has 20 addresses; 50% expire in 500ms and 50% in 10 hours. + // ids[2] has 10 addresses; all expire in 10 hours. + // ids[3] has 60 addresses; all expire in 10 hours. + ab.AddAddrs(ids[0], addrs[:10], 500*time.Millisecond) + ab.AddAddrs(ids[1], addrs[10:20], 500*time.Millisecond) + ab.AddAddrs(ids[1], addrs[20:30], 10*time.Hour) + ab.AddAddrs(ids[2], addrs[30:40], 10*time.Hour) + ab.AddAddrs(ids[3], addrs[40:], 10*time.Hour) + + time.Sleep(100 * time.Millisecond) + + if i := tp.countLookaheadEntries(); i != 0 { + t.Errorf("expected no GC lookahead entries, got: %v", i) + } + + time.Sleep(500 * time.Millisecond) + gc := ab.(*dsAddrBook).gc + gc.purgeFunc() + + var empty []ma.Multiaddr + test.AssertAddressesEqual(t, empty, ab.Addrs(ids[0])) + test.AssertAddressesEqual(t, addrs[20:30], ab.Addrs(ids[1])) + test.AssertAddressesEqual(t, addrs[30:40], ab.Addrs(ids[2])) + test.AssertAddressesEqual(t, addrs[40:], ab.Addrs(ids[3])) +} + func BenchmarkLookaheadCycle(b *testing.B) { ids := test.GeneratePeerIDs(100) addrs := test.GenerateAddrs(100) diff --git a/pstoreds/peerstore.go b/pstoreds/peerstore.go index 71378d0..7f4a1a7 100644 --- a/pstoreds/peerstore.go +++ b/pstoreds/peerstore.go @@ -18,26 +18,30 @@ type Options struct { // The size of the in-memory cache. A value of 0 or lower disables the cache. 
 	CacheSize uint
 
-	// Sweep interval to purge expired addresses from the datastore.
+	// Sweep interval to purge expired addresses from the datastore. If this is a zero value, GC will not run
+	// automatically, but it can still be triggered on demand via explicit calls.
 	GCPurgeInterval time.Duration
 
-	// Interval to renew the GC lookahead window.
+	// Interval to renew the GC lookahead window. If this is a zero value, lookahead will be disabled and we'll
+	// traverse the entire datastore for every purge cycle.
 	GCLookaheadInterval time.Duration
 
-	// Initial delay before GC routines start. Intended to give the system time to initialise before starting GC.
+	// Initial delay before GC processes start. Intended to give the system breathing room to fully boot
+	// before starting GC.
 	GCInitialDelay time.Duration
 }
 
 // DefaultOpts returns the default options for a persistent peerstore:
-// * Cache size: 1024
-// * GC prune interval: 5 minutes
-// * GC lookahead interval: 12 hours
-// * WriteRetries: 5
+//
+// * Cache size: 1024.
+// * GC purge interval: 10 minutes.
+// * GC lookahead interval: disabled.
+// * GC initial delay: 60 seconds.
 func DefaultOpts() Options {
 	return Options{
 		CacheSize:           1024,
-		GCPurgeInterval:     5 * time.Minute,
-		GCLookaheadInterval: 12 * time.Hour,
+		GCPurgeInterval:     10 * time.Minute,
+		GCLookaheadInterval: 0,
 		GCInitialDelay:      60 * time.Second,
 	}
 }
@@ -88,12 +92,11 @@ func uniquePeerIds(ds ds.Datastore, prefix ds.Key, extractor func(result query.R
 		return peer.IDSlice{}, nil
 	}
 
-	ids := make(peer.IDSlice, len(idset))
-	i := 0
+	ids := make(peer.IDSlice, 0, len(idset))
 	for id := range idset {
 		pid, _ := base32.RawStdEncoding.DecodeString(id)
-		ids[i], _ = peer.IDFromBytes(pid)
-		i++
+		decoded, _ := peer.IDFromBytes(pid)
+		ids = append(ids, decoded)
 	}
 	return ids, nil
 }
diff --git a/test/addr_book_suite.go b/test/addr_book_suite.go
index 94f8fb5..c01fe98 100644
--- a/test/addr_book_suite.go
+++ b/test/addr_book_suite.go
@@ -5,7 +5,6 @@ import (
 	"time"
 
 	pstore "github.com/libp2p/go-libp2p-peerstore"
-	ma "github.com/multiformats/go-multiaddr"
 )
 
 var addressBookSuite = map[string]func(book pstore.AddrBook) func(*testing.T){
@@ -44,7 +43,7 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
 
 			ab.AddAddr(id, addrs[0], time.Hour)
 
-			testHas(t, addrs, ab.Addrs(id))
+			AssertAddressesEqual(t, addrs, ab.Addrs(id))
 		})
 
 		t.Run("idempotent add single address", func(t *testing.T) {
@@ -54,7 +53,7 @@
 			ab.AddAddr(id, addrs[0], time.Hour)
 			ab.AddAddr(id, addrs[0], time.Hour)
 
-			testHas(t, addrs, ab.Addrs(id))
+			AssertAddressesEqual(t, addrs, ab.Addrs(id))
 		})
 
 		t.Run("add multiple addresses", func(t *testing.T) {
@@ -62,7 +61,7 @@
 			addrs := GenerateAddrs(3)
 
 			ab.AddAddrs(id, addrs, time.Hour)
-			testHas(t, addrs, ab.Addrs(id))
+			AssertAddressesEqual(t, addrs, ab.Addrs(id))
 		})
 
 		t.Run("idempotent add multiple addresses", func(t *testing.T) {
@@ -72,7 +71,7 @@
 			ab.AddAddrs(id, addrs, time.Hour)
 			ab.AddAddrs(id, addrs, time.Hour)
 
-			testHas(t, addrs, ab.Addrs(id))
+			AssertAddressesEqual(t, addrs, ab.Addrs(id))
 		})
 
 		t.Run("adding an existing address with a later expiration extends its ttl", func(t *testing.T) {
@@ -86,7 +85,7 @@
 
 			// after the initial TTL has expired, check that only the third address is present.
time.Sleep(1200 * time.Millisecond) - testHas(t, addrs[2:], ab.Addrs(id)) + AssertAddressesEqual(t, addrs[2:], ab.Addrs(id)) }) t.Run("adding an existing address with an earlier expiration is noop", func(t *testing.T) { @@ -101,7 +100,7 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) { // after the initial TTL has expired, check that all three addresses are still present (i.e. the TTL on // the modified one was not shortened). time.Sleep(2100 * time.Millisecond) - testHas(t, addrs, ab.Addrs(id)) + AssertAddressesEqual(t, addrs, ab.Addrs(id)) }) } } @@ -114,16 +113,16 @@ func testClearWorks(ab pstore.AddrBook) func(t *testing.T) { ab.AddAddrs(ids[0], addrs[0:3], time.Hour) ab.AddAddrs(ids[1], addrs[3:], time.Hour) - testHas(t, addrs[0:3], ab.Addrs(ids[0])) - testHas(t, addrs[3:], ab.Addrs(ids[1])) + AssertAddressesEqual(t, addrs[0:3], ab.Addrs(ids[0])) + AssertAddressesEqual(t, addrs[3:], ab.Addrs(ids[1])) ab.ClearAddrs(ids[0]) - testHas(t, nil, ab.Addrs(ids[0])) - testHas(t, addrs[3:], ab.Addrs(ids[1])) + AssertAddressesEqual(t, nil, ab.Addrs(ids[0])) + AssertAddressesEqual(t, addrs[3:], ab.Addrs(ids[1])) ab.ClearAddrs(ids[1]) - testHas(t, nil, ab.Addrs(ids[0])) - testHas(t, nil, ab.Addrs(ids[1])) + AssertAddressesEqual(t, nil, ab.Addrs(ids[0])) + AssertAddressesEqual(t, nil, ab.Addrs(ids[1])) } } @@ -133,7 +132,7 @@ func testSetNegativeTTLClears(m pstore.AddrBook) func(t *testing.T) { addrs := GenerateAddrs(100) m.SetAddrs(id, addrs, time.Hour) - testHas(t, addrs, m.Addrs(id)) + AssertAddressesEqual(t, addrs, m.Addrs(id)) // remove two addresses. m.SetAddr(id, addrs[50], -1) @@ -143,7 +142,7 @@ func testSetNegativeTTLClears(m pstore.AddrBook) func(t *testing.T) { survivors := append(addrs[0:50], addrs[51:]...) survivors = append(survivors[0:74], survivors[75:]...) - testHas(t, survivors, m.Addrs(id)) + AssertAddressesEqual(t, survivors, m.Addrs(id)) } } @@ -167,8 +166,8 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) { m.SetAddr(ids[1], addrs2[1], time.Minute) // Sanity check. - testHas(t, addrs1, m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1, m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) // Will only affect addrs1[0]. // Badger does not support subsecond TTLs. @@ -176,26 +175,26 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) { m.UpdateAddrs(ids[0], time.Hour, 1*time.Second) // No immediate effect. - testHas(t, addrs1, m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1, m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) // After a wait, addrs[0] is gone. time.Sleep(1500 * time.Millisecond) - testHas(t, addrs1[1:2], m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) // Will only affect addrs2[0]. m.UpdateAddrs(ids[1], time.Hour, 1*time.Second) // No immediate effect. - testHas(t, addrs1[1:2], m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) time.Sleep(1500 * time.Millisecond) // First addrs is gone in both. 
- testHas(t, addrs1[1:], m.Addrs(ids[0])) - testHas(t, addrs2[1:], m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:], m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2[1:], m.Addrs(ids[1])) }) } @@ -219,39 +218,39 @@ func testAddressesExpire(m pstore.AddrBook) func(t *testing.T) { m.AddAddrs(ids[0], addrs1, time.Hour) m.AddAddrs(ids[1], addrs2, time.Hour) - testHas(t, addrs1, m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1, m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) m.AddAddrs(ids[0], addrs1, 2*time.Hour) m.AddAddrs(ids[1], addrs2, 2*time.Hour) - testHas(t, addrs1, m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1, m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) m.SetAddr(ids[0], addrs1[0], 100*time.Microsecond) <-time.After(100 * time.Millisecond) - testHas(t, addrs1[1:3], m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:3], m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) m.SetAddr(ids[0], addrs1[2], 100*time.Microsecond) <-time.After(100 * time.Millisecond) - testHas(t, addrs1[1:2], m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) m.SetAddr(ids[1], addrs2[0], 100*time.Microsecond) <-time.After(100 * time.Millisecond) - testHas(t, addrs1[1:2], m.Addrs(ids[0])) - testHas(t, addrs2[1:], m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2[1:], m.Addrs(ids[1])) m.SetAddr(ids[1], addrs2[1], 100*time.Microsecond) <-time.After(100 * time.Millisecond) - testHas(t, addrs1[1:2], m.Addrs(ids[0])) - testHas(t, nil, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0])) + AssertAddressesEqual(t, nil, m.Addrs(ids[1])) m.SetAddr(ids[0], addrs1[1], 100*time.Microsecond) <-time.After(100 * time.Millisecond) - testHas(t, nil, m.Addrs(ids[0])) - testHas(t, nil, m.Addrs(ids[1])) + AssertAddressesEqual(t, nil, m.Addrs(ids[0])) + AssertAddressesEqual(t, nil, m.Addrs(ids[1])) } } @@ -307,25 +306,3 @@ func testPeersWithAddrs(m pstore.AddrBook) func(t *testing.T) { }) } } - -func testHas(t *testing.T, exp, act []ma.Multiaddr) { - t.Helper() - if len(exp) != len(act) { - t.Fatalf("lengths not the same. expected %d, got %d\n", len(exp), len(act)) - } - - for _, a := range exp { - found := false - - for _, b := range act { - if a.Equal(b) { - found = true - break - } - } - - if !found { - t.Fatalf("expected address %s not found", a) - } - } -} diff --git a/test/utils.go b/test/utils.go index 15b73c8..d7af3ab 100644 --- a/test/utils.go +++ b/test/utils.go @@ -72,3 +72,25 @@ func GeneratePeerIDs(count int) []peer.ID { } return ids } + +func AssertAddressesEqual(t *testing.T, exp, act []ma.Multiaddr) { + t.Helper() + if len(exp) != len(act) { + t.Fatalf("lengths not the same. expected %d, got %d\n", len(exp), len(act)) + } + + for _, a := range exp { + found := false + + for _, b := range act { + if a.Equal(b) { + found = true + break + } + } + + if !found { + t.Fatalf("expected address %s not found", a) + } + } +}
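---

Usage sketch (illustrative; not part of the patch). With the new defaults, lookahead is disabled and every purge cycle sweeps the entire address book; a consumer opts in to windowed purging by setting GCLookaheadInterval, and can disable automatic GC altogether by zeroing GCPurgeInterval. The helper below is hypothetical and assumes an already-open datastore implementing ds.Batching:

    package example

    import (
        "context"
        "time"

        ds "github.com/ipfs/go-datastore"
        pstore "github.com/libp2p/go-libp2p-peerstore"
        pstoreds "github.com/libp2p/go-libp2p-peerstore/pstoreds"
    )

    // newWindowedAddrBook is a hypothetical helper showing the GC knobs this
    // patch introduces.
    func newWindowedAddrBook(ctx context.Context, store ds.Batching) (pstore.AddrBook, error) {
        opts := pstoreds.DefaultOpts()
        // defaults after this patch: purge every 10 minutes, lookahead disabled
        // (full-store sweep per cycle), 60-second initial delay.

        // opt in to windowed GC: purge cycles will then only visit entries
        // whose earliest expiry falls within the lookahead window.
        opts.GCLookaheadInterval = 12 * time.Hour

        // setting opts.GCPurgeInterval = 0 would disable automatic GC entirely.
        return pstoreds.NewAddrBook(ctx, store, opts)
    }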
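Both purge flavours and populateLookahead serialize through gc.running, a 1-slot buffered channel used as a non-blocking try-lock: a cycle that cannot take the slot yields instead of queueing. A minimal standalone sketch of that idiom, with illustrative names:

    package example

    // tryLock mirrors the gc.running guard: a 1-slot buffered channel whose
    // non-blocking send acquires the lock and whose receive releases it.
    type tryLock chan struct{}

    func newTryLock() tryLock { return make(tryLock, 1) }

    func (l tryLock) tryAcquire() bool {
        select {
        case l <- struct{}{}: // slot free: lock acquired.
            return true
        default: // another cycle holds it: yield, don't block.
            return false
        }
    }

    func (l tryLock) release() { <-l }

    // runCycle skips the cycle entirely if another one is in flight, matching
    // the behaviour of purgeLookahead, purgeStore and populateLookahead.
    func runCycle(l tryLock, cycle func()) {
        if !l.tryAcquire() {
            return
        }
        defer l.release()
        cycle()
    }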