pstore ds: make gc lookahead window optional.

Raúl Kripalani 2019-02-06 09:35:10 +00:00
parent 9da98d7fcf
commit fec786e9ef
6 changed files with 229 additions and 97 deletions


@ -154,11 +154,12 @@ func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAddrBook, err error) {
flushJobCh: make(chan *addrsRecord, 32),
}
ab.gc = newAddressBookGc(ctx, ab)
if ab.gc, err = newAddressBookGc(ctx, ab); err != nil {
return nil, err
}
// kick off background processes.
go ab.flusher()
go ab.gc.background()
return ab, nil
}
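
With this change, NewAddrBook validates the GC options and can fail at construction time. A minimal wiring sketch, assuming the pstoreds import path and a Badger-backed datastore (both assumptions, not shown in this diff):

package main

import (
	"context"
	"log"

	badger "github.com/ipfs/go-ds-badger"
	pstoreds "github.com/libp2p/go-libp2p-peerstore/pstoreds"
)

func main() {
	ctx := context.Background()

	// hypothetical on-disk datastore backing the address book.
	store, err := badger.NewDatastore("/tmp/peerstore", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	// invalid GC options (e.g. negative intervals) now surface here,
	// instead of producing a misconfigured GC at runtime.
	ab, err := pstoreds.NewAddrBook(ctx, store, pstoreds.DefaultOpts())
	if err != nil {
		log.Fatalf("bad peerstore options: %v", err)
	}
	_ = ab
}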


@ -24,12 +24,18 @@ var (
gcOpsPerBatch = 20
// queries
purgeQuery = query.Query{
purgeLookaheadQuery = query.Query{
Prefix: gcLookaheadBase.String(),
Orders: []query.Order{query.OrderByKey{}},
KeysOnly: true,
}
purgeStoreQuery = query.Query{
Prefix: addrBookBase.String(),
Orders: []query.Order{query.OrderByKey{}},
KeysOnly: false,
}
populateLookaheadQuery = query.Query{
Prefix: addrBookBase.String(),
Orders: []query.Order{query.OrderByKey{}},
@ -39,18 +45,45 @@ var (
// dsAddrBookGc encapsulates the GC behaviour to maintain a datastore-backed address book.
type dsAddrBookGc struct {
ctx context.Context
ab *dsAddrBook
running chan struct{}
currWindowEnd int64
ctx context.Context
ab *dsAddrBook
running chan struct{}
lookaheadEnabled bool
purgeFunc func()
currWindowEnd int64
}
func newAddressBookGc(ctx context.Context, ab *dsAddrBook) *dsAddrBookGc {
return &dsAddrBookGc{
ctx: ctx,
ab: ab,
running: make(chan struct{}, 1),
func newAddressBookGc(ctx context.Context, ab *dsAddrBook) (*dsAddrBookGc, error) {
if ab.opts.GCPurgeInterval < 0 {
return nil, fmt.Errorf("negative GC purge interval provided: %s", ab.opts.GCPurgeInterval)
}
if ab.opts.GCLookaheadInterval < 0 {
return nil, fmt.Errorf("negative GC lookahead interval provided: %s", ab.opts.GCLookaheadInterval)
}
if ab.opts.GCInitialDelay < 0 {
return nil, fmt.Errorf("negative GC initial delay provided: %s", ab.opts.GCInitialDelay)
}
lookaheadEnabled := ab.opts.GCLookaheadInterval > 0
gc := &dsAddrBookGc{
ctx: ctx,
ab: ab,
running: make(chan struct{}, 1),
lookaheadEnabled: lookaheadEnabled,
}
if lookaheadEnabled {
gc.purgeFunc = gc.purgeLookahead
} else {
gc.purgeFunc = gc.purgeStore
}
// do not start GC timers if purge is disabled; this GC can only be triggered manually.
if ab.opts.GCPurgeInterval > 0 {
go gc.background()
}
return gc, nil
}
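The constructor binds the purge strategy to purgeFunc once, so the background loop never re-checks configuration. A standalone sketch of the same function-value dispatch, with illustrative names rather than the package's own:

package main

import (
	"fmt"
	"time"
)

type gcConfig struct {
	lookaheadInterval time.Duration
}

type collector struct {
	// purgeFunc is chosen once at construction time.
	purgeFunc func()
}

func newCollector(cfg gcConfig) (*collector, error) {
	if cfg.lookaheadInterval < 0 {
		return nil, fmt.Errorf("negative lookahead interval: %s", cfg.lookaheadInterval)
	}
	c := &collector{}
	if cfg.lookaheadInterval > 0 {
		c.purgeFunc = func() { fmt.Println("purging lookahead window only") }
	} else {
		c.purgeFunc = func() { fmt.Println("purging entire store") }
	}
	return c, nil
}

func main() {
	c, err := newCollector(gcConfig{lookaheadInterval: 0})
	if err != nil {
		panic(err)
	}
	c.purgeFunc() // prints: purging entire store
}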
// background prunes expired addresses from the datastore at regular intervals. It should be spawned as a goroutine.
@ -69,7 +102,7 @@ func (gc *dsAddrBookGc) background() {
defer purgeTimer.Stop()
var lookaheadCh <-chan time.Time
if gc.ab.opts.GCLookaheadInterval > 0 {
if gc.lookaheadEnabled {
lookaheadTimer := time.NewTicker(gc.ab.opts.GCLookaheadInterval)
lookaheadCh = lookaheadTimer.C
defer lookaheadTimer.Stop()
@ -78,7 +111,7 @@ func (gc *dsAddrBookGc) background() {
for {
select {
case <-purgeTimer.C:
gc.purgeCycle()
gc.purgeFunc()
case <-lookaheadCh:
// will never fire if lookahead is disabled (lookaheadCh stays nil).
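The case above leans on a Go idiom: receiving from a nil channel blocks forever, so leaving lookaheadCh nil cleanly disables that branch of the select. A minimal demonstration:

package main

import (
	"fmt"
	"time"
)

func main() {
	purge := time.NewTicker(100 * time.Millisecond)
	defer purge.Stop()

	// nil channel: the corresponding select case never fires.
	var lookahead <-chan time.Time

	for i := 0; i < 3; i++ {
		select {
		case <-purge.C:
			fmt.Println("purge cycle")
		case <-lookahead:
			fmt.Println("unreachable while lookahead is nil")
		}
	}
}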
@ -92,7 +125,7 @@ func (gc *dsAddrBookGc) background() {
// purgeLookahead runs a single GC purge cycle over the lookahead window, deleting the addresses that have
// expired within it.
func (gc *dsAddrBookGc) purgeCycle() {
func (gc *dsAddrBookGc) purgeLookahead() {
select {
case gc.running <- struct{}{}:
defer func() { <-gc.running }()
@ -136,7 +169,7 @@ func (gc *dsAddrBookGc) purgeCycle() {
}
}
results, err := gc.ab.ds.Query(purgeQuery)
results, err := gc.ab.ds.Query(purgeLookaheadQuery)
if err != nil {
log.Warningf("failed while fetching entries to purge: %v", err)
return
@ -216,12 +249,62 @@ func (gc *dsAddrBookGc) purgeCycle() {
}
}
func (gc *dsAddrBookGc) purgeStore() {
select {
case gc.running <- struct{}{}:
defer func() { <-gc.running }()
default:
// yield if lookahead is running.
return
}
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs.
batch, err := newCyclicBatch(gc.ab.ds)
if err != nil {
log.Warningf("failed while creating batch to purge GC entries: %v", err)
return
}
results, err := gc.ab.ds.Query(purgeStoreQuery)
if err != nil {
log.Warningf("failed while opening iterator: %v", err)
return
}
defer results.Close()
// keys: /peers/addrs/<peer ID b32>
for result := range results.Next() {
record.Reset()
if err = record.Unmarshal(result.Value); err != nil {
log.Warningf("failed to deserialize record during GC purge, key: %v, err: %v", result.Key, err)
continue
}
id := record.Id.ID
if !record.Clean() {
continue
}
if err := record.flush(batch); err != nil {
log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id, err)
}
gc.ab.cache.Remove(id)
}
if err = batch.Commit(); err != nil {
log.Warningf("failed to commit GC purge batch: %v", err)
}
}
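Both purge paths serialize through gc.running, a buffered channel of capacity one used as a non-blocking try-lock: acquire in a select with a default branch, and yield if the slot is already taken. A standalone sketch of the pattern:

package main

import "fmt"

type worker struct {
	running chan struct{} // capacity 1: at most one cycle at a time.
}

func (w *worker) runCycle() {
	select {
	case w.running <- struct{}{}:
		defer func() { <-w.running }()
	default:
		// another cycle holds the slot; yield instead of blocking.
		fmt.Println("cycle already in progress, skipping")
		return
	}
	fmt.Println("running cycle")
}

func main() {
	w := &worker{running: make(chan struct{}, 1)}
	w.runCycle() // prints: running cycle
}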
// populateLookahead populates the lookahead window by scanning the entire store and picking entries whose earliest
// expiration falls within the window period.
//
// Those entries are stored in the lookahead region in the store, indexed by the timestamp when they need to be
// visited, to facilitate temporal range scans.
func (gc *dsAddrBookGc) populateLookahead() {
if gc.ab.opts.GCLookaheadInterval == 0 {
return
}
select {
case gc.running <- struct{}{}:
defer func() { <-gc.running }()
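The temporal range scans described above require keys whose lexicographic order matches visit order. A hedged sketch of one way to build such keys with go-datastore; the zero-padded layout is an illustration, not necessarily the exact format this package uses:

package main

import (
	"fmt"
	"strconv"
	"time"

	ds "github.com/ipfs/go-datastore"
)

// hypothetical layout: <base>/<zero-padded unix ts>/<peer ID b32>.
// zero-padding makes lexicographic key order equal temporal order,
// so an ordered scan visits entries in expiration order.
func lookaheadKey(base ds.Key, nextVisit time.Time, peerID string) ds.Key {
	ts := fmt.Sprintf("%020d", nextVisit.Unix())
	return base.ChildString(ts).ChildString(peerID)
}

func main() {
	base := ds.NewKey("/peers/gc/addrs")
	k := lookaheadKey(base, time.Now().Add(12*time.Hour), "b32peerid")
	fmt.Println(k)

	// a purge cycle can scan keys in order and stop as soon as the
	// encoded timestamp passes the end of the window.
	ts, _ := strconv.ParseInt(k.Parent().Name(), 10, 64)
	fmt.Println("visit at:", time.Unix(ts, 0))
}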


@ -4,16 +4,17 @@ import (
"testing"
"time"
"github.com/ipfs/go-datastore/query"
"github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-peerstore/test"
query "github.com/ipfs/go-datastore/query"
pstore "github.com/libp2p/go-libp2p-peerstore"
test "github.com/libp2p/go-libp2p-peerstore/test"
ma "github.com/multiformats/go-multiaddr"
)
var lookaheadQuery = query.Query{Prefix: gcLookaheadBase.String(), KeysOnly: true}
type testProbe struct {
t *testing.T
ab peerstore.AddrBook
ab pstore.AddrBook
}
func (tp *testProbe) countLookaheadEntries() (i int) {
@ -119,7 +120,7 @@ func TestGCPurging(t *testing.T) {
}
<-time.After(2 * time.Second)
gc.purgeCycle()
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 3 {
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
}
@ -128,13 +129,13 @@ func TestGCPurging(t *testing.T) {
tp.clearCache()
<-time.After(5 * time.Second)
gc.purgeCycle()
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 3 {
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
}
<-time.After(5 * time.Second)
gc.purgeCycle()
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 1 {
t.Errorf("expected 1 GC lookahead entries, got: %v", i)
}
@ -176,6 +177,51 @@ func TestGCDelay(t *testing.T) {
}
}
func TestGCLookaheadDisabled(t *testing.T) {
ids := test.GeneratePeerIDs(10)
addrs := test.GenerateAddrs(100)
opts := DefaultOpts()
// effectively disable automatic GC for this test.
opts.GCInitialDelay = 90 * time.Hour
opts.GCLookaheadInterval = 0 // disable lookahead
opts.GCPurgeInterval = 9 * time.Hour
factory := addressBookFactory(t, badgerStore, opts)
ab, closeFn := factory()
defer closeFn()
tp := &testProbe{t, ab}
// four peers:
// ids[0] has 10 addresses, all of which expire in 500ms.
// ids[1] has 20 addresses; 50% expire in 500ms and 50% in 10 hours.
// ids[2] has 10 addresses; all expire in 10 hours.
// ids[3] has 60 addresses; all expire in 10 hours.
ab.AddAddrs(ids[0], addrs[:10], 500*time.Millisecond)
ab.AddAddrs(ids[1], addrs[10:20], 500*time.Millisecond)
ab.AddAddrs(ids[1], addrs[20:30], 10*time.Hour)
ab.AddAddrs(ids[2], addrs[30:40], 10*time.Hour)
ab.AddAddrs(ids[3], addrs[40:], 10*time.Hour)
time.Sleep(100 * time.Millisecond)
if i := tp.countLookaheadEntries(); i != 0 {
t.Errorf("expected no GC lookahead entries, got: %v", i)
}
time.Sleep(500 * time.Millisecond)
gc := ab.(*dsAddrBook).gc
gc.purgeFunc()
var empty []ma.Multiaddr
test.AssertAddressesEqual(t, empty, ab.Addrs(ids[0]))
test.AssertAddressesEqual(t, addrs[20:30], ab.Addrs(ids[1]))
test.AssertAddressesEqual(t, addrs[30:40], ab.Addrs(ids[2]))
test.AssertAddressesEqual(t, addrs[40:], ab.Addrs(ids[3]))
}
func BenchmarkLookaheadCycle(b *testing.B) {
ids := test.GeneratePeerIDs(100)
addrs := test.GenerateAddrs(100)


@ -18,26 +18,30 @@ type Options struct {
// The size of the in-memory cache. A value of 0 or lower disables the cache.
CacheSize uint
// Sweep interval to purge expired addresses from the datastore.
// Sweep interval to purge expired addresses from the datastore. If this is a zero value, GC will not run
// automatically, but it can still be triggered manually via explicit calls.
GCPurgeInterval time.Duration
// Interval to renew the GC lookahead window.
// Interval to renew the GC lookahead window. If this is a zero value, lookahead will be disabled, and each
// purge cycle will traverse the entire datastore.
GCLookaheadInterval time.Duration
// Initial delay before GC routines start. Intended to give the system time to initialise before starting GC.
// Initial delay before GC processes start. Intended to give the system breathing room to fully boot
// before starting GC.
GCInitialDelay time.Duration
}
// DefaultOpts returns the default options for a persistent peerstore:
// * Cache size: 1024
// * GC prune interval: 5 minutes
// * GC lookahead interval: 12 hours
// * WriteRetries: 5
//
// * Cache size: 1024.
// * GC purge interval: 10 minutes.
// * GC lookahead interval: disabled.
// * GC initial delay: 60 seconds.
func DefaultOpts() Options {
return Options{
CacheSize: 1024,
GCPurgeInterval: 5 * time.Minute,
GCLookaheadInterval: 12 * time.Hour,
GCPurgeInterval: 10 * time.Minute,
GCLookaheadInterval: 0,
GCInitialDelay: 60 * time.Second,
}
}
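The two intervals now combine into three operating modes, matching the option docs above. A configuration sketch, assuming the pstoreds import path:

package main

import (
	"fmt"
	"time"

	pstoreds "github.com/libp2p/go-libp2p-peerstore/pstoreds"
)

func main() {
	// 1. default: automatic GC, each purge cycle scans the full store.
	auto := pstoreds.DefaultOpts()

	// 2. automatic GC with a lookahead window: cycles only visit
	//    entries indexed as expiring inside the window.
	windowed := pstoreds.DefaultOpts()
	windowed.GCLookaheadInterval = 12 * time.Hour

	// 3. manual-only: no timers started; GC runs only on demand.
	manual := pstoreds.DefaultOpts()
	manual.GCPurgeInterval = 0

	fmt.Println(auto.GCPurgeInterval, windowed.GCLookaheadInterval, manual.GCPurgeInterval)
}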
@ -88,12 +92,11 @@ func uniquePeerIds(ds ds.Datastore, prefix ds.Key, extractor func(result query.R
return peer.IDSlice{}, nil
}
ids := make(peer.IDSlice, len(idset))
i := 0
ids := make(peer.IDSlice, 0, len(idset))
for id := range idset {
pid, _ := base32.RawStdEncoding.DecodeString(id)
ids[i], _ = peer.IDFromBytes(pid)
i++
id, _ := peer.IDFromBytes(pid)
ids = append(ids, id)
}
return ids, nil
}


@ -5,7 +5,6 @@ import (
"time"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
)
var addressBookSuite = map[string]func(book pstore.AddrBook) func(*testing.T){
@ -44,7 +43,7 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
ab.AddAddr(id, addrs[0], time.Hour)
testHas(t, addrs, ab.Addrs(id))
AssertAddressesEqual(t, addrs, ab.Addrs(id))
})
t.Run("idempotent add single address", func(t *testing.T) {
@ -54,7 +53,7 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
ab.AddAddr(id, addrs[0], time.Hour)
ab.AddAddr(id, addrs[0], time.Hour)
testHas(t, addrs, ab.Addrs(id))
AssertAddressesEqual(t, addrs, ab.Addrs(id))
})
t.Run("add multiple addresses", func(t *testing.T) {
@ -62,7 +61,7 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
addrs := GenerateAddrs(3)
ab.AddAddrs(id, addrs, time.Hour)
testHas(t, addrs, ab.Addrs(id))
AssertAddressesEqual(t, addrs, ab.Addrs(id))
})
t.Run("idempotent add multiple addresses", func(t *testing.T) {
@ -72,7 +71,7 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
ab.AddAddrs(id, addrs, time.Hour)
ab.AddAddrs(id, addrs, time.Hour)
testHas(t, addrs, ab.Addrs(id))
AssertAddressesEqual(t, addrs, ab.Addrs(id))
})
t.Run("adding an existing address with a later expiration extends its ttl", func(t *testing.T) {
@ -86,7 +85,7 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
// after the initial TTL has expired, check that only the third address is present.
time.Sleep(1200 * time.Millisecond)
testHas(t, addrs[2:], ab.Addrs(id))
AssertAddressesEqual(t, addrs[2:], ab.Addrs(id))
})
t.Run("adding an existing address with an earlier expiration is noop", func(t *testing.T) {
@ -101,7 +100,7 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
// after the initial TTL has expired, check that all three addresses are still present (i.e. the TTL on
// the modified one was not shortened).
time.Sleep(2100 * time.Millisecond)
testHas(t, addrs, ab.Addrs(id))
AssertAddressesEqual(t, addrs, ab.Addrs(id))
})
}
}
@ -114,16 +113,16 @@ func testClearWorks(ab pstore.AddrBook) func(t *testing.T) {
ab.AddAddrs(ids[0], addrs[0:3], time.Hour)
ab.AddAddrs(ids[1], addrs[3:], time.Hour)
testHas(t, addrs[0:3], ab.Addrs(ids[0]))
testHas(t, addrs[3:], ab.Addrs(ids[1]))
AssertAddressesEqual(t, addrs[0:3], ab.Addrs(ids[0]))
AssertAddressesEqual(t, addrs[3:], ab.Addrs(ids[1]))
ab.ClearAddrs(ids[0])
testHas(t, nil, ab.Addrs(ids[0]))
testHas(t, addrs[3:], ab.Addrs(ids[1]))
AssertAddressesEqual(t, nil, ab.Addrs(ids[0]))
AssertAddressesEqual(t, addrs[3:], ab.Addrs(ids[1]))
ab.ClearAddrs(ids[1])
testHas(t, nil, ab.Addrs(ids[0]))
testHas(t, nil, ab.Addrs(ids[1]))
AssertAddressesEqual(t, nil, ab.Addrs(ids[0]))
AssertAddressesEqual(t, nil, ab.Addrs(ids[1]))
}
}
@ -133,7 +132,7 @@ func testSetNegativeTTLClears(m pstore.AddrBook) func(t *testing.T) {
addrs := GenerateAddrs(100)
m.SetAddrs(id, addrs, time.Hour)
testHas(t, addrs, m.Addrs(id))
AssertAddressesEqual(t, addrs, m.Addrs(id))
// remove two addresses.
m.SetAddr(id, addrs[50], -1)
@ -143,7 +142,7 @@ func testSetNegativeTTLClears(m pstore.AddrBook) func(t *testing.T) {
survivors := append(addrs[0:50], addrs[51:]...)
survivors = append(survivors[0:74], survivors[75:]...)
testHas(t, survivors, m.Addrs(id))
AssertAddressesEqual(t, survivors, m.Addrs(id))
}
}
@ -167,8 +166,8 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) {
m.SetAddr(ids[1], addrs2[1], time.Minute)
// Sanity check.
testHas(t, addrs1, m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1, m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
// Will only affect addrs1[0].
// Badger does not support subsecond TTLs.
@ -176,26 +175,26 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) {
m.UpdateAddrs(ids[0], time.Hour, 1*time.Second)
// No immediate effect.
testHas(t, addrs1, m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1, m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
// After a wait, addrs[0] is gone.
time.Sleep(1500 * time.Millisecond)
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
// Will only affect addrs2[0].
m.UpdateAddrs(ids[1], time.Hour, 1*time.Second)
// No immediate effect.
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
time.Sleep(1500 * time.Millisecond)
// First addrs is gone in both.
testHas(t, addrs1[1:], m.Addrs(ids[0]))
testHas(t, addrs2[1:], m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:], m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2[1:], m.Addrs(ids[1]))
})
}
@ -219,39 +218,39 @@ func testAddressesExpire(m pstore.AddrBook) func(t *testing.T) {
m.AddAddrs(ids[0], addrs1, time.Hour)
m.AddAddrs(ids[1], addrs2, time.Hour)
testHas(t, addrs1, m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1, m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
m.AddAddrs(ids[0], addrs1, 2*time.Hour)
m.AddAddrs(ids[1], addrs2, 2*time.Hour)
testHas(t, addrs1, m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1, m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
m.SetAddr(ids[0], addrs1[0], 100*time.Microsecond)
<-time.After(100 * time.Millisecond)
testHas(t, addrs1[1:3], m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:3], m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
m.SetAddr(ids[0], addrs1[2], 100*time.Microsecond)
<-time.After(100 * time.Millisecond)
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
m.SetAddr(ids[1], addrs2[0], 100*time.Microsecond)
<-time.After(100 * time.Millisecond)
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
testHas(t, addrs2[1:], m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2[1:], m.Addrs(ids[1]))
m.SetAddr(ids[1], addrs2[1], 100*time.Microsecond)
<-time.After(100 * time.Millisecond)
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
testHas(t, nil, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
AssertAddressesEqual(t, nil, m.Addrs(ids[1]))
m.SetAddr(ids[0], addrs1[1], 100*time.Microsecond)
<-time.After(100 * time.Millisecond)
testHas(t, nil, m.Addrs(ids[0]))
testHas(t, nil, m.Addrs(ids[1]))
AssertAddressesEqual(t, nil, m.Addrs(ids[0]))
AssertAddressesEqual(t, nil, m.Addrs(ids[1]))
}
}
@ -307,25 +306,3 @@ func testPeersWithAddrs(m pstore.AddrBook) func(t *testing.T) {
})
}
}
func testHas(t *testing.T, exp, act []ma.Multiaddr) {
t.Helper()
if len(exp) != len(act) {
t.Fatalf("lengths not the same. expected %d, got %d\n", len(exp), len(act))
}
for _, a := range exp {
found := false
for _, b := range act {
if a.Equal(b) {
found = true
break
}
}
if !found {
t.Fatalf("expected address %s not found", a)
}
}
}


@ -72,3 +72,25 @@ func GeneratePeerIDs(count int) []peer.ID {
}
return ids
}
func AssertAddressesEqual(t *testing.T, exp, act []ma.Multiaddr) {
t.Helper()
if len(exp) != len(act) {
t.Fatalf("lengths not the same. expected %d, got %d\n", len(exp), len(act))
}
for _, a := range exp {
found := false
for _, b := range act {
if a.Equal(b) {
found = true
break
}
}
if !found {
t.Fatalf("expected address %s not found", a)
}
}
}