improve docs; cyclic batch arg; validations.

This commit is contained in:
Raúl Kripalani 2019-02-06 14:35:39 +00:00
parent fec786e9ef
commit 3966498df4
3 changed files with 50 additions and 29 deletions

View File

@ -31,7 +31,7 @@ const (
var (
log = logging.Logger("peerstore/ds")
// Peer addresses are stored under the following db key pattern:
// Peer addresses are stored under keys with pattern:
// /peers/addrs/<b32 peer id no padding>
addrBookBase = ds.NewKey("/peers/addrs")
)
@ -44,7 +44,7 @@ type addrsRecord struct {
}
// flush writes the record to the datastore by calling ds.Put, unless the record is
// marked for deletion, in which case we call ds.Delete.
// marked for deletion, in which case we call ds.Delete. To be called within a lock.
func (r *addrsRecord) flush(write ds.Write) (err error) {
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id.ID)))
if len(r.Addrs) == 0 {
@ -63,20 +63,19 @@ func (r *addrsRecord) flush(write ds.Write) (err error) {
return nil
}
// Clean is called on records to perform housekeeping. The return value signals if the record was changed
// as a result of the cleaning.
// Clean is called on records to perform housekeeping. The return value indicates if the record was changed
// as a result of this call.
//
// Clean does the following:
// * sorts the addresses by expiration (soonest expiring first).
// * removes the addresses that have expired.
// * sorts addresses by expiration (soonest expiring first).
// * removes expired addresses.
//
// It short-circuits optimistically when we know there's nothing to do.
// It short-circuits optimistically when there's nothing to do.
//
// Clean is called from several points:
// * when accessing and loading an entry.
// * when accessing an entry.
// * when performing periodic GC.
// * after an entry has been modified (e.g. addresses have been added or removed,
// TTLs updated, etc.)
// * after an entry has been modified (e.g. addresses have been added or removed, TTLs updated, etc.)
//
// If the return value is true, the caller can perform a flush immediately, or can schedule an async
// flush, depending on the context.
@ -101,8 +100,8 @@ func (r *addrsRecord) Clean() (chgd bool) {
})
}
// since addresses are sorted by expiration, we find the first survivor and split the
// slice on its index.
// since addresses are sorted by expiration, we find the first
// survivor and split the slice on its index.
pivot := -1
for i, addr := range r.Addrs {
if addr.Expiry > now {
@ -115,8 +114,8 @@ func (r *addrsRecord) Clean() (chgd bool) {
return r.dirty || pivot >= 0
}
// dsAddrBook is an address book backed by a Datastore with a GC-like procedure
// to purge expired entries. It uses an in-memory address stream manager.
// dsAddrBook is an address book backed by a Datastore with a GC procedure to purge expired entries. It uses an
// in-memory address stream manager. See NewAddrBook for more information.
type dsAddrBook struct {
ctx context.Context
opts Options
@ -133,8 +132,25 @@ type dsAddrBook struct {
var _ pstore.AddrBook = (*dsAddrBook)(nil)
// NewAddrBook initializes a new address book given a Datastore instance, a context for managing the TTL manager,
// and the interval at which the TTL manager should sweep the Datastore.
// NewAddrBook initializes a new datastore-backed address book. It serves as a drop-in replacement for pstoremem
// (memory-backed peerstore), and works with any datastore implementing the ds.Batching interface.
//
// Addresses and peer records are serialized into protobuf, storing one datastore entry per peer, along with metadata
// to control address expiration. To alleviate disk access and serde overhead, we internally use a read/write-through
// ARC cache, the size of which is adjustable via Options.CacheSize.
//
// The user has a choice of two GC algorithms:
//
// - lookahead GC: minimises the amount of full store traversals by maintaining a time-indexed list of entries that
// need to be visited within the period specified in Options.GCLookaheadInterval. This is useful in scenarios with
// considerable TTL variance, coupled with datastores whose native iterators return entries in lexicographical key
// order. Enable this mode by setting Options.GCLookaheadInterval > 0. Lookahead windows are jumpy, not
// sliding. Purges operate exclusively over the lookahead window with periodicity Options.GCPurgeInterval.
//
// - full-purge GC (default): performs a full visit of the store with periodicity Options.GCPurgeInterval. Useful when
// the range of possible TTL values is small and the values themselves are also extreme, e.g. 10 minutes or
// permanent, popular values used in other libp2p modules. In this cited case, optimizing with lookahead windows
// makes little sense.
func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAddrBook, err error) {
var cache cache = new(noopCache)
if opts.CacheSize > 0 {
@ -158,7 +174,6 @@ func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAd
return nil, err
}
// kick off background processes.
go ab.flusher()
return ab, nil
@ -180,7 +195,7 @@ func (ab *dsAddrBook) asyncFlush(pr *addrsRecord) {
// loadRecord is a read-through fetch. It fetches a record from cache, falling back to the
// datastore upon a miss, and returning a newly initialized record if the peer doesn't exist.
//
// loadRecord calls Clean() on the record before returning it. If the record changes
// loadRecord calls Clean() on an existing record before returning it. If the record changes
// as a result and the update argument is true, an async flush is queued.
//
// If the cache argument is true, the record is inserted in the cache when loaded from the datastore.

View File

@ -16,13 +16,12 @@ import (
)
var (
// GC lookahead entries are stored in keys with pattern:
// GC lookahead entries are stored under keys with pattern:
// /peers/gc/addrs/<unix timestamp of next visit>/<peer ID b32> => nil
// in databases with lexicographical key order, this time-indexing allows us to visit
// only the timeslice we are interested in.
gcLookaheadBase = ds.NewKey("/peers/gc/addrs")
// in GC routines, how many operations do we place in a batch before it's committed.
gcOpsPerBatch = 20
// queries
purgeLookaheadQuery = query.Query{
Prefix: gcLookaheadBase.String(),
@ -43,7 +42,6 @@ var (
}
)
// dsAddrBookGc encapsulates the GC behaviour to maintain a datastore-backed address book.
type dsAddrBookGc struct {
ctx context.Context
ab *dsAddrBook
@ -63,6 +61,10 @@ func newAddressBookGc(ctx context.Context, ab *dsAddrBook) (*dsAddrBookGc, error
if ab.opts.GCInitialDelay < 0 {
return nil, fmt.Errorf("negative GC initial delay provided: %s", ab.opts.GCInitialDelay)
}
if ab.opts.GCLookaheadInterval > 0 && ab.opts.GCLookaheadInterval < ab.opts.GCPurgeInterval {
return nil, fmt.Errorf("lookahead interval must be larger than purge interval, respectively: %s, %s",
ab.opts.GCLookaheadInterval, ab.opts.GCPurgeInterval)
}
lookaheadEnabled := ab.opts.GCLookaheadInterval > 0
gc := &dsAddrBookGc{
@ -136,7 +138,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
var id peer.ID
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs.
batch, err := newCyclicBatch(gc.ab.ds)
batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch)
if err != nil {
log.Warningf("failed while creating batch to purge GC entries: %v", err)
}
@ -259,7 +261,7 @@ func (gc *dsAddrBookGc) purgeStore() {
}
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs.
batch, err := newCyclicBatch(gc.ab.ds)
batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch)
if err != nil {
log.Warningf("failed while creating batch to purge GC entries: %v", err)
}
@ -324,7 +326,7 @@ func (gc *dsAddrBookGc) populateLookahead() {
}
defer results.Close()
batch, err := newCyclicBatch(gc.ab.ds)
batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch)
if err != nil {
log.Warningf("failed while creating batch to populate lookahead GC window: %v", err)
return

View File

@ -6,18 +6,22 @@ import (
ds "github.com/ipfs/go-datastore"
)
// cyclicBatch buffers ds write operations and automatically flushes them after gcOpsPerBatch (20) have been
// how many operations are queued in a cyclic batch before we flush it.
var defaultOpsPerCyclicBatch = 20
// cyclicBatch buffers ds write operations and automatically flushes them after defaultOpsPerCyclicBatch (20) have been
// queued. An explicit `Commit()` closes this cyclic batch, erroring all further operations.
//
// It is similar to go-ds autobatch, but it's driven by an actual Batch facility offered by the
// ds.
type cyclicBatch struct {
threshold int
ds.Batch
ds ds.Batching
pending int
}
func newCyclicBatch(ds ds.Batching) (ds.Batch, error) {
func newCyclicBatch(ds ds.Batching, threshold int) (ds.Batch, error) {
batch, err := ds.Batch()
if err != nil {
return nil, err
@ -29,7 +33,7 @@ func (cb *cyclicBatch) cycle() (err error) {
if cb.Batch == nil {
return errors.New("cyclic batch is closed")
}
if cb.pending < gcOpsPerBatch {
if cb.pending < cb.threshold {
// we haven't reached the threshold yet.
return nil
}