mirror of https://github.com/libp2p/go-libp2p-peerstore.git
fix raciness in init logic; dedicated goroutines for GC and flushing.
commit 8fa350be35
parent 19eea773fa
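For context on the raciness named in the commit message: the removed lines in background() below create placeholder tickers with new(time.Ticker), then reassign them from a separate goroutine once GCInitialDelay elapses, while the main loop concurrently selects on pruneTimer.C and lookaheadTimer.C. That unsynchronized write/read pair is a data race. A distilled, self-contained sketch of the pre-patch shape (simplified names and intervals; not the committed code):

package main

import "time"

func main() {
	// Placeholder ticker: its C field is nil, and receiving from a nil
	// channel blocks forever.
	pruneTimer := new(time.Ticker)

	// One goroutine swaps in the real ticker after an initial delay...
	go func() {
		<-time.After(100 * time.Millisecond)
		pruneTimer = time.NewTicker(50 * time.Millisecond) // unsynchronized write
	}()

	// ...while this loop keeps reading the same variable. Run with
	// `go run -race` to see the data race the commit message refers to.
	for i := 0; i < 20; i++ {
		select {
		case <-pruneTimer.C: // unsynchronized read, racing with the write above
			return
		case <-time.After(20 * time.Millisecond):
			// loop around so pruneTimer.C is re-evaluated
		}
	}
}

The commit removes the shared mutable tickers entirely: each concern gets its own goroutine that owns its tickers outright.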
@@ -146,7 +146,7 @@ type dsAddrBook struct {
 	flushJobCh chan *addrsRecord
 	cancelFn   func()
-	closedCh   chan struct{}
+	closeDone  sync.WaitGroup

 	gcCurrWindowEnd    int64
 	gcLookaheadRunning int32
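The struct change above is the heart of the shutdown fix: close(closedCh) can only run once, so a single channel cannot signal completion of the two background goroutines this commit introduces, whereas a sync.WaitGroup counts them. A minimal sketch of the shape Close() relies on (simplified; the usual idiom is to Add before spawning each goroutine):

package main

import (
	"context"
	"sync"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	var closeDone sync.WaitGroup
	for i := 0; i < 2; i++ { // one flusher, one GC loop
		closeDone.Add(1) // register before spawning, so Wait cannot return early
		go func() {
			defer closeDone.Done()
			<-ctx.Done() // stand-in for the real work loop
		}()
	}

	// Close(): cancel the context, then wait until both goroutines
	// have observed it and exited.
	cancel()
	closeDone.Wait()
}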
@@ -173,7 +173,6 @@ func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts Options) (ab *
 		ds:          store,
 		subsManager: pstoremem.NewAddrSubManager(),
 		flushJobCh:  make(chan *addrsRecord, 32),
-		closedCh:    make(chan struct{}),
 	}

 	// kick off periodic GC.
@@ -184,7 +183,7 @@ func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts Options) (ab *
 func (ab *dsAddrBook) Close() {
 	ab.cancelFn()
-	<-ab.closedCh
+	ab.closeDone.Wait()
 }

 func (ab *dsAddrBook) asyncFlush(pr *addrsRecord) {
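asyncFlush's body sits outside these hunks; given the 32-slot buffer on flushJobCh above, it is presumably a non-blocking send that drops the job when the queue is full. A hypothetical sketch of that shape (the helper name and the drop behaviour are assumptions, not the committed code):

package main

import "fmt"

// tryEnqueue is a hypothetical stand-in for asyncFlush's presumed shape:
// send if the buffered channel has room, otherwise report failure rather
// than block the caller.
func tryEnqueue(jobs chan<- string, job string) bool {
	select {
	case jobs <- job:
		return true
	default:
		return false // queue full; the caller can log and move on
	}
}

func main() {
	jobs := make(chan string, 32) // mirrors flushJobCh's buffer size
	fmt.Println(tryEnqueue(jobs, "flush peer record"))
}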
@@ -247,46 +246,59 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs
 // * GCing expired addresses from the datastore at regular intervals.
 // * persisting asynchronous flushes to the datastore.
 func (ab *dsAddrBook) background() {
-	// placeholder tickers.
-	pruneTimer, lookaheadTimer := new(time.Ticker), new(time.Ticker)
-
-	// populate the tickers after the initial delay has passed.
-	go func() {
-		select {
-		case <-time.After(ab.opts.GCInitialDelay):
-			pruneTimer = time.NewTicker(ab.opts.GCPurgeInterval)
-			lookaheadTimer = time.NewTicker(ab.opts.GCLookaheadInterval)
-		case <-ab.ctx.Done():
-			// yield if we have been cancelled/closed before the delay elapses.
-			return
-		}
-	}()
-
-	for {
-		select {
-		case fj := <-ab.flushJobCh:
-			if cached, ok := ab.cache.Peek(fj.Id.ID); ok {
-				// Only continue flushing if the record we have in memory is the same as for which the flush
-				// job was requested. If it's not in memory, it has been evicted and we don't know if we hold
-				// the latest state or not. Similarly, if it's cached but the pointer is different, it means
-				// it was evicted and has been reloaded, so we're also uncertain if we hold the latest state.
-				if pr := cached.(*addrsRecord); pr == fj {
-					pr.RLock()
-					pr.Flush(ab.ds)
-					pr.RUnlock()
-				}
-			}
-
-		case <-pruneTimer.C:
-			ab.purgeCycle()
-
-		case <-lookaheadTimer.C:
-			ab.populateLookahead()
-
-		case <-ab.ctx.Done():
-			pruneTimer.Stop()
-			lookaheadTimer.Stop()
-			close(ab.closedCh)
-			return
-		}
-	}
+	// goroutine that takes care of executing flush jobs.
+	go func() {
+		ab.closeDone.Add(1)
+		for {
+			select {
+			case fj := <-ab.flushJobCh:
+				if cached, ok := ab.cache.Peek(fj.Id.ID); ok {
+					// Only continue flushing if the record we have in memory is the same as for which the flush
+					// job was requested. If it's not in memory, it has been evicted and we don't know if we hold
+					// the latest state or not. Similarly, if it's cached but the pointer is different, it means
+					// it was evicted and has been reloaded, so we're also uncertain if we hold the latest state.
+					if pr := cached.(*addrsRecord); pr == fj {
+						pr.RLock()
+						pr.Flush(ab.ds)
+						pr.RUnlock()
+					}
+				}
+			case <-ab.ctx.Done():
+				ab.closeDone.Done()
+				return
+			}
+		}
+	}()
+
+	// goroutine that takes care of GC.
+	go func() {
+		select {
+		case <-time.After(ab.opts.GCInitialDelay):
+		case <-ab.ctx.Done():
+			// yield if we have been cancelled/closed before the delay elapses.
+			return
+		}
+
+		ab.closeDone.Add(1)
+		purgeTimer := time.NewTicker(ab.opts.GCPurgeInterval)
+		lookaheadTimer := time.NewTicker(ab.opts.GCLookaheadInterval)
+
+		for {
+			select {
+			case <-purgeTimer.C:
+				ab.purgeCycle()
+
+			case <-lookaheadTimer.C:
+				ab.populateLookahead()
+
+			case <-ab.ctx.Done():
+				purgeTimer.Stop()
+				lookaheadTimer.Stop()
+				ab.closeDone.Done()
+				return
+			}
+		}
+	}()
 }

 var purgeQuery = query.Query{Prefix: gcLookaheadBase.String(), KeysOnly: true}
@@ -142,3 +142,33 @@ func TestGCPurging(t *testing.T) {
 		t.Errorf("expected remaining peer to be #3, got: %v, expected: %v", p, ids[3])
 	}
 }
+
+func TestGCDelay(t *testing.T) {
+	ids := test.GeneratePeerIDs(10)
+	addrs := test.GenerateAddrs(100)
+
+	opts := DefaultOpts()
+
+	opts.GCInitialDelay = 2 * time.Second
+	opts.GCLookaheadInterval = 2 * time.Second
+	opts.GCPurgeInterval = 6 * time.Hour
+
+	factory := addressBookFactory(t, badgerStore, opts)
+	ab, closeFn := factory()
+	defer closeFn()
+
+	tp := &testProbe{t, ab}
+
+	ab.AddAddrs(ids[0], addrs, 1*time.Second)
+
+	// immediately afterwards we should have no lookahead entries.
+	if i := tp.countLookaheadEntries(); i != 0 {
+		t.Errorf("expected no lookahead entries, got: %d", i)
+	}
+
+	// delay (2s) + lookahead interval (2s) = 4 seconds, plus 2 seconds of slack = 6 seconds.
+	<-time.After(6 * time.Second)
+	if i := tp.countLookaheadEntries(); i != 1 {
+		t.Errorf("expected 1 lookahead entry, got: %d", i)
+	}
+}