improve code readability.

Raúl Kripalani 2019-02-05 16:52:27 +00:00
parent 0baddf577e
commit d88042c422
3 changed files with 80 additions and 86 deletions


@@ -30,6 +30,7 @@ const (
var (
	log = logging.Logger("peerstore/ds")

	// Peer addresses are stored under the following db key pattern:
	// /peers/addrs/<b32 peer id no padding>
	addrBookBase = ds.NewKey("/peers/addrs")
@@ -47,9 +48,9 @@ type addrsRecord struct {
	dirty bool
}

-// Flush writes the record to the datastore by calling ds.Put, unless the record is
-// marked for deletion, in which case the deletion is executed via ds.Delete.
-func (r *addrsRecord) Flush(ds dsWriter) (err error) {
+// flush writes the record to the datastore by calling ds.Put, unless the record is
+// marked for deletion, in which case we call ds.Delete.
+func (r *addrsRecord) flush(ds dsWriter) (err error) {
	key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id.ID)))
	if len(r.Addrs) == 0 {
		return ds.Delete(key)
@@ -93,7 +94,7 @@ func (r *addrsRecord) Clean() (chgd bool) {
	if len(r.Addrs) == 0 {
		// this is a ghost record; let's signal it has to be written.
-		// Flush() will take care of doing the deletion.
+		// flush() will take care of doing the deletion.
		return true
	}
@@ -151,7 +152,7 @@ func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAd
	}

	ctx, cancelFn := context.WithCancel(ctx)
-	mgr := &dsAddrBook{
+	ab = &dsAddrBook{
		ctx:      ctx,
		cancelFn: cancelFn,
		opts:     opts,
@@ -161,10 +162,11 @@ func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAd
		flushJobCh: make(chan *addrsRecord, 32),
	}

-	// kick off periodic GC.
-	go mgr.background()
+	// kick off background processes.
+	go ab.flusher()
+	go ab.gc()

-	return mgr, nil
+	return ab, nil
}

func (ab *dsAddrBook) Close() {
@@ -176,7 +178,7 @@ func (ab *dsAddrBook) asyncFlush(pr *addrsRecord) {
	select {
	case ab.flushJobCh <- pr:
	default:
-		log.Warningf("flush queue is full; could not flush peer %v", pr.Id.ID.Pretty())
+		log.Warningf("flush queue is full; could not flush record for peer %s", pr.Id.ID)
	}
}
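
The non-blocking hand-off in asyncFlush is the usual Go select-with-default idiom: if the buffered flushJobCh is full, the job is dropped and a warning is logged instead of blocking the caller. A minimal sketch of that idiom in isolation (the names here are illustrative, not taken from this package):

    // Sketch: non-blocking enqueue onto a bounded work queue.
    jobs := make(chan int, 32) // bounded buffer, like flushJobCh

    enqueue := func(job int) bool {
        select {
        case jobs <- job: // room available; the job is queued.
            return true
        default: // queue is full; drop the job rather than block.
            return false
        }
    }

    if !enqueue(42) {
        // the caller decides how to react, e.g. log a warning.
    }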
@@ -184,9 +186,9 @@ func (ab *dsAddrBook) asyncFlush(pr *addrsRecord) {
// datastore upon a miss, and returning a newly initialized record if the peer doesn't exist.
//
// loadRecord calls Clean() on the record before returning it. If the record changes
-// as a result and `update=true`, an async flush is scheduled.
+// as a result and the update argument is true, an async flush is queued.
//
-// If `cache=true`, the record is inserted in the cache when loaded from the datastore.
+// If the cache argument is true, the record is inserted in the cache when loaded from the datastore.
func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrsRecord, err error) {
	if e, ok := ab.cache.Get(id); ok {
		pr = e.(*addrsRecord)
@@ -196,23 +198,22 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs
		return pr, nil
	}

+	pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
	key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(id)))
	data, err := ab.ds.Get(key)
-	if err != nil && err != ds.ErrNotFound {
-		return nil, err
-	}
-
-	if err == nil {
-		pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
+	switch err {
+	case ds.ErrNotFound:
+		pr.Id = &pb.ProtoPeerID{ID: id}
+	case nil:
		if err = pr.Unmarshal(data); err != nil {
			return nil, err
		}
		if pr.Clean() && update {
			ab.asyncFlush(pr)
		}
-	} else {
-		pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{Id: &pb.ProtoPeerID{ID: id}}}
+	default:
+		return nil, err
	}

	if cache {
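
For orientation, the write paths further down in this diff (setAddrs, deleteAddrs) consume these records in a consistent way: load the record, mutate it under its lock, mark it dirty, Clean() it, then flush. A condensed sketch of that call pattern (the boolean flags and the mutation are illustrative, not lifted verbatim from a call site):

    // Sketch: typical consumer of loadRecord in this file.
    pr, err := ab.loadRecord(p, true /* cache */, false /* update */)
    if err != nil {
        return err
    }

    pr.Lock()
    defer pr.Unlock()

    // ... mutate pr.Addrs here ...

    pr.dirty = true
    pr.Clean()
    return pr.flush(ab.ds)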
@@ -221,64 +222,59 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs
	return pr, nil
}

-// background runs the housekeeping process that takes care of:
-//
-// * GCing expired addresses from the datastore at regular intervals.
-// * persisting asynchronous flushes to the datastore.
-func (ab *dsAddrBook) background() {
-	// goroutine that takes care of executing flush jobs.
-	go func() {
-		ab.closeDone.Add(1)
-		for {
-			select {
-			case fj := <-ab.flushJobCh:
-				if cached, ok := ab.cache.Peek(fj.Id.ID); ok {
-					// Only continue flushing if the record we have in memory is the same as for which the flush
-					// job was requested. If it's not in memory, it has been evicted and we don't know if we hold
-					// the latest state or not. Similarly, if it's cached but the pointer is different, it means
-					// it was evicted and has been reloaded, so we're also uncertain if we hold the latest state.
-					if pr := cached.(*addrsRecord); pr == fj {
-						pr.RLock()
-						pr.Flush(ab.ds)
-						pr.RUnlock()
-					}
-				}
-			case <-ab.ctx.Done():
-				ab.closeDone.Done()
-				return
-			}
-		}
-	}()
-
-	// goroutine that takes care of GC.
-	go func() {
-		select {
-		case <-time.After(ab.opts.GCInitialDelay):
-		case <-ab.ctx.Done():
-			// yield if we have been cancelled/closed before the delay elapses.
-			return
-		}
-
-		ab.closeDone.Add(1)
-		purgeTimer := time.NewTicker(ab.opts.GCPurgeInterval)
-		lookaheadTimer := time.NewTicker(ab.opts.GCLookaheadInterval)
-
-		for {
-			select {
-			case <-purgeTimer.C:
-				ab.purgeCycle()
-
-			case <-lookaheadTimer.C:
-				ab.populateLookahead()
-
-			case <-ab.ctx.Done():
-				purgeTimer.Stop()
-				lookaheadTimer.Stop()
-				ab.closeDone.Done()
-				return
-			}
-		}
-	}()
-}
+// flusher is a goroutine that takes care of persisting asynchronous flushes to the datastore.
+func (ab *dsAddrBook) flusher() {
+	ab.closeDone.Add(1)
+	for {
+		select {
+		case fj := <-ab.flushJobCh:
+			if cached, ok := ab.cache.Peek(fj.Id.ID); ok {
+				// Only continue flushing if the record we have in memory is the same as for which the flush
+				// job was requested. If it's not in memory, it has been evicted and we don't know if we hold
+				// the latest state or not. Similarly, if it's cached but the pointer is different, it means
+				// it was evicted and has been reloaded, so we're also uncertain if we hold the latest state.
+				if pr := cached.(*addrsRecord); pr != fj {
+					pr.RLock()
+					pr.flush(ab.ds)
+					pr.RUnlock()
+				}
+			}
+
+		case <-ab.ctx.Done():
+			ab.closeDone.Done()
+			return
+		}
+	}
+}
+
+// gc is a goroutine that prunes expired addresses from the datastore at regular intervals.
+func (ab *dsAddrBook) gc() {
+	select {
+	case <-time.After(ab.opts.GCInitialDelay):
+	case <-ab.ctx.Done():
+		// yield if we have been cancelled/closed before the delay elapses.
+		return
+	}
+
+	ab.closeDone.Add(1)
+	purgeTimer := time.NewTicker(ab.opts.GCPurgeInterval)
+	lookaheadTimer := time.NewTicker(ab.opts.GCLookaheadInterval)
+
+	for {
+		select {
+		case <-purgeTimer.C:
+			ab.purgeCycle()
+
+		case <-lookaheadTimer.C:
+			ab.populateLookahead()
+
+		case <-ab.ctx.Done():
+			purgeTimer.Stop()
+			lookaheadTimer.Stop()
+			ab.closeDone.Done()
+			return
+		}
+	}
+}

// AddAddr will add a new address if it's not already in the AddrBook.
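
Both new goroutines follow the same lifecycle contract: register on the closeDone wait group, do their work in a select loop, and exit when ab.ctx is cancelled, which is what lets Close shut the address book down deterministically. A generic sketch of that cancel-and-wait pattern, simplified and not the exact Close implementation:

    // Sketch: a periodic worker that can be cancelled and awaited.
    ctx, cancel := context.WithCancel(context.Background())
    var done sync.WaitGroup

    done.Add(1)
    go func() {
        defer done.Done()
        ticker := time.NewTicker(time.Minute)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                // periodic work goes here.
            case <-ctx.Done():
                return
            }
        }
    }()

    // Shutdown: cancel the context, then wait for the worker to exit.
    cancel()
    done.Wait()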
@@ -331,7 +327,7 @@ func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.D
	}

	if pr.Clean() {
-		pr.Flush(ab.ds)
+		pr.flush(ab.ds)
	}
}
@@ -391,7 +387,7 @@ func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
	defer pr.Unlock()

	newExp := time.Now().Add(ttl).Unix()
-	existed := make([]bool, len(addrs)) // keeps track of which addrs we found
+	existed := make([]bool, len(addrs)) // keeps track of which addrs we found.

Outer:
	for i, incoming := range addrs {
@@ -431,7 +427,7 @@
	pr.Addrs = append(pr.Addrs, added...)
	pr.dirty = true
	pr.Clean()
-	return pr.Flush(ab.ds)
+	return pr.flush(ab.ds)
}

func (ab *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) (err error) {
@@ -464,7 +460,7 @@ func (ab *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) (err error) {
	pr.dirty = true
	pr.Clean()
-	return pr.Flush(ab.ds)
+	return pr.flush(ab.ds)
}

func cleanAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {


@@ -35,10 +35,11 @@
	}
)

-// cyclicBatch is similar to go-datastore autobatch, but it's driven by an actual Batch facility offered by the
-// datastore. It populates an ongoing batch with operations and automatically flushes it after N pending operations
-// have been reached. `N` is currently hardcoded to 20. An explicit `Commit()` closes this cyclic batch, erroring all
-// further operations.
+// cyclicBatch buffers datastore write operations and automatically flushes them after gcOpsPerBatch (20) have been
+// queued. An explicit `Commit()` closes this cyclic batch, erroring all further operations.
+//
+// It is similar to go-datastore autobatch, but it's driven by an actual Batch facility offered by the
+// datastore.
type cyclicBatch struct {
	ds.Batch
	ds ds.Batching
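
In practice a cyclicBatch is used like any other ds.Batch: the caller keeps queuing Puts or Deletes, lets it commit itself every 20 operations, and calls Commit once at the end. A hypothetical usage sketch (store and entries are placeholders, not names from this package):

    // Sketch: stream many writes through a cyclicBatch so memory stays bounded.
    batch, err := newCyclicBatch(store) // store is a ds.Batching
    if err != nil {
        return err
    }
    for _, e := range entries {
        if err := batch.Put(e.key, e.value); err != nil {
            return err
        }
    }
    // Commit flushes whatever is still pending and closes the cyclic batch;
    // any further operation on it returns an error.
    return batch.Commit()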
@@ -99,11 +100,8 @@ func (cb *cyclicBatch) Commit() error {
	return nil
}

-// purgeCycle runs a single GC cycle, operating within the lookahead window.
-//
-// It scans the lookahead region for entries that need to be visited, and performs a Clean() on them. An errors trigger
-// the removal of the GC entry, in order to prevent unactionable items from accumulating. If the error happened to be
-// temporary, the entry will be revisited in the next lookahead window.
+// purgeCycle runs a single GC purge cycle. It operates within the lookahead window if lookahead is enabled; else it
+// visits all entries in the datastore, deleting the addresses that have expired.
func (ab *dsAddrBook) purgeCycle() {
	if atomic.LoadInt32(&ab.gcLookaheadRunning) > 0 {
		// yield if lookahead is running.
@@ -111,7 +109,7 @@ func (ab *dsAddrBook) purgeCycle() {
	}

	var id peer.ID
-	record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
+	record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs.
	batch, err := newCyclicBatch(ab.ds)
	if err != nil {
		log.Warningf("failed while creating batch to purge GC entries: %v", err)
@@ -187,7 +185,7 @@ func (ab *dsAddrBook) purgeCycle() {
		cached := e.(*addrsRecord)
		cached.Lock()
		if cached.Clean() {
-			if err = cached.Flush(batch); err != nil {
+			if err = cached.flush(batch); err != nil {
				log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
			}
		}
@@ -212,7 +210,7 @@ func (ab *dsAddrBook) purgeCycle() {
			continue
		}
		if record.Clean() {
-			err = record.Flush(batch)
+			err = record.flush(batch)
			if err != nil {
				log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
			}


@@ -90,8 +90,8 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
		})

		t.Run("adding an existing address with an earlier expiration is noop", func(t *testing.T) {
-			id := generatePeerIds(1)[0]
-			addrs := generateAddrs(3)
+			id := GeneratePeerIDs(1)[0]
+			addrs := GenerateAddrs(3)
			ab.AddAddrs(id, addrs, time.Hour)