From 8fa350be354e7ca915e66f56ed04b46cf5c49165 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 30 Nov 2018 17:32:51 +0000 Subject: [PATCH] fix raciness in init logic; dedicated goroutines for GC and flushing. --- pstoreds/addr_book.go | 82 ++++++++++++++++++++++++------------------ pstoreds/ds_gc_test.go | 30 ++++++++++++++++ 2 files changed, 77 insertions(+), 35 deletions(-) diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index 018726f..f19910a 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -146,7 +146,7 @@ type dsAddrBook struct { flushJobCh chan *addrsRecord cancelFn func() - closedCh chan struct{} + closeDone sync.WaitGroup gcCurrWindowEnd int64 gcLookaheadRunning int32 @@ -173,7 +173,6 @@ func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts Options) (ab * ds: store, subsManager: pstoremem.NewAddrSubManager(), flushJobCh: make(chan *addrsRecord, 32), - closedCh: make(chan struct{}), } // kick off periodic GC. @@ -184,7 +183,7 @@ func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts Options) (ab * func (ab *dsAddrBook) Close() { ab.cancelFn() - <-ab.closedCh + ab.closeDone.Wait() } func (ab *dsAddrBook) asyncFlush(pr *addrsRecord) { @@ -247,46 +246,59 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs // * GCing expired addresses from the datastore at regular intervals. // * persisting asynchronous flushes to the datastore. func (ab *dsAddrBook) background() { - // placeholder tickers. - pruneTimer, lookaheadTimer := new(time.Ticker), new(time.Ticker) - - // populate the tickers after the initial delay has passed. + // goroutine that takes care of executing flush jobs. go func() { - select { - case <-time.After(ab.opts.GCInitialDelay): - pruneTimer = time.NewTicker(ab.opts.GCPurgeInterval) - lookaheadTimer = time.NewTicker(ab.opts.GCLookaheadInterval) + ab.closeDone.Add(1) + for { + select { + case fj := <-ab.flushJobCh: + if cached, ok := ab.cache.Peek(fj.Id.ID); ok { + // Only continue flushing if the record we have in memory is the same as for which the flush + // job was requested. If it's not in memory, it has been evicted and we don't know if we hold + // the latest state or not. Similarly, if it's cached but the pointer is different, it means + // it was evicted and has been reloaded, so we're also uncertain if we hold the latest state. + if pr := cached.(*addrsRecord); pr == fj { + pr.RLock() + pr.Flush(ab.ds) + pr.RUnlock() + } + } + case <-ab.ctx.Done(): + ab.closeDone.Done() + return + } } }() - for { + // goroutine that takes care of GC. + go func() { select { - case fj := <-ab.flushJobCh: - if cached, ok := ab.cache.Peek(fj.Id.ID); ok { - // Only continue flushing if the record we have in memory is the same as for which the flush - // job was requested. If it's not in memory, it has been evicted and we don't know if we hold - // the latest state or not. Similarly, if it's cached but the pointer is different, it means - // it was evicted and has been reloaded, so we're also uncertain if we hold the latest state. - if pr := cached.(*addrsRecord); pr == fj { - pr.RLock() - pr.Flush(ab.ds) - pr.RUnlock() - } - } - - case <-pruneTimer.C: - ab.purgeCycle() - - case <-lookaheadTimer.C: - ab.populateLookahead() - + case <-time.After(ab.opts.GCInitialDelay): case <-ab.ctx.Done(): - pruneTimer.Stop() - lookaheadTimer.Stop() - close(ab.closedCh) + // yield if we have been cancelled/closed before the delay elapses. return } - } + + ab.closeDone.Add(1) + purgeTimer := time.NewTicker(ab.opts.GCPurgeInterval) + lookaheadTimer := time.NewTicker(ab.opts.GCLookaheadInterval) + + for { + select { + case <-purgeTimer.C: + ab.purgeCycle() + + case <-lookaheadTimer.C: + ab.populateLookahead() + + case <-ab.ctx.Done(): + purgeTimer.Stop() + lookaheadTimer.Stop() + ab.closeDone.Done() + return + } + } + }() } var purgeQuery = query.Query{Prefix: gcLookaheadBase.String(), KeysOnly: true} diff --git a/pstoreds/ds_gc_test.go b/pstoreds/ds_gc_test.go index 6de9539..02b65bd 100644 --- a/pstoreds/ds_gc_test.go +++ b/pstoreds/ds_gc_test.go @@ -142,3 +142,33 @@ func TestGCPurging(t *testing.T) { t.Errorf("expected remaining peer to be #3, got: %v, expected: %v", p, ids[3]) } } + +func TestGCDelay(t *testing.T) { + ids := test.GeneratePeerIDs(10) + addrs := test.GenerateAddrs(100) + + opts := DefaultOpts() + + opts.GCInitialDelay = 2 * time.Second + opts.GCLookaheadInterval = 2 * time.Second + opts.GCPurgeInterval = 6 * time.Hour + + factory := addressBookFactory(t, badgerStore, opts) + ab, closeFn := factory() + defer closeFn() + + tp := &testProbe{t, ab} + + ab.AddAddrs(ids[0], addrs, 1*time.Second) + + // immediately after we should be having no lookahead entries. + if i := tp.countLookaheadEntries(); i != 0 { + t.Errorf("expected no lookahead entries, got: %d", i) + } + + // delay + lookahead interval = 4 seconds + 2 seconds for some slack = 6 seconds. + <-time.After(6 * time.Second) + if i := tp.countLookaheadEntries(); i != 1 { + t.Errorf("expected 1 lookahead entry, got: %d", i) + } +}