Merge pull request #29 from libp2p/feat/persist_peerstore

Persist peerstore via Datastore
This commit is contained in:
bigs 2018-08-27 18:41:54 -04:00 committed by GitHub
commit bb3d4c617a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 734 additions and 117 deletions

View File

@ -7,8 +7,8 @@ import (
"sync"
"time"
peer "github.com/libp2p/go-libp2p-peer"
addr "github.com/libp2p/go-libp2p-peerstore/addr"
"github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-peerstore/addr"
ma "github.com/multiformats/go-multiaddr"
)
@ -60,7 +60,7 @@ type AddrManager struct {
addrmu sync.Mutex // guards addrs
addrs map[peer.ID]addrSlice
addrSubs map[peer.ID][]*addrSub
subManager *AddrSubManager
}
// ensures the AddrManager is initialized.
@ -69,8 +69,8 @@ func (mgr *AddrManager) init() {
if mgr.addrs == nil {
mgr.addrs = make(map[peer.ID]addrSlice)
}
if mgr.addrSubs == nil {
mgr.addrSubs = make(map[peer.ID][]*addrSub)
if mgr.subManager == nil {
mgr.subManager = NewAddrSubManager()
}
}
@ -114,8 +114,6 @@ func (mgr *AddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durat
amap[string(ea.Addr.Bytes())] = ea
}
subs := mgr.addrSubs[p]
// only expand ttls
exp := time.Now().Add(ttl)
for _, addr := range addrs {
@ -129,9 +127,7 @@ func (mgr *AddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durat
if !found || exp.After(a.Expires) {
amap[addrstr] = expiringAddr{Addr: addr, Expires: exp, TTL: ttl}
for _, sub := range subs {
sub.pubAddr(addr)
}
mgr.subManager.BroadcastAddr(p, addr)
}
}
newAddrs := make([]expiringAddr, 0, len(amap))
@ -161,8 +157,6 @@ func (mgr *AddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durat
amap[string(ea.Addr.Bytes())] = ea
}
subs := mgr.addrSubs[p]
exp := time.Now().Add(ttl)
for _, addr := range addrs {
if addr == nil {
@ -175,9 +169,7 @@ func (mgr *AddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durat
if ttl > 0 {
amap[addrs] = expiringAddr{Addr: addr, Expires: exp, TTL: ttl}
for _, sub := range subs {
sub.pubAddr(addr)
}
mgr.subManager.BroadcastAddr(p, addr)
} else {
delete(amap, addrs)
}
@ -248,7 +240,7 @@ func (mgr *AddrManager) Addrs(p peer.ID) []ma.Multiaddr {
return good
}
// ClearAddresses removes all previously stored addresses
// ClearAddrs removes all previously stored addresses
func (mgr *AddrManager) ClearAddrs(p peer.ID) {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
@ -257,58 +249,86 @@ func (mgr *AddrManager) ClearAddrs(p peer.ID) {
delete(mgr.addrs, p)
}
func (mgr *AddrManager) removeSub(p peer.ID, s *addrSub) {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
subs := mgr.addrSubs[p]
if len(subs) == 1 {
if subs[0] != s {
return
}
delete(mgr.addrSubs, p)
return
}
for i, v := range subs {
if v == s {
subs[i] = subs[len(subs)-1]
subs[len(subs)-1] = nil
mgr.addrSubs[p] = subs[:len(subs)-1]
return
}
}
}
type addrSub struct {
pubch chan ma.Multiaddr
lk sync.Mutex
buffer []ma.Multiaddr
ctx context.Context
}
func (s *addrSub) pubAddr(a ma.Multiaddr) {
select {
case s.pubch <- a:
case <-s.ctx.Done():
}
}
// AddrStream returns a channel on which all new addresses discovered for a
// given peer ID will be published.
func (mgr *AddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
mgr.init()
sub := &addrSub{pubch: make(chan ma.Multiaddr), ctx: ctx}
out := make(chan ma.Multiaddr)
mgr.addrSubs[p] = append(mgr.addrSubs[p], sub)
baseaddrslice := mgr.addrs[p]
initial := make([]ma.Multiaddr, 0, len(baseaddrslice))
for _, a := range baseaddrslice {
initial = append(initial, a.Addr)
}
return mgr.subManager.AddrStream(ctx, p, initial)
}
// An abstracted, pub-sub manager for address streams. Extracted from
// AddrManager in order to support additional implementations.
type AddrSubManager struct {
mu sync.RWMutex
subs map[peer.ID][]*addrSub
}
// NewAddrSubManager initializes an AddrSubManager.
func NewAddrSubManager() *AddrSubManager {
return &AddrSubManager{
subs: make(map[peer.ID][]*addrSub),
}
}
// Used internally by the address stream coroutine to remove a subscription
// from the manager.
func (mgr *AddrSubManager) removeSub(p peer.ID, s *addrSub) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
subs := mgr.subs[p]
if len(subs) == 1 {
if subs[0] != s {
return
}
delete(mgr.subs, p)
return
}
for i, v := range subs {
if v == s {
subs[i] = subs[len(subs)-1]
subs[len(subs)-1] = nil
mgr.subs[p] = subs[:len(subs)-1]
return
}
}
}
// BroadcastAddr broadcasts a new address to all subscribed streams.
func (mgr *AddrSubManager) BroadcastAddr(p peer.ID, addr ma.Multiaddr) {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
if subs, ok := mgr.subs[p]; ok {
for _, sub := range subs {
sub.pubAddr(addr)
}
}
}
// AddrStream creates a new subscription for a given peer ID, pre-populating the
// channel with any addresses we might already have on file.
func (mgr *AddrSubManager) AddrStream(ctx context.Context, p peer.ID, initial []ma.Multiaddr) <-chan ma.Multiaddr {
sub := &addrSub{pubch: make(chan ma.Multiaddr), ctx: ctx}
out := make(chan ma.Multiaddr)
mgr.mu.Lock()
if _, ok := mgr.subs[p]; ok {
mgr.subs[p] = append(mgr.subs[p], sub)
} else {
mgr.subs[p] = []*addrSub{sub}
}
mgr.mu.Unlock()
sort.Sort(addr.AddrList(initial))
go func(buffer []ma.Multiaddr) {
@ -360,3 +380,17 @@ func (mgr *AddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Mul
return out
}
type addrSub struct {
pubch chan ma.Multiaddr
lk sync.Mutex
buffer []ma.Multiaddr
ctx context.Context
}
func (s *addrSub) pubAddr(a ma.Multiaddr) {
select {
case s.pubch <- a:
case <-s.ctx.Done():
}
}

380
addr_manager_ds.go Normal file
View File

@ -0,0 +1,380 @@
package peerstore
import (
"context"
"sync"
"time"
"github.com/hashicorp/golang-lru"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"
)
// Number of times to retry transactional writes
var dsWriteRetries = 5
// DatastoreAddrManager is an address manager backed by a Datastore with both an
// in-memory TTL manager and an in-memory address stream manager.
type DatastoreAddrManager struct {
cache *lru.ARCCache
ds ds.Batching
ttlManager *ttlmanager
subsManager *AddrSubManager
}
// NewDatastoreAddrManager initializes a new DatastoreAddrManager given a
// Datastore instance, a context for managing the TTL manager, and the interval
// at which the TTL manager should sweep the Datastore.
func NewDatastoreAddrManager(ctx context.Context, ds ds.Batching, ttlInterval time.Duration) (*DatastoreAddrManager, error) {
cache, err := lru.NewARC(1024)
if err != nil {
return nil, err
}
mgr := &DatastoreAddrManager{
cache: cache,
ds: ds,
ttlManager: newTTLManager(ctx, ds, cache, ttlInterval),
subsManager: NewAddrSubManager(),
}
return mgr, nil
}
// Stop will signal the TTL manager to stop and block until it returns.
func (mgr *DatastoreAddrManager) Stop() {
mgr.ttlManager.cancel()
}
func peerAddressKey(p *peer.ID, addr *ma.Multiaddr) (ds.Key, error) {
hash, err := mh.Sum((*addr).Bytes(), mh.MURMUR3, -1)
if err != nil {
return ds.Key{}, nil
}
return ds.NewKey(peer.IDB58Encode(*p)).ChildString(hash.B58String()), nil
}
func peerIDFromKey(key ds.Key) (peer.ID, error) {
idstring := key.Parent().Name()
return peer.IDB58Decode(idstring)
}
// AddAddr will add a new address if it's not already in the AddrBook.
func (mgr *DatastoreAddrManager) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
}
// AddAddrs will add many new addresses if they're not already in the AddrBook.
func (mgr *DatastoreAddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
if ttl <= 0 {
return
}
mgr.setAddrs(p, addrs, ttl, true)
}
// SetAddr will add or update the TTL of an address in the AddrBook.
func (mgr *DatastoreAddrManager) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
}
// SetAddrs will add or update the TTLs of addresses in the AddrBook.
func (mgr *DatastoreAddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
mgr.setAddrs(p, addrs, ttl, false)
}
func (mgr *DatastoreAddrManager) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, add bool) {
for i := 0; i < dsWriteRetries; i++ {
// keys to add to the TTL manager
var keys []ds.Key
batch, err := mgr.ds.Batch()
if err != nil {
log.Error(err)
return
}
for _, addr := range addrs {
if addr == nil {
continue
}
key, err := peerAddressKey(&p, &addr)
if err != nil {
log.Error(err)
continue
}
keys = append(keys, key)
if ttl <= 0 {
if err := batch.Delete(key); err != nil {
log.Error(err)
} else {
mgr.cache.Remove(key)
}
continue
}
has := mgr.cache.Contains(key)
if !has {
has, err = mgr.ds.Has(key)
}
if err != nil || !has {
mgr.subsManager.BroadcastAddr(p, addr)
}
// Allows us to support AddAddr and SetAddr in one function
if !has {
if err := batch.Put(key, addr.Bytes()); err != nil {
log.Error(err)
} else {
mgr.cache.Add(key, addr.Bytes())
}
}
}
if err := batch.Commit(); err != nil {
log.Errorf("failed to write addresses for peer %s: %s\n", p.Pretty(), err)
continue
}
mgr.ttlManager.setTTLs(keys, ttl, add)
return
}
log.Errorf("failed to avoid write conflict for peer %s after %d retries\n", p.Pretty(), dsWriteRetries)
}
// UpdateAddrs will update any addresses for a given peer and TTL combination to
// have a new TTL.
func (mgr *DatastoreAddrManager) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
prefix := ds.NewKey(p.Pretty())
mgr.ttlManager.updateTTLs(prefix, oldTTL, newTTL)
}
// Addrs Returns all of the non-expired addresses for a given peer.
func (mgr *DatastoreAddrManager) Addrs(p peer.ID) []ma.Multiaddr {
prefix := ds.NewKey(p.Pretty())
q := query.Query{Prefix: prefix.String(), KeysOnly: true}
results, err := mgr.ds.Query(q)
if err != nil {
log.Error(err)
return nil
}
var addrs []ma.Multiaddr
for result := range results.Next() {
key := ds.RawKey(result.Key)
var addri interface{}
addri, ok := mgr.cache.Get(key)
if !ok {
addri, err = mgr.ds.Get(key)
if err != nil {
log.Error(err)
continue
}
}
addrbytes := addri.([]byte)
addr, err := ma.NewMultiaddrBytes(addrbytes)
if err != nil {
log.Error(err)
continue
}
addrs = append(addrs, addr)
}
return addrs
}
// Peers returns all of the peer IDs for which the AddrBook has addresses.
func (mgr *DatastoreAddrManager) Peers() []peer.ID {
q := query.Query{KeysOnly: true}
results, err := mgr.ds.Query(q)
if err != nil {
log.Error(err)
return []peer.ID{}
}
idset := make(map[peer.ID]struct{})
for result := range results.Next() {
key := ds.RawKey(result.Key)
id, err := peerIDFromKey(key)
if err != nil {
continue
}
idset[id] = struct{}{}
}
ids := make([]peer.ID, 0, len(idset))
for id := range idset {
ids = append(ids, id)
}
return ids
}
// AddrStream returns a channel on which all new addresses discovered for a
// given peer ID will be published.
func (mgr *DatastoreAddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
initial := mgr.Addrs(p)
return mgr.subsManager.AddrStream(ctx, p, initial)
}
// ClearAddrs will delete all known addresses for a peer ID.
func (mgr *DatastoreAddrManager) ClearAddrs(p peer.ID) {
prefix := ds.NewKey(p.Pretty())
for i := 0; i < dsWriteRetries; i++ {
q := query.Query{Prefix: prefix.String(), KeysOnly: true}
results, err := mgr.ds.Query(q)
if err != nil {
log.Error(err)
return
}
batch, err := mgr.ds.Batch()
if err != nil {
log.Error(err)
return
}
for result := range results.Next() {
key := ds.NewKey(result.Key)
err := batch.Delete(key)
if err != nil {
// From inspectin badger, errors here signify a problem with
// the transaction as a whole, so we can log and abort.
log.Error(err)
return
}
mgr.cache.Remove(key)
}
if err = batch.Commit(); err != nil {
log.Errorf("failed to clear addresses for peer %s: %s\n", p.Pretty(), err)
continue
}
mgr.ttlManager.clear(ds.NewKey(p.Pretty()))
return
}
log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), dsWriteRetries)
}
// ttlmanager
type ttlentry struct {
TTL time.Duration
ExpiresAt time.Time
}
type ttlmanager struct {
sync.RWMutex
entries map[ds.Key]*ttlentry
ctx context.Context
cancel context.CancelFunc
ticker *time.Ticker
ds ds.Batching
cache *lru.ARCCache
}
func newTTLManager(parent context.Context, d ds.Datastore, c *lru.ARCCache, tick time.Duration) *ttlmanager {
ctx, cancel := context.WithCancel(parent)
batching, ok := d.(ds.Batching)
if !ok {
panic("must construct ttlmanager with batching datastore")
}
mgr := &ttlmanager{
entries: make(map[ds.Key]*ttlentry),
ctx: ctx,
cancel: cancel,
ticker: time.NewTicker(tick),
ds: batching,
cache: c,
}
go func() {
for {
select {
case <-mgr.ctx.Done():
mgr.ticker.Stop()
return
case <-mgr.ticker.C:
mgr.tick()
}
}
}()
return mgr
}
// To be called by TTL manager's coroutine only.
func (mgr *ttlmanager) tick() {
mgr.Lock()
defer mgr.Unlock()
now := time.Now()
batch, err := mgr.ds.Batch()
if err != nil {
log.Error(err)
return
}
for key, entry := range mgr.entries {
if entry.ExpiresAt.Before(now) {
if err := batch.Delete(key); err != nil {
log.Error(err)
} else {
mgr.cache.Remove(key)
}
delete(mgr.entries, key)
}
}
err = batch.Commit()
if err != nil {
log.Error(err)
}
}
func (mgr *ttlmanager) setTTLs(keys []ds.Key, ttl time.Duration, add bool) {
mgr.Lock()
defer mgr.Unlock()
expiration := time.Now().Add(ttl)
for _, key := range keys {
update := true
if add {
if entry, ok := mgr.entries[key]; ok {
if entry.ExpiresAt.After(expiration) {
update = false
}
}
}
if update {
if ttl <= 0 {
delete(mgr.entries, key)
} else {
mgr.entries[key] = &ttlentry{TTL: ttl, ExpiresAt: expiration}
}
}
}
}
func (mgr *ttlmanager) updateTTLs(prefix ds.Key, oldTTL, newTTL time.Duration) {
mgr.Lock()
defer mgr.Unlock()
now := time.Now()
var keys []ds.Key
for key, entry := range mgr.entries {
if key.IsDescendantOf(prefix) && entry.TTL == oldTTL {
keys = append(keys, key)
entry.TTL = newTTL
entry.ExpiresAt = now.Add(newTTL)
}
}
}
func (mgr *ttlmanager) clear(prefix ds.Key) {
mgr.Lock()
defer mgr.Unlock()
for key := range mgr.entries {
if key.IsDescendantOf(prefix) {
delete(mgr.entries, key)
}
}
}

View File

@ -1,9 +1,14 @@
package peerstore
import (
"context"
"io/ioutil"
"os"
"testing"
"time"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-ds-badger"
"github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)
@ -29,7 +34,7 @@ func MA(t *testing.T, m string) ma.Multiaddr {
func testHas(t *testing.T, exp, act []ma.Multiaddr) {
t.Helper()
if len(exp) != len(act) {
t.Fatal("lengths not the same")
t.Fatalf("lengths not the same. expected %d, got %d\n", len(exp), len(act))
}
for _, a := range exp {
@ -48,8 +53,47 @@ func testHas(t *testing.T, exp, act []ma.Multiaddr) {
}
}
func TestAddresses(t *testing.T) {
func setupBadgerDatastore(t testing.TB) (datastore.Batching, func()) {
dataPath, err := ioutil.TempDir(os.TempDir(), "badger")
if err != nil {
t.Fatal(err)
}
ds, err := badger.NewDatastore(dataPath, nil)
if err != nil {
t.Fatal(err)
}
closer := func() {
ds.Close()
os.RemoveAll(dataPath)
}
return ds, closer
}
func setupDatastoreAddrManager(t *testing.T) (*DatastoreAddrManager, func()) {
ds, closeDB := setupBadgerDatastore(t)
mgr, err := NewDatastoreAddrManager(context.Background(), ds, 100*time.Microsecond)
if err != nil {
t.Fatal(err)
}
closer := func() {
mgr.Stop()
closeDB()
}
return mgr, closer
}
func runTestWithAddrManagers(t *testing.T, test func(*testing.T, AddrBook)) {
t.Log("AddrManager")
mgr1 := &AddrManager{}
test(t, mgr1)
t.Log("DatastoreAddrManager")
mgr2, closer2 := setupDatastoreAddrManager(t)
defer closer2()
test(t, mgr2)
}
func testAddresses(t *testing.T, m AddrBook) {
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ")
id3 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ6Kn")
@ -73,7 +117,6 @@ func TestAddresses(t *testing.T) {
ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555")
ttl := time.Hour
m := AddrManager{}
m.AddAddr(id1, ma11, ttl)
m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl)
@ -92,21 +135,19 @@ func TestAddresses(t *testing.T) {
m.ClearAddrs(id5)
m.AddAddrs(id5, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ttl) // clearing
if len(m.Peers()) != 5 {
t.Fatal("should have exactly two peers in the address book")
}
// test the Addresses return value
testHas(t, []ma.Multiaddr{ma11}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
testHas(t, []ma.Multiaddr{ma31, ma32, ma33}, m.Addrs(id3))
testHas(t, []ma.Multiaddr{ma41, ma42, ma43, ma44}, m.Addrs(id4))
testHas(t, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, m.Addrs(id5))
}
func TestAddressesExpire(t *testing.T) {
func TestAddresses(t *testing.T) {
runTestWithAddrManagers(t, testAddresses)
}
func testAddressesExpire(t *testing.T, m AddrBook) {
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
@ -115,17 +156,12 @@ func TestAddressesExpire(t *testing.T) {
ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444")
ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555")
m := AddrManager{}
m.AddAddr(id1, ma11, time.Hour)
m.AddAddr(id1, ma12, time.Hour)
m.AddAddr(id1, ma13, time.Hour)
m.AddAddr(id2, ma24, time.Hour)
m.AddAddr(id2, ma25, time.Hour)
if len(m.Peers()) != 2 {
t.Fatal("should have exactly two peers in the address book")
}
testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
@ -139,33 +175,36 @@ func TestAddressesExpire(t *testing.T) {
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
m.SetAddr(id1, ma11, time.Millisecond)
<-time.After(time.Millisecond)
<-time.After(time.Millisecond * 2)
testHas(t, []ma.Multiaddr{ma12, ma13}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
m.SetAddr(id1, ma13, time.Millisecond)
<-time.After(time.Millisecond)
<-time.After(time.Millisecond * 2)
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
m.SetAddr(id2, ma24, time.Millisecond)
<-time.After(time.Millisecond)
<-time.After(time.Millisecond * 2)
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma25}, m.Addrs(id2))
m.SetAddr(id2, ma25, time.Millisecond)
<-time.After(time.Millisecond)
<-time.After(time.Millisecond * 2)
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
testHas(t, nil, m.Addrs(id2))
m.SetAddr(id1, ma12, time.Millisecond)
<-time.After(time.Millisecond)
<-time.After(time.Millisecond * 2)
testHas(t, nil, m.Addrs(id1))
testHas(t, nil, m.Addrs(id2))
}
func TestClearWorks(t *testing.T) {
func TestAddressesExpire(t *testing.T) {
runTestWithAddrManagers(t, testAddressesExpire)
}
func testClearWorks(t *testing.T, m AddrBook) {
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
@ -174,7 +213,6 @@ func TestClearWorks(t *testing.T) {
ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444")
ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555")
m := AddrManager{}
m.AddAddr(id1, ma11, time.Hour)
m.AddAddr(id1, ma12, time.Hour)
m.AddAddr(id1, ma13, time.Hour)
@ -191,11 +229,14 @@ func TestClearWorks(t *testing.T) {
testHas(t, nil, m.Addrs(id2))
}
func TestSetNegativeTTLClears(t *testing.T) {
func TestClearWorks(t *testing.T) {
runTestWithAddrManagers(t, testClearWorks)
}
func testSetNegativeTTLClears(t *testing.T, m AddrBook) {
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
m := AddrManager{}
m.SetAddr(id1, ma11, time.Hour)
testHas(t, []ma.Multiaddr{ma11}, m.Addrs(id1))
@ -204,8 +245,11 @@ func TestSetNegativeTTLClears(t *testing.T) {
testHas(t, nil, m.Addrs(id1))
}
func TestSetNegativeTTLClears(t *testing.T) {
runTestWithAddrManagers(t, testSetNegativeTTLClears)
}
func TestUpdateTTLs(t *testing.T) {
func testUpdateTTLs(t *testing.T, m AddrBook) {
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
@ -213,8 +257,6 @@ func TestUpdateTTLs(t *testing.T) {
ma21 := MA(t, "/ip4/1.2.3.1/tcp/1121")
ma22 := MA(t, "/ip4/1.2.3.1/tcp/1122")
m := AddrManager{}
// Shouldn't panic.
m.UpdateAddrs(id1, time.Hour, time.Minute)
@ -230,30 +272,37 @@ func TestUpdateTTLs(t *testing.T) {
testHas(t, []ma.Multiaddr{ma11, ma12}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
m.UpdateAddrs(id1, time.Hour, time.Millisecond)
m.UpdateAddrs(id1, time.Hour, time.Second)
testHas(t, []ma.Multiaddr{ma11, ma12}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
time.Sleep(time.Millisecond)
time.Sleep(1200 * time.Millisecond)
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
m.UpdateAddrs(id2, time.Hour, time.Millisecond)
m.UpdateAddrs(id2, time.Hour, time.Second)
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
time.Sleep(time.Millisecond)
time.Sleep(1200 * time.Millisecond)
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma22}, m.Addrs(id2))
}
func TestNilAddrsDontBreak(t *testing.T) {
func TestUpdateTTLs(t *testing.T) {
runTestWithAddrManagers(t, testUpdateTTLs)
}
func testNilAddrsDontBreak(t *testing.T, m AddrBook) {
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
m := AddrManager{}
m.SetAddr(id1, nil, time.Hour)
m.AddAddr(id1, nil, time.Hour)
}
func TestNilAddrsDontBreak(t *testing.T) {
runTestWithAddrManagers(t, testNilAddrsDontBreak)
}

56
benchmark_utils.go Normal file
View File

@ -0,0 +1,56 @@
package peerstore
import (
"context"
cr "crypto/rand"
"fmt"
"testing"
"github.com/mr-tron/base58/base58"
ma "github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"
)
type peerpair struct {
ID string
Addr ma.Multiaddr
}
func randomPeer(b *testing.B) *peerpair {
buf := make([]byte, 50)
for {
n, err := cr.Read(buf)
if err != nil {
b.Fatal(err)
}
if n > 0 {
break
}
}
id, err := mh.Encode(buf, mh.SHA2_256)
if err != nil {
b.Fatal(err)
}
b58ID := base58.Encode(id)
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/6666/ipfs/%s", b58ID))
if err != nil {
b.Fatal(err)
}
return &peerpair{b58ID, addr}
}
func addressProducer(ctx context.Context, b *testing.B, addrs chan *peerpair) {
defer close(addrs)
for {
p := randomPeer(b)
select {
case addrs <- p:
case <-ctx.Done():
return
}
}
}

View File

@ -54,6 +54,22 @@
"hash": "QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8",
"name": "go-multihash",
"version": "1.0.8"
},
{
"author": "magik6k",
"hash": "QmUCfrikzKVGAfpE31RPwPd32fu1DYxSG7HTGCadba5Wza",
"name": "go-ds-badger",
"version": "1.6.1"
},
{
"hash": "QmSpg1CvpXQQow5ernt1gNBXaXV6yxyNqi7XoeerWfzB5w",
"name": "go-datastore",
"version": "3.1.0"
},
{
"hash": "QmQjMHF8ptRgx4E57UFMiT4YM6kqaJeYxZ1MCDX23aw4rK",
"name": "golang-lru",
"version": "2017.10.18"
}
],
"gxVersion": "0.4.0",

View File

@ -11,6 +11,7 @@ import (
//ds "github.com/jbenet/go-datastore"
//dssync "github.com/jbenet/go-datastore/sync"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
@ -30,9 +31,6 @@ type Peerstore interface {
KeyBook
Metrics
// Peers returns a list of all peer.IDs in this Peerstore
Peers() []peer.ID
// PeerInfo returns a peer.PeerInfo struct for given peer.ID.
// This is a small slice of the information Peerstore has on
// that peer, useful to other services.
@ -83,6 +81,9 @@ type AddrBook interface {
// ClearAddresses removes all previously stored addresses
ClearAddrs(p peer.ID)
// Peers returns all of the peer IDs stored in the AddrBook
Peers() []peer.ID
}
// KeyBook tracks the Public keys of Peers.
@ -179,7 +180,7 @@ func (kb *keybook) AddPrivKey(p peer.ID, sk ic.PrivKey) error {
type peerstore struct {
*keybook
*metrics
AddrManager
AddrBook
// store other data, like versions
//ds ds.ThreadSafeDatastore
@ -194,14 +195,30 @@ type peerstore struct {
// NewPeerstore creates a threadsafe collection of peers.
func NewPeerstore() Peerstore {
return &peerstore{
keybook: newKeybook(),
metrics: NewMetrics(),
AddrManager: AddrManager{},
//ds: dssync.MutexWrap(ds.NewMapDatastore()),
ds: make(map[string]interface{}),
keybook: newKeybook(),
metrics: NewMetrics(),
AddrBook: &AddrManager{},
ds: make(map[string]interface{}),
}
}
// NewPeerstoreDatastore creates a threadsafe collection of peers backed by a
// Datastore to prevent excess memory pressure.
func NewPeerstoreDatastore(ctx context.Context, ds datastore.Batching) (Peerstore, error) {
addrBook, err := NewDatastoreAddrManager(ctx, ds, time.Second)
if err != nil {
return nil, err
}
ps := &peerstore{
keybook: newKeybook(),
metrics: NewMetrics(),
AddrBook: addrBook,
ds: make(map[string]interface{}),
}
return ps, nil
}
func (ps *peerstore) Put(p peer.ID, key string, val interface{}) error {
//dsk := ds.NewKey(string(p) + "/" + key)
//return ps.ds.Put(dsk, val)
@ -231,7 +248,7 @@ func (ps *peerstore) Peers() []peer.ID {
for _, p := range ps.keybook.Peers() {
set[p] = struct{}{}
}
for _, p := range ps.AddrManager.Peers() {
for _, p := range ps.AddrBook.Peers() {
set[p] = struct{}{}
}
@ -245,7 +262,7 @@ func (ps *peerstore) Peers() []peer.ID {
func (ps *peerstore) PeerInfo(p peer.ID) PeerInfo {
return PeerInfo{
ID: p,
Addrs: ps.AddrManager.Addrs(p),
Addrs: ps.AddrBook.Addrs(p),
}
}

View File

@ -8,7 +8,8 @@ import (
"testing"
"time"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-crypto"
"github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)
@ -25,13 +26,37 @@ func getAddrs(t *testing.T, n int) []ma.Multiaddr {
return addrs
}
func TestAddrStream(t *testing.T) {
func runTestWithPeerstores(t *testing.T, testFunc func(*testing.T, Peerstore)) {
t.Helper()
t.Log("NewPeerstore")
ps1 := NewPeerstore()
testFunc(t, ps1)
t.Log("NewPeerstoreDatastore")
ps2, closer2 := setupDatastorePeerstore(t)
defer closer2()
testFunc(t, ps2)
}
func setupDatastorePeerstore(t testing.TB) (Peerstore, func()) {
ds, closeDB := setupBadgerDatastore(t)
ctx, cancel := context.WithCancel(context.Background())
ps, err := NewPeerstoreDatastore(ctx, ds)
if err != nil {
t.Fatal(err)
}
closer := func() {
cancel()
closeDB()
}
return ps, closer
}
func testAddrStream(t *testing.T, ps Peerstore) {
addrs := getAddrs(t, 100)
pid := peer.ID("testpeer")
ps := NewPeerstore()
ps.AddAddrs(pid, addrs[:10], time.Hour)
ctx, cancel := context.WithCancel(context.Background())
@ -103,12 +128,14 @@ func TestAddrStream(t *testing.T) {
}
}
func TestGetStreamBeforePeerAdded(t *testing.T) {
func TestAddrStream(t *testing.T) {
runTestWithPeerstores(t, testAddrStream)
}
func testGetStreamBeforePeerAdded(t *testing.T, ps Peerstore) {
addrs := getAddrs(t, 10)
pid := peer.ID("testpeer")
ps := NewPeerstore()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -154,12 +181,14 @@ func TestGetStreamBeforePeerAdded(t *testing.T) {
}
}
func TestAddrStreamDuplicates(t *testing.T) {
func TestGetStreamBeforePeerAdded(t *testing.T) {
runTestWithPeerstores(t, testGetStreamBeforePeerAdded)
}
func testAddrStreamDuplicates(t *testing.T, ps Peerstore) {
addrs := getAddrs(t, 10)
pid := peer.ID("testpeer")
ps := NewPeerstore()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ach := ps.AddrStream(ctx, pid)
@ -193,8 +222,11 @@ func TestAddrStreamDuplicates(t *testing.T) {
}
}
func TestPeerstoreProtoStore(t *testing.T) {
ps := NewPeerstore()
func TestAddrStreamDuplicates(t *testing.T) {
runTestWithPeerstores(t, testAddrStreamDuplicates)
}
func testPeerstoreProtoStore(t *testing.T, ps Peerstore) {
p1 := peer.ID("TESTPEER")
protos := []string{"a", "b", "c", "d"}
@ -248,20 +280,23 @@ func TestPeerstoreProtoStore(t *testing.T) {
}
}
func TestBasicPeerstore(t *testing.T) {
ps := NewPeerstore()
func TestPeerstoreProtoStore(t *testing.T) {
runTestWithPeerstores(t, testAddrStreamDuplicates)
}
func testBasicPeerstore(t *testing.T, ps Peerstore) {
var pids []peer.ID
addrs := getAddrs(t, 10)
for i, a := range addrs {
p := peer.ID(fmt.Sprint(i))
for _, a := range addrs {
priv, _, _ := crypto.GenerateKeyPair(crypto.RSA, 512)
p, _ := peer.IDFromPrivateKey(priv)
pids = append(pids, p)
ps.AddAddr(p, a, PermanentAddrTTL)
}
peers := ps.Peers()
if len(peers) != 10 {
t.Fatal("expected ten peers")
t.Fatal("expected ten peers, got", len(peers))
}
pinfo := ps.PeerInfo(pids[0])
@ -269,3 +304,33 @@ func TestBasicPeerstore(t *testing.T) {
t.Fatal("stored wrong address")
}
}
func TestBasicPeerstore(t *testing.T) {
runTestWithPeerstores(t, testBasicPeerstore)
}
func benchmarkPeerstore(ps Peerstore) func(*testing.B) {
return func(b *testing.B) {
addrs := make(chan *peerpair, 100)
ctx, cancel := context.WithCancel(context.Background())
go addressProducer(ctx, b, addrs)
b.ResetTimer()
for i := 0; i < b.N; i++ {
pp := <-addrs
pid := peer.ID(pp.ID)
ps.AddAddr(pid, pp.Addr, PermanentAddrTTL)
}
cancel()
}
}
func BenchmarkPeerstore(b *testing.B) {
ps := NewPeerstore()
b.Run("PeerstoreBasic", benchmarkPeerstore(ps))
dsps, closer := setupDatastorePeerstore(b)
defer closer()
b.Run("PeerstoreDatastore", benchmarkPeerstore(dsps))
}