From d88042c4229f4cf52bd06504018502cd352a452b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 5 Feb 2019 16:52:27 +0000 Subject: [PATCH] improve code readability. --- pstoreds/addr_book.go | 140 +++++++++++++++++++-------------------- pstoreds/addr_book_gc.go | 22 +++--- test/addr_book_suite.go | 4 +- 3 files changed, 80 insertions(+), 86 deletions(-) diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index 8ea007f..cff7a24 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -30,6 +30,7 @@ const ( var ( log = logging.Logger("peerstore/ds") + // Peer addresses are stored under the following db key pattern: // /peers/addrs/ addrBookBase = ds.NewKey("/peers/addrs") @@ -47,9 +48,9 @@ type addrsRecord struct { dirty bool } -// Flush writes the record to the datastore by calling ds.Put, unless the record is -// marked for deletion, in which case the deletion is executed via ds.Delete. -func (r *addrsRecord) Flush(ds dsWriter) (err error) { +// flush writes the record to the datastore by calling ds.Put, unless the record is +// marked for deletion, in which case we call ds.Delete. +func (r *addrsRecord) flush(ds dsWriter) (err error) { key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id.ID))) if len(r.Addrs) == 0 { return ds.Delete(key) @@ -93,7 +94,7 @@ func (r *addrsRecord) Clean() (chgd bool) { if len(r.Addrs) == 0 { // this is a ghost record; let's signal it has to be written. - // Flush() will take care of doing the deletion. + // flush() will take care of doing the deletion. return true } @@ -151,7 +152,7 @@ func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAd } ctx, cancelFn := context.WithCancel(ctx) - mgr := &dsAddrBook{ + ab = &dsAddrBook{ ctx: ctx, cancelFn: cancelFn, opts: opts, @@ -161,10 +162,11 @@ func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAd flushJobCh: make(chan *addrsRecord, 32), } - // kick off periodic GC. - go mgr.background() + // kick off background processes. + go ab.flusher() + go ab.gc() - return mgr, nil + return ab, nil } func (ab *dsAddrBook) Close() { @@ -176,7 +178,7 @@ func (ab *dsAddrBook) asyncFlush(pr *addrsRecord) { select { case ab.flushJobCh <- pr: default: - log.Warningf("flush queue is full; could not flush peer %v", pr.Id.ID.Pretty()) + log.Warningf("flush queue is full; could not flush record for peer %s", pr.Id.ID) } } @@ -184,9 +186,9 @@ func (ab *dsAddrBook) asyncFlush(pr *addrsRecord) { // datastore upon a miss, and returning a newly initialized record if the peer doesn't exist. // // loadRecord calls Clean() on the record before returning it. If the record changes -// as a result and `update=true`, an async flush is scheduled. +// as a result and the update argument is true, an async flush is queued. // -// If `cache=true`, the record is inserted in the cache when loaded from the datastore. +// If the cache argument is true, the record is inserted in the cache when loaded from the datastore. func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrsRecord, err error) { if e, ok := ab.cache.Get(id); ok { pr = e.(*addrsRecord) @@ -196,23 +198,22 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs return pr, nil } + pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(id))) data, err := ab.ds.Get(key) - if err != nil && err != ds.ErrNotFound { - return nil, err - } - - if err == nil { - pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} + switch err { + case ds.ErrNotFound: + pr.Id = &pb.ProtoPeerID{ID: id} + case nil: if err = pr.Unmarshal(data); err != nil { return nil, err } if pr.Clean() && update { ab.asyncFlush(pr) } - } else { - pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{Id: &pb.ProtoPeerID{ID: id}}} + default: + return nil, err } if cache { @@ -221,64 +222,59 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs return pr, nil } -// background runs the housekeeping process that takes care of: -// -// * GCing expired addresses from the datastore at regular intervals. -// * persisting asynchronous flushes to the datastore. -func (ab *dsAddrBook) background() { - // goroutine that takes care of executing flush jobs. - go func() { - ab.closeDone.Add(1) - for { - select { - case fj := <-ab.flushJobCh: - if cached, ok := ab.cache.Peek(fj.Id.ID); ok { - // Only continue flushing if the record we have in memory is the same as for which the flush - // job was requested. If it's not in memory, it has been evicted and we don't know if we hold - // the latest state or not. Similarly, if it's cached but the pointer is different, it means - // it was evicted and has been reloaded, so we're also uncertain if we hold the latest state. - if pr := cached.(*addrsRecord); pr == fj { - pr.RLock() - pr.Flush(ab.ds) - pr.RUnlock() - } - } - case <-ab.ctx.Done(): - ab.closeDone.Done() - return - } - } - }() - - // goroutine that takes care of GC. - go func() { +// flusher is a goroutine that takes care of persisting asynchronous flushes to the datastore. +func (ab *dsAddrBook) flusher() { + ab.closeDone.Add(1) + for { select { - case <-time.After(ab.opts.GCInitialDelay): + case fj := <-ab.flushJobCh: + if cached, ok := ab.cache.Peek(fj.Id.ID); ok { + // Only continue flushing if the record we have in memory is the same as for which the flush + // job was requested. If it's not in memory, it has been evicted and we don't know if we hold + // the latest state or not. Similarly, if it's cached but the pointer is different, it means + // it was evicted and has been reloaded, so we're also uncertain if we hold the latest state. + if pr := cached.(*addrsRecord); pr != fj { + pr.RLock() + pr.flush(ab.ds) + pr.RUnlock() + } + } + case <-ab.ctx.Done(): - // yield if we have been cancelled/closed before the delay elapses. + ab.closeDone.Done() return } + } +} - ab.closeDone.Add(1) - purgeTimer := time.NewTicker(ab.opts.GCPurgeInterval) - lookaheadTimer := time.NewTicker(ab.opts.GCLookaheadInterval) +// gc is a goroutine that prunes expired addresses from the datastore at regular intervals. +func (ab *dsAddrBook) gc() { + select { + case <-time.After(ab.opts.GCInitialDelay): + case <-ab.ctx.Done(): + // yield if we have been cancelled/closed before the delay elapses. + return + } - for { - select { - case <-purgeTimer.C: - ab.purgeCycle() + ab.closeDone.Add(1) + purgeTimer := time.NewTicker(ab.opts.GCPurgeInterval) + lookaheadTimer := time.NewTicker(ab.opts.GCLookaheadInterval) - case <-lookaheadTimer.C: - ab.populateLookahead() + for { + select { + case <-purgeTimer.C: + ab.purgeCycle() - case <-ab.ctx.Done(): - purgeTimer.Stop() - lookaheadTimer.Stop() - ab.closeDone.Done() - return - } + case <-lookaheadTimer.C: + ab.populateLookahead() + + case <-ab.ctx.Done(): + purgeTimer.Stop() + lookaheadTimer.Stop() + ab.closeDone.Done() + return } - }() + } } // AddAddr will add a new address if it's not already in the AddrBook. @@ -331,7 +327,7 @@ func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.D } if pr.Clean() { - pr.Flush(ab.ds) + pr.flush(ab.ds) } } @@ -391,7 +387,7 @@ func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio defer pr.Unlock() newExp := time.Now().Add(ttl).Unix() - existed := make([]bool, len(addrs)) // keeps track of which addrs we found + existed := make([]bool, len(addrs)) // keeps track of which addrs we found. Outer: for i, incoming := range addrs { @@ -431,7 +427,7 @@ Outer: pr.Addrs = append(pr.Addrs, added...) pr.dirty = true pr.Clean() - return pr.Flush(ab.ds) + return pr.flush(ab.ds) } func (ab *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) (err error) { @@ -464,7 +460,7 @@ func (ab *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) (err error) { pr.dirty = true pr.Clean() - return pr.Flush(ab.ds) + return pr.flush(ab.ds) } func cleanAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { diff --git a/pstoreds/addr_book_gc.go b/pstoreds/addr_book_gc.go index 2c45b86..f175831 100644 --- a/pstoreds/addr_book_gc.go +++ b/pstoreds/addr_book_gc.go @@ -35,10 +35,11 @@ var ( } ) -// cyclicBatch is similar to go-datastore autobatch, but it's driven by an actual Batch facility offered by the -// datastore. It populates an ongoing batch with operations and automatically flushes it after N pending operations -// have been reached. `N` is currently hardcoded to 20. An explicit `Commit()` closes this cyclic batch, erroring all -// further operations. +// cyclicBatch buffers datastore write operations and automatically flushes them after gcOpsPerBatch (20) have been +// queued. An explicit `Commit()` closes this cyclic batch, erroring all further operations. +// +// It is similar to go-datastore autobatch, but it's driven by an actual Batch facility offered by the +// datastore. type cyclicBatch struct { ds.Batch ds ds.Batching @@ -99,11 +100,8 @@ func (cb *cyclicBatch) Commit() error { return nil } -// purgeCycle runs a single GC cycle, operating within the lookahead window. -// -// It scans the lookahead region for entries that need to be visited, and performs a Clean() on them. An errors trigger -// the removal of the GC entry, in order to prevent unactionable items from accumulating. If the error happened to be -// temporary, the entry will be revisited in the next lookahead window. +// purgeCycle runs a single GC purge cycle. It operates within the lookahead window if lookahead is enabled; else it +// visits all entries in the datastore, deleting the addresses that have expired. func (ab *dsAddrBook) purgeCycle() { if atomic.LoadInt32(&ab.gcLookaheadRunning) > 0 { // yield if lookahead is running. @@ -111,7 +109,7 @@ func (ab *dsAddrBook) purgeCycle() { } var id peer.ID - record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} + record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs. batch, err := newCyclicBatch(ab.ds) if err != nil { log.Warningf("failed while creating batch to purge GC entries: %v", err) @@ -187,7 +185,7 @@ func (ab *dsAddrBook) purgeCycle() { cached := e.(*addrsRecord) cached.Lock() if cached.Clean() { - if err = cached.Flush(batch); err != nil { + if err = cached.flush(batch); err != nil { log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err) } } @@ -212,7 +210,7 @@ func (ab *dsAddrBook) purgeCycle() { continue } if record.Clean() { - err = record.Flush(batch) + err = record.flush(batch) if err != nil { log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err) } diff --git a/test/addr_book_suite.go b/test/addr_book_suite.go index dfd720a..94f8fb5 100644 --- a/test/addr_book_suite.go +++ b/test/addr_book_suite.go @@ -90,8 +90,8 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) { }) t.Run("adding an existing address with an earlier expiration is noop", func(t *testing.T) { - id := generatePeerIds(1)[0] - addrs := generateAddrs(3) + id := GeneratePeerIDs(1)[0] + addrs := GenerateAddrs(3) ab.AddAddrs(id, addrs, time.Hour)