Update AddrManagerDatastore to require a Batching Datastore

This commit is contained in:
Cole Brown 2018-06-15 13:46:35 -04:00
parent 3778829de8
commit 4a521e2e3b
3 changed files with 53 additions and 19 deletions

View File

@ -12,13 +12,18 @@ import (
mh "github.com/multiformats/go-multihash" mh "github.com/multiformats/go-multihash"
) )
// DatastoreAddrManager is an address manager backed by a Datastore with both an
// in-memory TTL manager and an in-memory address stream manager.
type DatastoreAddrManager struct { type DatastoreAddrManager struct {
ds ds.Datastore ds ds.Batching
ttlManager *ttlmanager ttlManager *ttlmanager
subsManager *AddrSubManager subsManager *AddrSubManager
} }
func NewDatastoreAddrManager(ctx context.Context, ds ds.Datastore, ttlInterval time.Duration) *DatastoreAddrManager { // NewDatastoreAddrManager initializes a new DatastoreAddrManager given a
// Datastore instance, a context for managing the TTL manager, and the interval
// at which the TTL manager should sweep the Datastore.
func NewDatastoreAddrManager(ctx context.Context, ds ds.Batching, ttlInterval time.Duration) *DatastoreAddrManager {
mgr := &DatastoreAddrManager{ mgr := &DatastoreAddrManager{
ds: ds, ds: ds,
ttlManager: newTTLManager(ctx, ds, ttlInterval), ttlManager: newTTLManager(ctx, ds, ttlInterval),
@ -27,6 +32,7 @@ func NewDatastoreAddrManager(ctx context.Context, ds ds.Datastore, ttlInterval t
return mgr return mgr
} }
// Stop will signal the TTL manager to stop and block until it returns.
func (mgr *DatastoreAddrManager) Stop() { func (mgr *DatastoreAddrManager) Stop() {
mgr.ttlManager.stop() mgr.ttlManager.stop()
} }
@ -44,24 +50,38 @@ func peerIDFromKey(key ds.Key) (peer.ID, error) {
return peer.IDB58Decode(idstring) return peer.IDB58Decode(idstring)
} }
// AddAddr will add a new address if it's not already in the AddrBook.
func (mgr *DatastoreAddrManager) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { func (mgr *DatastoreAddrManager) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl) mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
} }
// AddAddrs will add many new addresses if they're not already in the AddrBook.
func (mgr *DatastoreAddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { func (mgr *DatastoreAddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
if ttl <= 0 { if ttl <= 0 {
return return
} }
mgr.SetAddrs(p, addrs, ttl) mgr.setAddrs(p, addrs, ttl, true)
} }
// SetAddr will add or update the TTL of an address in the AddrBook.
func (mgr *DatastoreAddrManager) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { func (mgr *DatastoreAddrManager) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl) mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
} }
// SetAddrs will add or update the TTLs of addresses in the AddrBook.
func (mgr *DatastoreAddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { func (mgr *DatastoreAddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
mgr.setAddrs(p, addrs, ttl, false)
}
func (mgr *DatastoreAddrManager) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, add bool) {
var keys []ds.Key var keys []ds.Key
batch, err := mgr.ds.Batch()
if err != nil {
log.Error(err)
return
}
defer batch.Commit()
for _, addr := range addrs { for _, addr := range addrs {
if addr == nil { if addr == nil {
continue continue
@ -75,24 +95,31 @@ func (mgr *DatastoreAddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl t
keys = append(keys, key) keys = append(keys, key)
if ttl <= 0 { if ttl <= 0 {
mgr.ds.Delete(key) batch.Delete(key)
continue continue
} }
if has, err := mgr.ds.Has(key); err != nil || !has { has, err := mgr.ds.Has(key)
if err != nil || !has {
mgr.subsManager.BroadcastAddr(p, addr) mgr.subsManager.BroadcastAddr(p, addr)
} }
if err := mgr.ds.Put(key, addr.Bytes()); err != nil {
log.Error(err) if !has || !add {
if err := batch.Put(key, addr.Bytes()); err != nil {
log.Error(err)
}
} }
} }
mgr.ttlManager.setTTLs(keys, ttl) mgr.ttlManager.setTTLs(keys, ttl)
} }
// UpdateAddrs will update any addresses for a given peer and TTL combination to
// have a new TTL.
func (mgr *DatastoreAddrManager) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { func (mgr *DatastoreAddrManager) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
prefix := ds.NewKey(p.Pretty()) prefix := ds.NewKey(p.Pretty())
mgr.ttlManager.updateTTLs(prefix, oldTTL, newTTL) mgr.ttlManager.updateTTLs(prefix, oldTTL, newTTL)
} }
// Addrs Returns all of the non-expired addresses for a given peer.
func (mgr *DatastoreAddrManager) Addrs(p peer.ID) []ma.Multiaddr { func (mgr *DatastoreAddrManager) Addrs(p peer.ID) []ma.Multiaddr {
prefix := ds.NewKey(p.Pretty()) prefix := ds.NewKey(p.Pretty())
q := query.Query{Prefix: prefix.String()} q := query.Query{Prefix: prefix.String()}
@ -115,6 +142,7 @@ func (mgr *DatastoreAddrManager) Addrs(p peer.ID) []ma.Multiaddr {
return addrs return addrs
} }
// Peers returns all of the peer IDs for which the AddrBook has addresses.
func (mgr *DatastoreAddrManager) Peers() []peer.ID { func (mgr *DatastoreAddrManager) Peers() []peer.ID {
q := query.Query{} q := query.Query{}
results, err := mgr.ds.Query(q) results, err := mgr.ds.Query(q)
@ -140,11 +168,14 @@ func (mgr *DatastoreAddrManager) Peers() []peer.ID {
return ids return ids
} }
// AddrStream returns a channel on which all new addresses discovered for a
// given peer ID will be published.
func (mgr *DatastoreAddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr { func (mgr *DatastoreAddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
initial := mgr.Addrs(p) initial := mgr.Addrs(p)
return mgr.subsManager.AddrStream(ctx, p, initial) return mgr.subsManager.AddrStream(ctx, p, initial)
} }
// ClearAddrs will delete all known addresses for a peer ID.
func (mgr *DatastoreAddrManager) ClearAddrs(p peer.ID) { func (mgr *DatastoreAddrManager) ClearAddrs(p peer.ID) {
prefix := ds.NewKey(p.Pretty()) prefix := ds.NewKey(p.Pretty())
q := query.Query{Prefix: prefix.String()} q := query.Query{Prefix: prefix.String()}
@ -153,9 +184,15 @@ func (mgr *DatastoreAddrManager) ClearAddrs(p peer.ID) {
log.Error(err) log.Error(err)
return return
} }
batch, err := mgr.ds.Batch()
if err != nil {
log.Error(err)
return
}
defer batch.Commit()
for result := range results.Next() { for result := range results.Next() {
mgr.ds.Delete(ds.NewKey(result.Key)) batch.Delete(ds.NewKey(result.Key))
} }
mgr.ttlManager.clear(ds.NewKey(p.Pretty())) mgr.ttlManager.clear(ds.NewKey(p.Pretty()))
} }
@ -168,7 +205,7 @@ type ttlentry struct {
} }
type ttlmanager struct { type ttlmanager struct {
sync.Mutex sync.RWMutex
entries map[ds.Key]*ttlentry entries map[ds.Key]*ttlentry
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@ -180,7 +217,6 @@ type ttlmanager struct {
func newTTLManager(parent context.Context, d ds.Datastore, tick time.Duration) *ttlmanager { func newTTLManager(parent context.Context, d ds.Datastore, tick time.Duration) *ttlmanager {
ctx, cancel := context.WithCancel(parent) ctx, cancel := context.WithCancel(parent)
mgr := &ttlmanager{ mgr := &ttlmanager{
Mutex: sync.Mutex{},
entries: make(map[ds.Key]*ttlentry), entries: make(map[ds.Key]*ttlentry),
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
@ -212,8 +248,8 @@ func (mgr *ttlmanager) stop() {
// For internal use only // For internal use only
func (mgr *ttlmanager) tick() { func (mgr *ttlmanager) tick() {
mgr.Lock() mgr.RLock()
defer mgr.Unlock() defer mgr.RUnlock()
now := time.Now() now := time.Now()
for key, entry := range mgr.entries { for key, entry := range mgr.entries {

View File

@ -1,13 +1,11 @@
package peerstore package peerstore
import ( import (
"testing" "context"
"time"
"io/ioutil" "io/ioutil"
"os" "os"
"testing"
"context" "time"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-ds-badger" "github.com/ipfs/go-ds-badger"
@ -55,7 +53,7 @@ func testHas(t *testing.T, exp, act []ma.Multiaddr) {
} }
} }
func setupBadgerDatastore(t *testing.T) (datastore.Datastore, func()) { func setupBadgerDatastore(t *testing.T) (datastore.Batching, func()) {
dataPath, err := ioutil.TempDir(os.TempDir(), "badger") dataPath, err := ioutil.TempDir(os.TempDir(), "badger")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -204,7 +204,7 @@ func NewPeerstore() Peerstore {
// NewPeerstoreDatastore creates a threadsafe collection of peers backed by a // NewPeerstoreDatastore creates a threadsafe collection of peers backed by a
// Datastore to prevent excess memory pressure. // Datastore to prevent excess memory pressure.
func NewPeerstoreDatastore(ctx context.Context, ds datastore.Datastore) Peerstore { func NewPeerstoreDatastore(ctx context.Context, ds datastore.Batching) Peerstore {
return &peerstore{ return &peerstore{
keybook: newKeybook(), keybook: newKeybook(),
metrics: NewMetrics(), metrics: NewMetrics(),