mirror of
https://github.com/libp2p/go-libp2p-peerstore.git
synced 2025-03-23 13:00:08 +08:00
Merge pull request #30 from raulk/refactor
* Refactor the peerstore into interface, in-memory implementation, datastore implementation. * Create common test suites to test all implementations against. * Normalise naming: rename AddrMgr => AddrBook everywhere. * Create interface PeerMetadata, with in-memory implementation. * Adjust wait times in time-dependent tests.
This commit is contained in:
commit
95b7f1677e
@ -1,308 +0,0 @@
|
|||||||
package peerstore
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-datastore"
|
|
||||||
"github.com/ipfs/go-ds-badger"
|
|
||||||
"github.com/libp2p/go-libp2p-peer"
|
|
||||||
ma "github.com/multiformats/go-multiaddr"
|
|
||||||
)
|
|
||||||
|
|
||||||
func IDS(t *testing.T, ids string) peer.ID {
|
|
||||||
t.Helper()
|
|
||||||
id, err := peer.IDB58Decode(ids)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("id %q is bad: %s", ids, err)
|
|
||||||
}
|
|
||||||
return id
|
|
||||||
}
|
|
||||||
|
|
||||||
func MA(t *testing.T, m string) ma.Multiaddr {
|
|
||||||
t.Helper()
|
|
||||||
maddr, err := ma.NewMultiaddr(m)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
return maddr
|
|
||||||
}
|
|
||||||
|
|
||||||
func testHas(t *testing.T, exp, act []ma.Multiaddr) {
|
|
||||||
t.Helper()
|
|
||||||
if len(exp) != len(act) {
|
|
||||||
t.Fatalf("lengths not the same. expected %d, got %d\n", len(exp), len(act))
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, a := range exp {
|
|
||||||
found := false
|
|
||||||
|
|
||||||
for _, b := range act {
|
|
||||||
if a.Equal(b) {
|
|
||||||
found = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !found {
|
|
||||||
t.Fatalf("expected address %s not found", a)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func setupBadgerDatastore(t testing.TB) (datastore.Batching, func()) {
|
|
||||||
dataPath, err := ioutil.TempDir(os.TempDir(), "badger")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
ds, err := badger.NewDatastore(dataPath, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
closer := func() {
|
|
||||||
ds.Close()
|
|
||||||
os.RemoveAll(dataPath)
|
|
||||||
}
|
|
||||||
return ds, closer
|
|
||||||
}
|
|
||||||
|
|
||||||
func setupDatastoreAddrManager(t *testing.T) (*DatastoreAddrManager, func()) {
|
|
||||||
ds, closeDB := setupBadgerDatastore(t)
|
|
||||||
mgr, err := NewDatastoreAddrManager(context.Background(), ds, 100*time.Microsecond)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
closer := func() {
|
|
||||||
mgr.Stop()
|
|
||||||
closeDB()
|
|
||||||
}
|
|
||||||
return mgr, closer
|
|
||||||
}
|
|
||||||
|
|
||||||
func runTestWithAddrManagers(t *testing.T, test func(*testing.T, AddrBook)) {
|
|
||||||
t.Log("AddrManager")
|
|
||||||
mgr1 := &AddrManager{}
|
|
||||||
test(t, mgr1)
|
|
||||||
|
|
||||||
t.Log("DatastoreAddrManager")
|
|
||||||
mgr2, closer2 := setupDatastoreAddrManager(t)
|
|
||||||
defer closer2()
|
|
||||||
test(t, mgr2)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testAddresses(t *testing.T, m AddrBook) {
|
|
||||||
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
|
||||||
id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ")
|
|
||||||
id3 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ6Kn")
|
|
||||||
id4 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Kn")
|
|
||||||
id5 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Km")
|
|
||||||
|
|
||||||
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
|
|
||||||
ma21 := MA(t, "/ip4/2.2.3.2/tcp/1111")
|
|
||||||
ma22 := MA(t, "/ip4/2.2.3.2/tcp/2222")
|
|
||||||
ma31 := MA(t, "/ip4/3.2.3.3/tcp/1111")
|
|
||||||
ma32 := MA(t, "/ip4/3.2.3.3/tcp/2222")
|
|
||||||
ma33 := MA(t, "/ip4/3.2.3.3/tcp/3333")
|
|
||||||
ma41 := MA(t, "/ip4/4.2.3.3/tcp/1111")
|
|
||||||
ma42 := MA(t, "/ip4/4.2.3.3/tcp/2222")
|
|
||||||
ma43 := MA(t, "/ip4/4.2.3.3/tcp/3333")
|
|
||||||
ma44 := MA(t, "/ip4/4.2.3.3/tcp/4444")
|
|
||||||
ma51 := MA(t, "/ip4/5.2.3.3/tcp/1111")
|
|
||||||
ma52 := MA(t, "/ip4/5.2.3.3/tcp/2222")
|
|
||||||
ma53 := MA(t, "/ip4/5.2.3.3/tcp/3333")
|
|
||||||
ma54 := MA(t, "/ip4/5.2.3.3/tcp/4444")
|
|
||||||
ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555")
|
|
||||||
|
|
||||||
ttl := time.Hour
|
|
||||||
m.AddAddr(id1, ma11, ttl)
|
|
||||||
|
|
||||||
m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl)
|
|
||||||
m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl) // idempotency
|
|
||||||
|
|
||||||
m.AddAddr(id3, ma31, ttl)
|
|
||||||
m.AddAddr(id3, ma32, ttl)
|
|
||||||
m.AddAddr(id3, ma33, ttl)
|
|
||||||
m.AddAddr(id3, ma33, ttl) // idempotency
|
|
||||||
m.AddAddr(id3, ma33, ttl)
|
|
||||||
|
|
||||||
m.AddAddrs(id4, []ma.Multiaddr{ma41, ma42, ma43, ma44}, ttl) // multiple
|
|
||||||
|
|
||||||
m.AddAddrs(id5, []ma.Multiaddr{ma21, ma22}, ttl) // clearing
|
|
||||||
m.AddAddrs(id5, []ma.Multiaddr{ma41, ma42, ma43, ma44}, ttl) // clearing
|
|
||||||
m.ClearAddrs(id5)
|
|
||||||
m.AddAddrs(id5, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ttl) // clearing
|
|
||||||
|
|
||||||
// test the Addresses return value
|
|
||||||
testHas(t, []ma.Multiaddr{ma11}, m.Addrs(id1))
|
|
||||||
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
|
|
||||||
testHas(t, []ma.Multiaddr{ma31, ma32, ma33}, m.Addrs(id3))
|
|
||||||
testHas(t, []ma.Multiaddr{ma41, ma42, ma43, ma44}, m.Addrs(id4))
|
|
||||||
testHas(t, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, m.Addrs(id5))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAddresses(t *testing.T) {
|
|
||||||
runTestWithAddrManagers(t, testAddresses)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testAddressesExpire(t *testing.T, m AddrBook) {
|
|
||||||
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
|
||||||
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
|
|
||||||
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
|
|
||||||
ma12 := MA(t, "/ip4/2.2.3.2/tcp/2222")
|
|
||||||
ma13 := MA(t, "/ip4/3.2.3.3/tcp/3333")
|
|
||||||
ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444")
|
|
||||||
ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555")
|
|
||||||
|
|
||||||
m.AddAddr(id1, ma11, time.Hour)
|
|
||||||
m.AddAddr(id1, ma12, time.Hour)
|
|
||||||
m.AddAddr(id1, ma13, time.Hour)
|
|
||||||
m.AddAddr(id2, ma24, time.Hour)
|
|
||||||
m.AddAddr(id2, ma25, time.Hour)
|
|
||||||
|
|
||||||
testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1))
|
|
||||||
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
|
||||||
|
|
||||||
m.SetAddr(id1, ma11, 2*time.Hour)
|
|
||||||
m.SetAddr(id1, ma12, 2*time.Hour)
|
|
||||||
m.SetAddr(id1, ma13, 2*time.Hour)
|
|
||||||
m.SetAddr(id2, ma24, 2*time.Hour)
|
|
||||||
m.SetAddr(id2, ma25, 2*time.Hour)
|
|
||||||
|
|
||||||
testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1))
|
|
||||||
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
|
||||||
|
|
||||||
m.SetAddr(id1, ma11, time.Millisecond)
|
|
||||||
<-time.After(time.Millisecond * 2)
|
|
||||||
testHas(t, []ma.Multiaddr{ma12, ma13}, m.Addrs(id1))
|
|
||||||
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
|
||||||
|
|
||||||
m.SetAddr(id1, ma13, time.Millisecond)
|
|
||||||
<-time.After(time.Millisecond * 2)
|
|
||||||
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
|
||||||
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
|
||||||
|
|
||||||
m.SetAddr(id2, ma24, time.Millisecond)
|
|
||||||
<-time.After(time.Millisecond * 2)
|
|
||||||
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
|
||||||
testHas(t, []ma.Multiaddr{ma25}, m.Addrs(id2))
|
|
||||||
|
|
||||||
m.SetAddr(id2, ma25, time.Millisecond)
|
|
||||||
<-time.After(time.Millisecond * 2)
|
|
||||||
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
|
||||||
testHas(t, nil, m.Addrs(id2))
|
|
||||||
|
|
||||||
m.SetAddr(id1, ma12, time.Millisecond)
|
|
||||||
<-time.After(time.Millisecond * 2)
|
|
||||||
testHas(t, nil, m.Addrs(id1))
|
|
||||||
testHas(t, nil, m.Addrs(id2))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAddressesExpire(t *testing.T) {
|
|
||||||
runTestWithAddrManagers(t, testAddressesExpire)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testClearWorks(t *testing.T, m AddrBook) {
|
|
||||||
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
|
||||||
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
|
|
||||||
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
|
|
||||||
ma12 := MA(t, "/ip4/2.2.3.2/tcp/2222")
|
|
||||||
ma13 := MA(t, "/ip4/3.2.3.3/tcp/3333")
|
|
||||||
ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444")
|
|
||||||
ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555")
|
|
||||||
|
|
||||||
m.AddAddr(id1, ma11, time.Hour)
|
|
||||||
m.AddAddr(id1, ma12, time.Hour)
|
|
||||||
m.AddAddr(id1, ma13, time.Hour)
|
|
||||||
m.AddAddr(id2, ma24, time.Hour)
|
|
||||||
m.AddAddr(id2, ma25, time.Hour)
|
|
||||||
|
|
||||||
testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1))
|
|
||||||
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
|
||||||
|
|
||||||
m.ClearAddrs(id1)
|
|
||||||
m.ClearAddrs(id2)
|
|
||||||
|
|
||||||
testHas(t, nil, m.Addrs(id1))
|
|
||||||
testHas(t, nil, m.Addrs(id2))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestClearWorks(t *testing.T) {
|
|
||||||
runTestWithAddrManagers(t, testClearWorks)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testSetNegativeTTLClears(t *testing.T, m AddrBook) {
|
|
||||||
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
|
||||||
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
|
|
||||||
|
|
||||||
m.SetAddr(id1, ma11, time.Hour)
|
|
||||||
|
|
||||||
testHas(t, []ma.Multiaddr{ma11}, m.Addrs(id1))
|
|
||||||
|
|
||||||
m.SetAddr(id1, ma11, -1)
|
|
||||||
|
|
||||||
testHas(t, nil, m.Addrs(id1))
|
|
||||||
}
|
|
||||||
func TestSetNegativeTTLClears(t *testing.T) {
|
|
||||||
runTestWithAddrManagers(t, testSetNegativeTTLClears)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testUpdateTTLs(t *testing.T, m AddrBook) {
|
|
||||||
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
|
||||||
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
|
|
||||||
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
|
|
||||||
ma12 := MA(t, "/ip4/1.2.3.1/tcp/1112")
|
|
||||||
ma21 := MA(t, "/ip4/1.2.3.1/tcp/1121")
|
|
||||||
ma22 := MA(t, "/ip4/1.2.3.1/tcp/1122")
|
|
||||||
|
|
||||||
// Shouldn't panic.
|
|
||||||
m.UpdateAddrs(id1, time.Hour, time.Minute)
|
|
||||||
|
|
||||||
m.SetAddr(id1, ma11, time.Hour)
|
|
||||||
m.SetAddr(id1, ma12, time.Minute)
|
|
||||||
|
|
||||||
// Shouldn't panic.
|
|
||||||
m.UpdateAddrs(id2, time.Hour, time.Minute)
|
|
||||||
|
|
||||||
m.SetAddr(id2, ma21, time.Hour)
|
|
||||||
m.SetAddr(id2, ma22, time.Minute)
|
|
||||||
|
|
||||||
testHas(t, []ma.Multiaddr{ma11, ma12}, m.Addrs(id1))
|
|
||||||
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
|
|
||||||
|
|
||||||
m.UpdateAddrs(id1, time.Hour, time.Second)
|
|
||||||
|
|
||||||
testHas(t, []ma.Multiaddr{ma11, ma12}, m.Addrs(id1))
|
|
||||||
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
|
|
||||||
|
|
||||||
time.Sleep(1200 * time.Millisecond)
|
|
||||||
|
|
||||||
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
|
||||||
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
|
|
||||||
|
|
||||||
m.UpdateAddrs(id2, time.Hour, time.Second)
|
|
||||||
|
|
||||||
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
|
||||||
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
|
|
||||||
|
|
||||||
time.Sleep(1200 * time.Millisecond)
|
|
||||||
|
|
||||||
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
|
||||||
testHas(t, []ma.Multiaddr{ma22}, m.Addrs(id2))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestUpdateTTLs(t *testing.T) {
|
|
||||||
runTestWithAddrManagers(t, testUpdateTTLs)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testNilAddrsDontBreak(t *testing.T, m AddrBook) {
|
|
||||||
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
|
||||||
m.SetAddr(id1, nil, time.Hour)
|
|
||||||
m.AddAddr(id1, nil, time.Hour)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNilAddrsDontBreak(t *testing.T) {
|
|
||||||
runTestWithAddrManagers(t, testNilAddrsDontBreak)
|
|
||||||
}
|
|
134
interface.go
Normal file
134
interface.go
Normal file
@ -0,0 +1,134 @@
|
|||||||
|
package peerstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"math"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
|
ic "github.com/libp2p/go-libp2p-crypto"
|
||||||
|
"github.com/libp2p/go-libp2p-peer"
|
||||||
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ErrNotFound = errors.New("item not found")
|
||||||
|
|
||||||
|
var (
|
||||||
|
// AddressTTL is the expiration time of addresses.
|
||||||
|
AddressTTL = time.Hour
|
||||||
|
|
||||||
|
// TempAddrTTL is the ttl used for a short lived address
|
||||||
|
TempAddrTTL = time.Second * 10
|
||||||
|
|
||||||
|
// ProviderAddrTTL is the TTL of an address we've received from a provider.
|
||||||
|
// This is also a temporary address, but lasts longer. After this expires,
|
||||||
|
// the records we return will require an extra lookup.
|
||||||
|
ProviderAddrTTL = time.Minute * 10
|
||||||
|
|
||||||
|
// RecentlyConnectedAddrTTL is used when we recently connected to a peer.
|
||||||
|
// It means that we are reasonably certain of the peer's address.
|
||||||
|
RecentlyConnectedAddrTTL = time.Minute * 10
|
||||||
|
|
||||||
|
// OwnObservedAddrTTL is used for our own external addresses observed by peers.
|
||||||
|
OwnObservedAddrTTL = time.Minute * 10
|
||||||
|
)
|
||||||
|
|
||||||
|
// Permanent TTLs (distinct so we can distinguish between them, constant as they
|
||||||
|
// are, in fact, permanent)
|
||||||
|
const (
|
||||||
|
|
||||||
|
// PermanentAddrTTL is the ttl for a "permanent address" (e.g. bootstrap nodes).
|
||||||
|
PermanentAddrTTL = math.MaxInt64 - iota
|
||||||
|
|
||||||
|
// ConnectedAddrTTL is the ttl used for the addresses of a peer to whom
|
||||||
|
// we're connected directly. This is basically permanent, as we will
|
||||||
|
// clear them + re-add under a TempAddrTTL after disconnecting.
|
||||||
|
ConnectedAddrTTL
|
||||||
|
)
|
||||||
|
|
||||||
|
// Peerstore provides a threadsafe store of Peer related
|
||||||
|
// information.
|
||||||
|
type Peerstore interface {
|
||||||
|
AddrBook
|
||||||
|
KeyBook
|
||||||
|
PeerMetadata
|
||||||
|
Metrics
|
||||||
|
|
||||||
|
// PeerInfo returns a peer.PeerInfo struct for given peer.ID.
|
||||||
|
// This is a small slice of the information Peerstore has on
|
||||||
|
// that peer, useful to other services.
|
||||||
|
PeerInfo(peer.ID) PeerInfo
|
||||||
|
|
||||||
|
GetProtocols(peer.ID) ([]string, error)
|
||||||
|
AddProtocols(peer.ID, ...string) error
|
||||||
|
SetProtocols(peer.ID, ...string) error
|
||||||
|
SupportsProtocols(peer.ID, ...string) ([]string, error)
|
||||||
|
|
||||||
|
// Peers returns all of the peer IDs stored across all inner stores.
|
||||||
|
Peers() []peer.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
type PeerMetadata interface {
|
||||||
|
// Get/Put is a simple registry for other peer-related key/value pairs.
|
||||||
|
// if we find something we use often, it should become its own set of
|
||||||
|
// methods. this is a last resort.
|
||||||
|
Get(p peer.ID, key string) (interface{}, error)
|
||||||
|
Put(p peer.ID, key string, val interface{}) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddrBook holds the multiaddrs of peers.
|
||||||
|
type AddrBook interface {
|
||||||
|
|
||||||
|
// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl)
|
||||||
|
AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration)
|
||||||
|
|
||||||
|
// AddAddrs gives this AddrBook addresses to use, with a given ttl
|
||||||
|
// (time-to-live), after which the address is no longer valid.
|
||||||
|
// If the manager has a longer TTL, the operation is a no-op for that address
|
||||||
|
AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration)
|
||||||
|
|
||||||
|
// SetAddr calls mgr.SetAddrs(p, addr, ttl)
|
||||||
|
SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration)
|
||||||
|
|
||||||
|
// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
|
||||||
|
// This is used when we receive the best estimate of the validity of an address.
|
||||||
|
SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration)
|
||||||
|
|
||||||
|
// UpdateAddrs updates the addresses associated with the given peer that have
|
||||||
|
// the given oldTTL to have the given newTTL.
|
||||||
|
UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration)
|
||||||
|
|
||||||
|
// Addresses returns all known (and valid) addresses for a given peer
|
||||||
|
Addrs(p peer.ID) []ma.Multiaddr
|
||||||
|
|
||||||
|
// AddrStream returns a channel that gets all addresses for a given
|
||||||
|
// peer sent on it. If new addresses are added after the call is made
|
||||||
|
// they will be sent along through the channel as well.
|
||||||
|
AddrStream(context.Context, peer.ID) <-chan ma.Multiaddr
|
||||||
|
|
||||||
|
// ClearAddresses removes all previously stored addresses
|
||||||
|
ClearAddrs(p peer.ID)
|
||||||
|
|
||||||
|
// PeersWithAddrs returns all of the peer IDs stored in the AddrBook
|
||||||
|
PeersWithAddrs() []peer.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// KeyBook tracks the keys of Peers.
|
||||||
|
type KeyBook interface {
|
||||||
|
|
||||||
|
// PubKey stores the public key of a peer.
|
||||||
|
PubKey(peer.ID) ic.PubKey
|
||||||
|
|
||||||
|
// AddPubKey stores the public key of a peer.
|
||||||
|
AddPubKey(peer.ID, ic.PubKey) error
|
||||||
|
|
||||||
|
// PrivKey returns the private key of a peer.
|
||||||
|
PrivKey(peer.ID) ic.PrivKey
|
||||||
|
|
||||||
|
// AddPrivKey stores the private key of a peer.
|
||||||
|
AddPrivKey(peer.ID, ic.PrivKey) error
|
||||||
|
|
||||||
|
// PeersWithKeys returns all the peer IDs stored in the KeyBook
|
||||||
|
PeersWithKeys() []peer.ID
|
||||||
|
}
|
@ -7,7 +7,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
testutil "github.com/libp2p/go-libp2p-peer/test"
|
"github.com/libp2p/go-libp2p-peer/test"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestLatencyEWMAFun(t *testing.T) {
|
func TestLatencyEWMAFun(t *testing.T) {
|
||||||
|
@ -58,7 +58,7 @@ func InfoFromP2pAddr(m ma.Multiaddr) (*PeerInfo, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func InfoToP2pAddrs(pi *PeerInfo) ([]ma.Multiaddr, error) {
|
func InfoToP2pAddrs(pi *PeerInfo) ([]ma.Multiaddr, error) {
|
||||||
addrs := []ma.Multiaddr{}
|
var addrs []ma.Multiaddr
|
||||||
tpl := "/" + ma.ProtocolWithCode(ma.P_IPFS).Name + "/"
|
tpl := "/" + ma.ProtocolWithCode(ma.P_IPFS).Name + "/"
|
||||||
for _, addr := range pi.Addrs {
|
for _, addr := range pi.Addrs {
|
||||||
p2paddr, err := ma.NewMultiaddr(tpl + peer.IDB58Encode(pi.ID))
|
p2paddr, err := ma.NewMultiaddr(tpl + peer.IDB58Encode(pi.ID))
|
||||||
|
@ -3,7 +3,7 @@ package peerstore
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
peer "github.com/libp2p/go-libp2p-peer"
|
"github.com/libp2p/go-libp2p-peer"
|
||||||
ma "github.com/multiformats/go-multiaddr"
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
240
peerstore.go
240
peerstore.go
@ -1,254 +1,42 @@
|
|||||||
package peerstore
|
package peerstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
ic "github.com/libp2p/go-libp2p-crypto"
|
|
||||||
|
|
||||||
//ds "github.com/jbenet/go-datastore"
|
|
||||||
//dssync "github.com/jbenet/go-datastore/sync"
|
|
||||||
"github.com/ipfs/go-datastore"
|
|
||||||
logging "github.com/ipfs/go-log"
|
|
||||||
"github.com/libp2p/go-libp2p-peer"
|
"github.com/libp2p/go-libp2p-peer"
|
||||||
ma "github.com/multiformats/go-multiaddr"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("peerstore")
|
var _ Peerstore = (*peerstore)(nil)
|
||||||
|
|
||||||
const (
|
|
||||||
// AddressTTL is the expiration time of addresses.
|
|
||||||
AddressTTL = time.Hour
|
|
||||||
)
|
|
||||||
|
|
||||||
// Peerstore provides a threadsafe store of Peer related
|
|
||||||
// information.
|
|
||||||
type Peerstore interface {
|
|
||||||
AddrBook
|
|
||||||
KeyBook
|
|
||||||
Metrics
|
|
||||||
|
|
||||||
// PeerInfo returns a peer.PeerInfo struct for given peer.ID.
|
|
||||||
// This is a small slice of the information Peerstore has on
|
|
||||||
// that peer, useful to other services.
|
|
||||||
PeerInfo(peer.ID) PeerInfo
|
|
||||||
|
|
||||||
// Get/Put is a simple registry for other peer-related key/value pairs.
|
|
||||||
// if we find something we use often, it should become its own set of
|
|
||||||
// methods. this is a last resort.
|
|
||||||
Get(id peer.ID, key string) (interface{}, error)
|
|
||||||
Put(id peer.ID, key string, val interface{}) error
|
|
||||||
|
|
||||||
GetProtocols(peer.ID) ([]string, error)
|
|
||||||
AddProtocols(peer.ID, ...string) error
|
|
||||||
SetProtocols(peer.ID, ...string) error
|
|
||||||
SupportsProtocols(peer.ID, ...string) ([]string, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddrBook is an interface that fits the new AddrManager. I'm patching
|
|
||||||
// it up in here to avoid changing a ton of the codebase.
|
|
||||||
type AddrBook interface {
|
|
||||||
|
|
||||||
// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl)
|
|
||||||
AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration)
|
|
||||||
|
|
||||||
// AddAddrs gives AddrManager addresses to use, with a given ttl
|
|
||||||
// (time-to-live), after which the address is no longer valid.
|
|
||||||
// If the manager has a longer TTL, the operation is a no-op for that address
|
|
||||||
AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration)
|
|
||||||
|
|
||||||
// SetAddr calls mgr.SetAddrs(p, addr, ttl)
|
|
||||||
SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration)
|
|
||||||
|
|
||||||
// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
|
|
||||||
// This is used when we receive the best estimate of the validity of an address.
|
|
||||||
SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration)
|
|
||||||
|
|
||||||
// UpdateAddrs updates the addresses associated with the given peer that have
|
|
||||||
// the given oldTTL to have the given newTTL.
|
|
||||||
UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration)
|
|
||||||
|
|
||||||
// Addresses returns all known (and valid) addresses for a given peer
|
|
||||||
Addrs(p peer.ID) []ma.Multiaddr
|
|
||||||
|
|
||||||
// AddrStream returns a channel that gets all addresses for a given
|
|
||||||
// peer sent on it. If new addresses are added after the call is made
|
|
||||||
// they will be sent along through the channel as well.
|
|
||||||
AddrStream(context.Context, peer.ID) <-chan ma.Multiaddr
|
|
||||||
|
|
||||||
// ClearAddresses removes all previously stored addresses
|
|
||||||
ClearAddrs(p peer.ID)
|
|
||||||
|
|
||||||
// Peers returns all of the peer IDs stored in the AddrBook
|
|
||||||
Peers() []peer.ID
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyBook tracks the Public keys of Peers.
|
|
||||||
type KeyBook interface {
|
|
||||||
PubKey(peer.ID) ic.PubKey
|
|
||||||
AddPubKey(peer.ID, ic.PubKey) error
|
|
||||||
|
|
||||||
PrivKey(peer.ID) ic.PrivKey
|
|
||||||
AddPrivKey(peer.ID, ic.PrivKey) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type keybook struct {
|
|
||||||
pks map[peer.ID]ic.PubKey
|
|
||||||
sks map[peer.ID]ic.PrivKey
|
|
||||||
|
|
||||||
sync.RWMutex // same lock. wont happen a ton.
|
|
||||||
}
|
|
||||||
|
|
||||||
func newKeybook() *keybook {
|
|
||||||
return &keybook{
|
|
||||||
pks: map[peer.ID]ic.PubKey{},
|
|
||||||
sks: map[peer.ID]ic.PrivKey{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kb *keybook) Peers() []peer.ID {
|
|
||||||
kb.RLock()
|
|
||||||
ps := make([]peer.ID, 0, len(kb.pks)+len(kb.sks))
|
|
||||||
for p := range kb.pks {
|
|
||||||
ps = append(ps, p)
|
|
||||||
}
|
|
||||||
for p := range kb.sks {
|
|
||||||
if _, found := kb.pks[p]; !found {
|
|
||||||
ps = append(ps, p)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
kb.RUnlock()
|
|
||||||
return ps
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kb *keybook) PubKey(p peer.ID) ic.PubKey {
|
|
||||||
kb.RLock()
|
|
||||||
pk := kb.pks[p]
|
|
||||||
kb.RUnlock()
|
|
||||||
if pk != nil {
|
|
||||||
return pk
|
|
||||||
}
|
|
||||||
pk, err := p.ExtractPublicKey()
|
|
||||||
if err == nil && pk != nil {
|
|
||||||
kb.Lock()
|
|
||||||
kb.pks[p] = pk
|
|
||||||
kb.Unlock()
|
|
||||||
}
|
|
||||||
return pk
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kb *keybook) AddPubKey(p peer.ID, pk ic.PubKey) error {
|
|
||||||
|
|
||||||
// check it's correct first
|
|
||||||
if !p.MatchesPublicKey(pk) {
|
|
||||||
return errors.New("ID does not match PublicKey")
|
|
||||||
}
|
|
||||||
|
|
||||||
kb.Lock()
|
|
||||||
kb.pks[p] = pk
|
|
||||||
kb.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kb *keybook) PrivKey(p peer.ID) ic.PrivKey {
|
|
||||||
kb.RLock()
|
|
||||||
sk := kb.sks[p]
|
|
||||||
kb.RUnlock()
|
|
||||||
return sk
|
|
||||||
}
|
|
||||||
|
|
||||||
func (kb *keybook) AddPrivKey(p peer.ID, sk ic.PrivKey) error {
|
|
||||||
|
|
||||||
if sk == nil {
|
|
||||||
return errors.New("sk is nil (PrivKey)")
|
|
||||||
}
|
|
||||||
|
|
||||||
// check it's correct first
|
|
||||||
if !p.MatchesPrivateKey(sk) {
|
|
||||||
return errors.New("ID does not match PrivateKey")
|
|
||||||
}
|
|
||||||
|
|
||||||
kb.Lock()
|
|
||||||
kb.sks[p] = sk
|
|
||||||
kb.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type peerstore struct {
|
type peerstore struct {
|
||||||
*keybook
|
Metrics
|
||||||
*metrics
|
|
||||||
AddrBook
|
|
||||||
|
|
||||||
// store other data, like versions
|
KeyBook
|
||||||
//ds ds.ThreadSafeDatastore
|
AddrBook
|
||||||
// TODO: use a datastore for this
|
PeerMetadata
|
||||||
ds map[string]interface{}
|
|
||||||
dslock sync.Mutex
|
|
||||||
|
|
||||||
// lock for protocol information, separate from datastore lock
|
// lock for protocol information, separate from datastore lock
|
||||||
protolock sync.Mutex
|
protolock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPeerstore creates a threadsafe collection of peers.
|
// NewPeerstore creates a data structure that stores peer data, backed by the
|
||||||
func NewPeerstore() Peerstore {
|
// supplied implementations of KeyBook, AddrBook and PeerMetadata.
|
||||||
|
func NewPeerstore(kb KeyBook, ab AddrBook, md PeerMetadata) Peerstore {
|
||||||
return &peerstore{
|
return &peerstore{
|
||||||
keybook: newKeybook(),
|
KeyBook: kb,
|
||||||
metrics: NewMetrics(),
|
AddrBook: ab,
|
||||||
AddrBook: &AddrManager{},
|
PeerMetadata: md,
|
||||||
ds: make(map[string]interface{}),
|
Metrics: NewMetrics(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPeerstoreDatastore creates a threadsafe collection of peers backed by a
|
|
||||||
// Datastore to prevent excess memory pressure.
|
|
||||||
func NewPeerstoreDatastore(ctx context.Context, ds datastore.Batching) (Peerstore, error) {
|
|
||||||
addrBook, err := NewDatastoreAddrManager(ctx, ds, time.Second)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ps := &peerstore{
|
|
||||||
keybook: newKeybook(),
|
|
||||||
metrics: NewMetrics(),
|
|
||||||
AddrBook: addrBook,
|
|
||||||
ds: make(map[string]interface{}),
|
|
||||||
}
|
|
||||||
return ps, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *peerstore) Put(p peer.ID, key string, val interface{}) error {
|
|
||||||
//dsk := ds.NewKey(string(p) + "/" + key)
|
|
||||||
//return ps.ds.Put(dsk, val)
|
|
||||||
ps.dslock.Lock()
|
|
||||||
defer ps.dslock.Unlock()
|
|
||||||
ps.ds[string(p)+"/"+key] = val
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var ErrNotFound = errors.New("item not found")
|
|
||||||
|
|
||||||
func (ps *peerstore) Get(p peer.ID, key string) (interface{}, error) {
|
|
||||||
//dsk := ds.NewKey(string(p) + "/" + key)
|
|
||||||
//return ps.ds.Get(dsk)
|
|
||||||
|
|
||||||
ps.dslock.Lock()
|
|
||||||
defer ps.dslock.Unlock()
|
|
||||||
i, ok := ps.ds[string(p)+"/"+key]
|
|
||||||
if !ok {
|
|
||||||
return nil, ErrNotFound
|
|
||||||
}
|
|
||||||
return i, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ps *peerstore) Peers() []peer.ID {
|
func (ps *peerstore) Peers() []peer.ID {
|
||||||
set := map[peer.ID]struct{}{}
|
set := map[peer.ID]struct{}{}
|
||||||
for _, p := range ps.keybook.Peers() {
|
for _, p := range ps.PeersWithKeys() {
|
||||||
set[p] = struct{}{}
|
set[p] = struct{}{}
|
||||||
}
|
}
|
||||||
for _, p := range ps.AddrBook.Peers() {
|
for _, p := range ps.PeersWithAddrs() {
|
||||||
set[p] = struct{}{}
|
set[p] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,336 +0,0 @@
|
|||||||
package peerstore
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"math/rand"
|
|
||||||
"sort"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p-crypto"
|
|
||||||
"github.com/libp2p/go-libp2p-peer"
|
|
||||||
ma "github.com/multiformats/go-multiaddr"
|
|
||||||
)
|
|
||||||
|
|
||||||
func getAddrs(t *testing.T, n int) []ma.Multiaddr {
|
|
||||||
var addrs []ma.Multiaddr
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", i))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
addrs = append(addrs, a)
|
|
||||||
}
|
|
||||||
return addrs
|
|
||||||
}
|
|
||||||
|
|
||||||
func runTestWithPeerstores(t *testing.T, testFunc func(*testing.T, Peerstore)) {
|
|
||||||
t.Helper()
|
|
||||||
t.Log("NewPeerstore")
|
|
||||||
ps1 := NewPeerstore()
|
|
||||||
testFunc(t, ps1)
|
|
||||||
|
|
||||||
t.Log("NewPeerstoreDatastore")
|
|
||||||
ps2, closer2 := setupDatastorePeerstore(t)
|
|
||||||
defer closer2()
|
|
||||||
testFunc(t, ps2)
|
|
||||||
}
|
|
||||||
|
|
||||||
func setupDatastorePeerstore(t testing.TB) (Peerstore, func()) {
|
|
||||||
ds, closeDB := setupBadgerDatastore(t)
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
ps, err := NewPeerstoreDatastore(ctx, ds)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
closer := func() {
|
|
||||||
cancel()
|
|
||||||
closeDB()
|
|
||||||
}
|
|
||||||
return ps, closer
|
|
||||||
}
|
|
||||||
|
|
||||||
func testAddrStream(t *testing.T, ps Peerstore) {
|
|
||||||
addrs := getAddrs(t, 100)
|
|
||||||
|
|
||||||
pid := peer.ID("testpeer")
|
|
||||||
|
|
||||||
ps.AddAddrs(pid, addrs[:10], time.Hour)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
|
|
||||||
addrch := ps.AddrStream(ctx, pid)
|
|
||||||
|
|
||||||
// while that subscription is active, publish ten more addrs
|
|
||||||
// this tests that it doesnt hang
|
|
||||||
for i := 10; i < 20; i++ {
|
|
||||||
ps.AddAddr(pid, addrs[i], time.Hour)
|
|
||||||
}
|
|
||||||
|
|
||||||
// now receive them (without hanging)
|
|
||||||
timeout := time.After(time.Second * 10)
|
|
||||||
for i := 0; i < 20; i++ {
|
|
||||||
select {
|
|
||||||
case <-addrch:
|
|
||||||
case <-timeout:
|
|
||||||
t.Fatal("timed out")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// start a second stream
|
|
||||||
ctx2, cancel2 := context.WithCancel(context.Background())
|
|
||||||
addrch2 := ps.AddrStream(ctx2, pid)
|
|
||||||
|
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
defer close(done)
|
|
||||||
// now send the rest of the addresses
|
|
||||||
for _, a := range addrs[20:80] {
|
|
||||||
ps.AddAddr(pid, a, time.Hour)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// receive some concurrently with the goroutine
|
|
||||||
timeout = time.After(time.Second * 10)
|
|
||||||
for i := 0; i < 40; i++ {
|
|
||||||
select {
|
|
||||||
case <-addrch:
|
|
||||||
case <-timeout:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
<-done
|
|
||||||
|
|
||||||
// receive some more after waiting for that goroutine to complete
|
|
||||||
timeout = time.After(time.Second * 10)
|
|
||||||
for i := 0; i < 20; i++ {
|
|
||||||
select {
|
|
||||||
case <-addrch:
|
|
||||||
case <-timeout:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// now cancel it
|
|
||||||
cancel()
|
|
||||||
|
|
||||||
// now check the *second* subscription. We should see 80 addresses.
|
|
||||||
for i := 0; i < 80; i++ {
|
|
||||||
<-addrch2
|
|
||||||
}
|
|
||||||
|
|
||||||
cancel2()
|
|
||||||
|
|
||||||
// and add a few more addresses it doesnt hang afterwards
|
|
||||||
for _, a := range addrs[80:] {
|
|
||||||
ps.AddAddr(pid, a, time.Hour)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAddrStream(t *testing.T) {
|
|
||||||
runTestWithPeerstores(t, testAddrStream)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testGetStreamBeforePeerAdded(t *testing.T, ps Peerstore) {
|
|
||||||
addrs := getAddrs(t, 10)
|
|
||||||
pid := peer.ID("testpeer")
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
ach := ps.AddrStream(ctx, pid)
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
ps.AddAddr(pid, addrs[i], time.Hour)
|
|
||||||
}
|
|
||||||
|
|
||||||
received := make(map[string]bool)
|
|
||||||
var count int
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
a, ok := <-ach
|
|
||||||
if !ok {
|
|
||||||
t.Fatal("channel shouldnt be closed yet")
|
|
||||||
}
|
|
||||||
if a == nil {
|
|
||||||
t.Fatal("got a nil address, thats weird")
|
|
||||||
}
|
|
||||||
count++
|
|
||||||
if received[a.String()] {
|
|
||||||
t.Fatal("received duplicate address")
|
|
||||||
}
|
|
||||||
received[a.String()] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ach:
|
|
||||||
t.Fatal("shouldnt have received any more addresses")
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
if count != 10 {
|
|
||||||
t.Fatal("should have received exactly ten addresses, got ", count)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, a := range addrs {
|
|
||||||
if !received[a.String()] {
|
|
||||||
t.Log(received)
|
|
||||||
t.Fatalf("expected to receive address %s but didnt", a)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetStreamBeforePeerAdded(t *testing.T) {
|
|
||||||
runTestWithPeerstores(t, testGetStreamBeforePeerAdded)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testAddrStreamDuplicates(t *testing.T, ps Peerstore) {
|
|
||||||
addrs := getAddrs(t, 10)
|
|
||||||
pid := peer.ID("testpeer")
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
ach := ps.AddrStream(ctx, pid)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
ps.AddAddr(pid, addrs[i], time.Hour)
|
|
||||||
ps.AddAddr(pid, addrs[rand.Intn(10)], time.Hour)
|
|
||||||
}
|
|
||||||
|
|
||||||
// make sure that all addresses get processed before context is cancelled
|
|
||||||
time.Sleep(time.Millisecond * 50)
|
|
||||||
cancel()
|
|
||||||
}()
|
|
||||||
|
|
||||||
received := make(map[string]bool)
|
|
||||||
var count int
|
|
||||||
for a := range ach {
|
|
||||||
if a == nil {
|
|
||||||
t.Fatal("got a nil address, thats weird")
|
|
||||||
}
|
|
||||||
count++
|
|
||||||
if received[a.String()] {
|
|
||||||
t.Fatal("received duplicate address")
|
|
||||||
}
|
|
||||||
received[a.String()] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if count != 10 {
|
|
||||||
t.Fatal("should have received exactly ten addresses")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAddrStreamDuplicates(t *testing.T) {
|
|
||||||
runTestWithPeerstores(t, testAddrStreamDuplicates)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testPeerstoreProtoStore(t *testing.T, ps Peerstore) {
|
|
||||||
p1 := peer.ID("TESTPEER")
|
|
||||||
|
|
||||||
protos := []string{"a", "b", "c", "d"}
|
|
||||||
|
|
||||||
err := ps.AddProtocols(p1, protos...)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
out, err := ps.GetProtocols(p1)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(out) != len(protos) {
|
|
||||||
t.Fatal("got wrong number of protocols back")
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Strings(out)
|
|
||||||
for i, p := range protos {
|
|
||||||
if out[i] != p {
|
|
||||||
t.Fatal("got wrong protocol")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
supported, err := ps.SupportsProtocols(p1, "q", "w", "a", "y", "b")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(supported) != 2 {
|
|
||||||
t.Fatal("only expected 2 supported")
|
|
||||||
}
|
|
||||||
|
|
||||||
if supported[0] != "a" || supported[1] != "b" {
|
|
||||||
t.Fatal("got wrong supported array: ", supported)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ps.SetProtocols(p1, "other")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
supported, err = ps.SupportsProtocols(p1, "q", "w", "a", "y", "b")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(supported) != 0 {
|
|
||||||
t.Fatal("none of those protocols should have been supported")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPeerstoreProtoStore(t *testing.T) {
|
|
||||||
runTestWithPeerstores(t, testAddrStreamDuplicates)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testBasicPeerstore(t *testing.T, ps Peerstore) {
|
|
||||||
var pids []peer.ID
|
|
||||||
addrs := getAddrs(t, 10)
|
|
||||||
for _, a := range addrs {
|
|
||||||
priv, _, _ := crypto.GenerateKeyPair(crypto.RSA, 512)
|
|
||||||
p, _ := peer.IDFromPrivateKey(priv)
|
|
||||||
pids = append(pids, p)
|
|
||||||
ps.AddAddr(p, a, PermanentAddrTTL)
|
|
||||||
}
|
|
||||||
|
|
||||||
peers := ps.Peers()
|
|
||||||
if len(peers) != 10 {
|
|
||||||
t.Fatal("expected ten peers, got", len(peers))
|
|
||||||
}
|
|
||||||
|
|
||||||
pinfo := ps.PeerInfo(pids[0])
|
|
||||||
if !pinfo.Addrs[0].Equal(addrs[0]) {
|
|
||||||
t.Fatal("stored wrong address")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBasicPeerstore(t *testing.T) {
|
|
||||||
runTestWithPeerstores(t, testBasicPeerstore)
|
|
||||||
}
|
|
||||||
|
|
||||||
func benchmarkPeerstore(ps Peerstore) func(*testing.B) {
|
|
||||||
return func(b *testing.B) {
|
|
||||||
addrs := make(chan *peerpair, 100)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
go addressProducer(ctx, b, addrs)
|
|
||||||
|
|
||||||
b.ResetTimer()
|
|
||||||
for i := 0; i < b.N; i++ {
|
|
||||||
pp := <-addrs
|
|
||||||
pid := peer.ID(pp.ID)
|
|
||||||
ps.AddAddr(pid, pp.Addr, PermanentAddrTTL)
|
|
||||||
}
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkPeerstore(b *testing.B) {
|
|
||||||
ps := NewPeerstore()
|
|
||||||
b.Run("PeerstoreBasic", benchmarkPeerstore(ps))
|
|
||||||
|
|
||||||
dsps, closer := setupDatastorePeerstore(b)
|
|
||||||
defer closer()
|
|
||||||
b.Run("PeerstoreDatastore", benchmarkPeerstore(dsps))
|
|
||||||
}
|
|
@ -1,4 +1,4 @@
|
|||||||
package peerstore
|
package pstoreds
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -8,43 +8,51 @@ import (
|
|||||||
"github.com/hashicorp/golang-lru"
|
"github.com/hashicorp/golang-lru"
|
||||||
ds "github.com/ipfs/go-datastore"
|
ds "github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/query"
|
"github.com/ipfs/go-datastore/query"
|
||||||
|
logging "github.com/ipfs/go-log"
|
||||||
"github.com/libp2p/go-libp2p-peer"
|
"github.com/libp2p/go-libp2p-peer"
|
||||||
ma "github.com/multiformats/go-multiaddr"
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
mh "github.com/multiformats/go-multihash"
|
mh "github.com/multiformats/go-multihash"
|
||||||
|
|
||||||
|
pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||||
|
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("peerstore/ds")
|
||||||
|
|
||||||
// Number of times to retry transactional writes
|
// Number of times to retry transactional writes
|
||||||
var dsWriteRetries = 5
|
var dsWriteRetries = 5
|
||||||
|
|
||||||
// DatastoreAddrManager is an address manager backed by a Datastore with both an
|
var _ pstore.AddrBook = (*dsAddrBook)(nil)
|
||||||
|
|
||||||
|
// dsAddrBook is an address book backed by a Datastore with both an
|
||||||
// in-memory TTL manager and an in-memory address stream manager.
|
// in-memory TTL manager and an in-memory address stream manager.
|
||||||
type DatastoreAddrManager struct {
|
type dsAddrBook struct {
|
||||||
cache *lru.ARCCache
|
cache *lru.ARCCache
|
||||||
ds ds.Batching
|
ds ds.Batching
|
||||||
ttlManager *ttlmanager
|
ttlManager *ttlmanager
|
||||||
subsManager *AddrSubManager
|
subsManager *pstoremem.AddrSubManager
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatastoreAddrManager initializes a new DatastoreAddrManager given a
|
// NewAddrBook initializes a new address book given a
|
||||||
// Datastore instance, a context for managing the TTL manager, and the interval
|
// Datastore instance, a context for managing the TTL manager,
|
||||||
// at which the TTL manager should sweep the Datastore.
|
// and the interval at which the TTL manager should sweep the Datastore.
|
||||||
func NewDatastoreAddrManager(ctx context.Context, ds ds.Batching, ttlInterval time.Duration) (*DatastoreAddrManager, error) {
|
func NewAddrBook(ctx context.Context, ds ds.Batching, ttlInterval time.Duration) (*dsAddrBook, error) {
|
||||||
cache, err := lru.NewARC(1024)
|
cache, err := lru.NewARC(1024)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
mgr := &DatastoreAddrManager{
|
mgr := &dsAddrBook{
|
||||||
cache: cache,
|
cache: cache,
|
||||||
ds: ds,
|
ds: ds,
|
||||||
ttlManager: newTTLManager(ctx, ds, cache, ttlInterval),
|
ttlManager: newTTLManager(ctx, ds, cache, ttlInterval),
|
||||||
subsManager: NewAddrSubManager(),
|
subsManager: pstoremem.NewAddrSubManager(),
|
||||||
}
|
}
|
||||||
return mgr, nil
|
return mgr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop will signal the TTL manager to stop and block until it returns.
|
// Stop will signal the TTL manager to stop and block until it returns.
|
||||||
func (mgr *DatastoreAddrManager) Stop() {
|
func (mgr *dsAddrBook) Stop() {
|
||||||
mgr.ttlManager.cancel()
|
mgr.ttlManager.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,12 +70,12 @@ func peerIDFromKey(key ds.Key) (peer.ID, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// AddAddr will add a new address if it's not already in the AddrBook.
|
// AddAddr will add a new address if it's not already in the AddrBook.
|
||||||
func (mgr *DatastoreAddrManager) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
|
func (mgr *dsAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
|
||||||
mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
|
mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddAddrs will add many new addresses if they're not already in the AddrBook.
|
// AddAddrs will add many new addresses if they're not already in the AddrBook.
|
||||||
func (mgr *DatastoreAddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
func (mgr *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
||||||
if ttl <= 0 {
|
if ttl <= 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -76,16 +84,16 @@ func (mgr *DatastoreAddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl t
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetAddr will add or update the TTL of an address in the AddrBook.
|
// SetAddr will add or update the TTL of an address in the AddrBook.
|
||||||
func (mgr *DatastoreAddrManager) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
|
func (mgr *dsAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
|
||||||
mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
|
mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetAddrs will add or update the TTLs of addresses in the AddrBook.
|
// SetAddrs will add or update the TTLs of addresses in the AddrBook.
|
||||||
func (mgr *DatastoreAddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
func (mgr *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
||||||
mgr.setAddrs(p, addrs, ttl, false)
|
mgr.setAddrs(p, addrs, ttl, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mgr *DatastoreAddrManager) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, add bool) {
|
func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, add bool) {
|
||||||
for i := 0; i < dsWriteRetries; i++ {
|
for i := 0; i < dsWriteRetries; i++ {
|
||||||
// keys to add to the TTL manager
|
// keys to add to the TTL manager
|
||||||
var keys []ds.Key
|
var keys []ds.Key
|
||||||
@ -145,13 +153,13 @@ func (mgr *DatastoreAddrManager) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl t
|
|||||||
|
|
||||||
// UpdateAddrs will update any addresses for a given peer and TTL combination to
|
// UpdateAddrs will update any addresses for a given peer and TTL combination to
|
||||||
// have a new TTL.
|
// have a new TTL.
|
||||||
func (mgr *DatastoreAddrManager) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
|
func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
|
||||||
prefix := ds.NewKey(p.Pretty())
|
prefix := ds.NewKey(p.Pretty())
|
||||||
mgr.ttlManager.updateTTLs(prefix, oldTTL, newTTL)
|
mgr.ttlManager.updateTTLs(prefix, oldTTL, newTTL)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Addrs Returns all of the non-expired addresses for a given peer.
|
// Addrs Returns all of the non-expired addresses for a given peer.
|
||||||
func (mgr *DatastoreAddrManager) Addrs(p peer.ID) []ma.Multiaddr {
|
func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
|
||||||
prefix := ds.NewKey(p.Pretty())
|
prefix := ds.NewKey(p.Pretty())
|
||||||
q := query.Query{Prefix: prefix.String(), KeysOnly: true}
|
q := query.Query{Prefix: prefix.String(), KeysOnly: true}
|
||||||
results, err := mgr.ds.Query(q)
|
results, err := mgr.ds.Query(q)
|
||||||
@ -185,7 +193,7 @@ func (mgr *DatastoreAddrManager) Addrs(p peer.ID) []ma.Multiaddr {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Peers returns all of the peer IDs for which the AddrBook has addresses.
|
// Peers returns all of the peer IDs for which the AddrBook has addresses.
|
||||||
func (mgr *DatastoreAddrManager) Peers() []peer.ID {
|
func (mgr *dsAddrBook) PeersWithAddrs() []peer.ID {
|
||||||
q := query.Query{KeysOnly: true}
|
q := query.Query{KeysOnly: true}
|
||||||
results, err := mgr.ds.Query(q)
|
results, err := mgr.ds.Query(q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -212,13 +220,13 @@ func (mgr *DatastoreAddrManager) Peers() []peer.ID {
|
|||||||
|
|
||||||
// AddrStream returns a channel on which all new addresses discovered for a
|
// AddrStream returns a channel on which all new addresses discovered for a
|
||||||
// given peer ID will be published.
|
// given peer ID will be published.
|
||||||
func (mgr *DatastoreAddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
|
func (mgr *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
|
||||||
initial := mgr.Addrs(p)
|
initial := mgr.Addrs(p)
|
||||||
return mgr.subsManager.AddrStream(ctx, p, initial)
|
return mgr.subsManager.AddrStream(ctx, p, initial)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClearAddrs will delete all known addresses for a peer ID.
|
// ClearAddrs will delete all known addresses for a peer ID.
|
||||||
func (mgr *DatastoreAddrManager) ClearAddrs(p peer.ID) {
|
func (mgr *dsAddrBook) ClearAddrs(p peer.ID) {
|
||||||
prefix := ds.NewKey(p.Pretty())
|
prefix := ds.NewKey(p.Pretty())
|
||||||
for i := 0; i < dsWriteRetries; i++ {
|
for i := 0; i < dsWriteRetries; i++ {
|
||||||
q := query.Query{Prefix: prefix.String(), KeysOnly: true}
|
q := query.Query{Prefix: prefix.String(), KeysOnly: true}
|
||||||
@ -254,8 +262,6 @@ func (mgr *DatastoreAddrManager) ClearAddrs(p peer.ID) {
|
|||||||
log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), dsWriteRetries)
|
log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), dsWriteRetries)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ttlmanager
|
|
||||||
|
|
||||||
type ttlentry struct {
|
type ttlentry struct {
|
||||||
TTL time.Duration
|
TTL time.Duration
|
||||||
ExpiresAt time.Time
|
ExpiresAt time.Time
|
69
pstoreds/ds_test.go
Normal file
69
pstoreds/ds_test.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
package pstoreds
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
"github.com/ipfs/go-ds-badger"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-peerstore"
|
||||||
|
"github.com/libp2p/go-libp2p-peerstore/test"
|
||||||
|
)
|
||||||
|
|
||||||
|
func setupBadgerDatastore(t testing.TB) (datastore.Batching, func()) {
|
||||||
|
dataPath, err := ioutil.TempDir(os.TempDir(), "badger")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
ds, err := badger.NewDatastore(dataPath, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
closer := func() {
|
||||||
|
ds.Close()
|
||||||
|
os.RemoveAll(dataPath)
|
||||||
|
}
|
||||||
|
return ds, closer
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPeerstoreFactory(tb testing.TB) test.PeerstoreFactory {
|
||||||
|
return func() (peerstore.Peerstore, func()) {
|
||||||
|
ds, closeFunc := setupBadgerDatastore(tb)
|
||||||
|
|
||||||
|
ps, err := NewPeerstore(context.Background(), ds)
|
||||||
|
if err != nil {
|
||||||
|
tb.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ps, closeFunc
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBadgerDsPeerstore(t *testing.T) {
|
||||||
|
test.TestPeerstore(t, newPeerstoreFactory(t))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBadgerDsAddrBook(t *testing.T) {
|
||||||
|
test.TestAddrBook(t, func() (peerstore.AddrBook, func()) {
|
||||||
|
ds, closeDB := setupBadgerDatastore(t)
|
||||||
|
|
||||||
|
mgr, err := NewAddrBook(context.Background(), ds, 100*time.Microsecond)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
closeFunc := func() {
|
||||||
|
mgr.Stop()
|
||||||
|
closeDB()
|
||||||
|
}
|
||||||
|
return mgr, closeFunc
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkBadgerDsPeerstore(b *testing.B) {
|
||||||
|
test.BenchmarkPeerstore(b, newPeerstoreFactory(b))
|
||||||
|
}
|
22
pstoreds/peerstore.go
Normal file
22
pstoreds/peerstore.go
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
package pstoreds
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
|
||||||
|
pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||||
|
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewPeerstore creates a peerstore backed by the provided persistent datastore.
|
||||||
|
func NewPeerstore(ctx context.Context, ds datastore.Batching) (pstore.Peerstore, error) {
|
||||||
|
addrBook, err := NewAddrBook(ctx, ds, time.Second)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ps := pstore.NewPeerstore(pstoremem.NewKeyBook(), addrBook, pstoremem.NewPeerMetadata())
|
||||||
|
return ps, nil
|
||||||
|
}
|
@ -1,46 +1,20 @@
|
|||||||
package peerstore
|
package pstoremem
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"math"
|
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log"
|
||||||
"github.com/libp2p/go-libp2p-peer"
|
"github.com/libp2p/go-libp2p-peer"
|
||||||
"github.com/libp2p/go-libp2p-peerstore/addr"
|
|
||||||
ma "github.com/multiformats/go-multiaddr"
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
|
|
||||||
|
pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||||
|
"github.com/libp2p/go-libp2p-peerstore/addr"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var log = logging.Logger("peerstore")
|
||||||
|
|
||||||
// TempAddrTTL is the ttl used for a short lived address
|
|
||||||
TempAddrTTL = time.Second * 10
|
|
||||||
|
|
||||||
// ProviderAddrTTL is the TTL of an address we've received from a provider.
|
|
||||||
// This is also a temporary address, but lasts longer. After this expires,
|
|
||||||
// the records we return will require an extra lookup.
|
|
||||||
ProviderAddrTTL = time.Minute * 10
|
|
||||||
|
|
||||||
// RecentlyConnectedAddrTTL is used when we recently connected to a peer.
|
|
||||||
// It means that we are reasonably certain of the peer's address.
|
|
||||||
RecentlyConnectedAddrTTL = time.Minute * 10
|
|
||||||
|
|
||||||
// OwnObservedAddrTTL is used for our own external addresses observed by peers.
|
|
||||||
OwnObservedAddrTTL = time.Minute * 10
|
|
||||||
)
|
|
||||||
|
|
||||||
// Permanent TTLs (distinct so we can distinguish between them, constant as they
|
|
||||||
// are, in fact, permanent)
|
|
||||||
const (
|
|
||||||
// PermanentAddrTTL is the ttl for a "permanent address" (e.g. bootstrap nodes).
|
|
||||||
PermanentAddrTTL = math.MaxInt64 - iota
|
|
||||||
|
|
||||||
// ConnectedAddrTTL is the ttl used for the addresses of a peer to whom
|
|
||||||
// we're connected directly. This is basically permanent, as we will
|
|
||||||
// clear them + re-add under a TempAddrTTL after disconnecting.
|
|
||||||
ConnectedAddrTTL
|
|
||||||
)
|
|
||||||
|
|
||||||
type expiringAddr struct {
|
type expiringAddr struct {
|
||||||
Addr ma.Multiaddr
|
Addr ma.Multiaddr
|
||||||
@ -54,61 +28,55 @@ func (e *expiringAddr) ExpiredBy(t time.Time) bool {
|
|||||||
|
|
||||||
type addrSlice []expiringAddr
|
type addrSlice []expiringAddr
|
||||||
|
|
||||||
// AddrManager manages addresses.
|
var _ pstore.AddrBook = (*memoryAddrBook)(nil)
|
||||||
// The zero-value is ready to be used.
|
|
||||||
type AddrManager struct {
|
// memoryAddrBook manages addresses.
|
||||||
addrmu sync.Mutex // guards addrs
|
type memoryAddrBook struct {
|
||||||
|
addrmu sync.Mutex
|
||||||
addrs map[peer.ID]addrSlice
|
addrs map[peer.ID]addrSlice
|
||||||
|
|
||||||
subManager *AddrSubManager
|
subManager *AddrSubManager
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensures the AddrManager is initialized.
|
func NewAddrBook() pstore.AddrBook {
|
||||||
// So we can use the zero value.
|
return &memoryAddrBook{
|
||||||
func (mgr *AddrManager) init() {
|
addrs: make(map[peer.ID]addrSlice),
|
||||||
if mgr.addrs == nil {
|
subManager: NewAddrSubManager(),
|
||||||
mgr.addrs = make(map[peer.ID]addrSlice)
|
|
||||||
}
|
|
||||||
if mgr.subManager == nil {
|
|
||||||
mgr.subManager = NewAddrSubManager()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mgr *AddrManager) Peers() []peer.ID {
|
func (mab *memoryAddrBook) PeersWithAddrs() []peer.ID {
|
||||||
mgr.addrmu.Lock()
|
mab.addrmu.Lock()
|
||||||
defer mgr.addrmu.Unlock()
|
defer mab.addrmu.Unlock()
|
||||||
if mgr.addrs == nil {
|
if mab.addrs == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
pids := make([]peer.ID, 0, len(mgr.addrs))
|
pids := make([]peer.ID, 0, len(mab.addrs))
|
||||||
for pid := range mgr.addrs {
|
for pid := range mab.addrs {
|
||||||
pids = append(pids, pid)
|
pids = append(pids, pid)
|
||||||
}
|
}
|
||||||
return pids
|
return pids
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl)
|
// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl)
|
||||||
func (mgr *AddrManager) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
|
func (mab *memoryAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
|
||||||
mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
|
mab.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddAddrs gives AddrManager addresses to use, with a given ttl
|
// AddAddrs gives memoryAddrBook addresses to use, with a given ttl
|
||||||
// (time-to-live), after which the address is no longer valid.
|
// (time-to-live), after which the address is no longer valid.
|
||||||
// If the manager has a longer TTL, the operation is a no-op for that address
|
// If the manager has a longer TTL, the operation is a no-op for that address
|
||||||
func (mgr *AddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
||||||
mgr.addrmu.Lock()
|
mab.addrmu.Lock()
|
||||||
defer mgr.addrmu.Unlock()
|
defer mab.addrmu.Unlock()
|
||||||
|
|
||||||
// if ttl is zero, exit. nothing to do.
|
// if ttl is zero, exit. nothing to do.
|
||||||
if ttl <= 0 {
|
if ttl <= 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// so zero value can be used
|
oldAddrs := mab.addrs[p]
|
||||||
mgr.init()
|
|
||||||
|
|
||||||
oldAddrs := mgr.addrs[p]
|
|
||||||
amap := make(map[string]expiringAddr, len(oldAddrs))
|
amap := make(map[string]expiringAddr, len(oldAddrs))
|
||||||
for _, ea := range oldAddrs {
|
for _, ea := range oldAddrs {
|
||||||
amap[string(ea.Addr.Bytes())] = ea
|
amap[string(ea.Addr.Bytes())] = ea
|
||||||
@ -127,31 +95,28 @@ func (mgr *AddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durat
|
|||||||
if !found || exp.After(a.Expires) {
|
if !found || exp.After(a.Expires) {
|
||||||
amap[addrstr] = expiringAddr{Addr: addr, Expires: exp, TTL: ttl}
|
amap[addrstr] = expiringAddr{Addr: addr, Expires: exp, TTL: ttl}
|
||||||
|
|
||||||
mgr.subManager.BroadcastAddr(p, addr)
|
mab.subManager.BroadcastAddr(p, addr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
newAddrs := make([]expiringAddr, 0, len(amap))
|
newAddrs := make([]expiringAddr, 0, len(amap))
|
||||||
for _, ea := range amap {
|
for _, ea := range amap {
|
||||||
newAddrs = append(newAddrs, ea)
|
newAddrs = append(newAddrs, ea)
|
||||||
}
|
}
|
||||||
mgr.addrs[p] = newAddrs
|
mab.addrs[p] = newAddrs
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetAddr calls mgr.SetAddrs(p, addr, ttl)
|
// SetAddr calls mgr.SetAddrs(p, addr, ttl)
|
||||||
func (mgr *AddrManager) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
|
func (mab *memoryAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
|
||||||
mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
|
mab.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
|
// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
|
||||||
// This is used when we receive the best estimate of the validity of an address.
|
// This is used when we receive the best estimate of the validity of an address.
|
||||||
func (mgr *AddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
||||||
mgr.addrmu.Lock()
|
mab.addrmu.Lock()
|
||||||
defer mgr.addrmu.Unlock()
|
defer mab.addrmu.Unlock()
|
||||||
|
|
||||||
// so zero value can be used
|
oldAddrs := mab.addrs[p]
|
||||||
mgr.init()
|
|
||||||
|
|
||||||
oldAddrs := mgr.addrs[p]
|
|
||||||
amap := make(map[string]expiringAddr, len(oldAddrs))
|
amap := make(map[string]expiringAddr, len(oldAddrs))
|
||||||
for _, ea := range oldAddrs {
|
for _, ea := range oldAddrs {
|
||||||
amap[string(ea.Addr.Bytes())] = ea
|
amap[string(ea.Addr.Bytes())] = ea
|
||||||
@ -169,7 +134,7 @@ func (mgr *AddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durat
|
|||||||
if ttl > 0 {
|
if ttl > 0 {
|
||||||
amap[addrs] = expiringAddr{Addr: addr, Expires: exp, TTL: ttl}
|
amap[addrs] = expiringAddr{Addr: addr, Expires: exp, TTL: ttl}
|
||||||
|
|
||||||
mgr.subManager.BroadcastAddr(p, addr)
|
mab.subManager.BroadcastAddr(p, addr)
|
||||||
} else {
|
} else {
|
||||||
delete(amap, addrs)
|
delete(amap, addrs)
|
||||||
}
|
}
|
||||||
@ -178,25 +143,26 @@ func (mgr *AddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durat
|
|||||||
for _, ea := range amap {
|
for _, ea := range amap {
|
||||||
newAddrs = append(newAddrs, ea)
|
newAddrs = append(newAddrs, ea)
|
||||||
}
|
}
|
||||||
mgr.addrs[p] = newAddrs
|
mab.addrs[p] = newAddrs
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateAddrs updates the addresses associated with the given peer that have
|
// UpdateAddrs updates the addresses associated with the given peer that have
|
||||||
// the given oldTTL to have the given newTTL.
|
// the given oldTTL to have the given newTTL.
|
||||||
func (mgr *AddrManager) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
|
func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
|
||||||
mgr.addrmu.Lock()
|
mab.addrmu.Lock()
|
||||||
defer mgr.addrmu.Unlock()
|
defer mab.addrmu.Unlock()
|
||||||
|
|
||||||
if mgr.addrs == nil {
|
if mab.addrs == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
addrs, found := mgr.addrs[p]
|
addrs, found := mab.addrs[p]
|
||||||
if !found {
|
if !found {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
exp := time.Now().Add(newTTL)
|
exp := time.Now().Add(newTTL)
|
||||||
|
// TODO: RK - Shorthand.
|
||||||
for i := range addrs {
|
for i := range addrs {
|
||||||
aexp := &addrs[i]
|
aexp := &addrs[i]
|
||||||
if oldTTL == aexp.TTL {
|
if oldTTL == aexp.TTL {
|
||||||
@ -207,16 +173,16 @@ func (mgr *AddrManager) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Addresses returns all known (and valid) addresses for a given
|
// Addresses returns all known (and valid) addresses for a given
|
||||||
func (mgr *AddrManager) Addrs(p peer.ID) []ma.Multiaddr {
|
func (mab *memoryAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
|
||||||
mgr.addrmu.Lock()
|
mab.addrmu.Lock()
|
||||||
defer mgr.addrmu.Unlock()
|
defer mab.addrmu.Unlock()
|
||||||
|
|
||||||
// not initialized? nothing to give.
|
// not initialized? nothing to give.
|
||||||
if mgr.addrs == nil {
|
if mab.addrs == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
maddrs, found := mgr.addrs[p]
|
maddrs, found := mab.addrs[p]
|
||||||
if !found {
|
if !found {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -233,40 +199,52 @@ func (mgr *AddrManager) Addrs(p peer.ID) []ma.Multiaddr {
|
|||||||
|
|
||||||
// clean up the expired ones.
|
// clean up the expired ones.
|
||||||
if len(cleaned) == 0 {
|
if len(cleaned) == 0 {
|
||||||
delete(mgr.addrs, p)
|
delete(mab.addrs, p)
|
||||||
} else {
|
} else {
|
||||||
mgr.addrs[p] = cleaned
|
mab.addrs[p] = cleaned
|
||||||
}
|
}
|
||||||
return good
|
return good
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClearAddrs removes all previously stored addresses
|
// ClearAddrs removes all previously stored addresses
|
||||||
func (mgr *AddrManager) ClearAddrs(p peer.ID) {
|
func (mab *memoryAddrBook) ClearAddrs(p peer.ID) {
|
||||||
mgr.addrmu.Lock()
|
mab.addrmu.Lock()
|
||||||
defer mgr.addrmu.Unlock()
|
defer mab.addrmu.Unlock()
|
||||||
mgr.init()
|
|
||||||
|
|
||||||
delete(mgr.addrs, p)
|
delete(mab.addrs, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddrStream returns a channel on which all new addresses discovered for a
|
// AddrStream returns a channel on which all new addresses discovered for a
|
||||||
// given peer ID will be published.
|
// given peer ID will be published.
|
||||||
func (mgr *AddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
|
func (mab *memoryAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
|
||||||
mgr.addrmu.Lock()
|
mab.addrmu.Lock()
|
||||||
defer mgr.addrmu.Unlock()
|
defer mab.addrmu.Unlock()
|
||||||
mgr.init()
|
|
||||||
|
|
||||||
baseaddrslice := mgr.addrs[p]
|
baseaddrslice := mab.addrs[p]
|
||||||
initial := make([]ma.Multiaddr, 0, len(baseaddrslice))
|
initial := make([]ma.Multiaddr, 0, len(baseaddrslice))
|
||||||
for _, a := range baseaddrslice {
|
for _, a := range baseaddrslice {
|
||||||
initial = append(initial, a.Addr)
|
initial = append(initial, a.Addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return mgr.subManager.AddrStream(ctx, p, initial)
|
return mab.subManager.AddrStream(ctx, p, initial)
|
||||||
|
}
|
||||||
|
|
||||||
|
type addrSub struct {
|
||||||
|
pubch chan ma.Multiaddr
|
||||||
|
lk sync.Mutex
|
||||||
|
buffer []ma.Multiaddr
|
||||||
|
ctx context.Context
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *addrSub) pubAddr(a ma.Multiaddr) {
|
||||||
|
select {
|
||||||
|
case s.pubch <- a:
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// An abstracted, pub-sub manager for address streams. Extracted from
|
// An abstracted, pub-sub manager for address streams. Extracted from
|
||||||
// AddrManager in order to support additional implementations.
|
// memoryAddrBook in order to support additional implementations.
|
||||||
type AddrSubManager struct {
|
type AddrSubManager struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
subs map[peer.ID][]*addrSub
|
subs map[peer.ID][]*addrSub
|
||||||
@ -284,6 +262,7 @@ func NewAddrSubManager() *AddrSubManager {
|
|||||||
func (mgr *AddrSubManager) removeSub(p peer.ID, s *addrSub) {
|
func (mgr *AddrSubManager) removeSub(p peer.ID, s *addrSub) {
|
||||||
mgr.mu.Lock()
|
mgr.mu.Lock()
|
||||||
defer mgr.mu.Unlock()
|
defer mgr.mu.Unlock()
|
||||||
|
|
||||||
subs := mgr.subs[p]
|
subs := mgr.subs[p]
|
||||||
if len(subs) == 1 {
|
if len(subs) == 1 {
|
||||||
if subs[0] != s {
|
if subs[0] != s {
|
||||||
@ -292,6 +271,7 @@ func (mgr *AddrSubManager) removeSub(p peer.ID, s *addrSub) {
|
|||||||
delete(mgr.subs, p)
|
delete(mgr.subs, p)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, v := range subs {
|
for i, v := range subs {
|
||||||
if v == s {
|
if v == s {
|
||||||
subs[i] = subs[len(subs)-1]
|
subs[i] = subs[len(subs)-1]
|
||||||
@ -318,7 +298,6 @@ func (mgr *AddrSubManager) BroadcastAddr(p peer.ID, addr ma.Multiaddr) {
|
|||||||
// channel with any addresses we might already have on file.
|
// channel with any addresses we might already have on file.
|
||||||
func (mgr *AddrSubManager) AddrStream(ctx context.Context, p peer.ID, initial []ma.Multiaddr) <-chan ma.Multiaddr {
|
func (mgr *AddrSubManager) AddrStream(ctx context.Context, p peer.ID, initial []ma.Multiaddr) <-chan ma.Multiaddr {
|
||||||
sub := &addrSub{pubch: make(chan ma.Multiaddr), ctx: ctx}
|
sub := &addrSub{pubch: make(chan ma.Multiaddr), ctx: ctx}
|
||||||
|
|
||||||
out := make(chan ma.Multiaddr)
|
out := make(chan ma.Multiaddr)
|
||||||
|
|
||||||
mgr.mu.Lock()
|
mgr.mu.Lock()
|
||||||
@ -380,17 +359,3 @@ func (mgr *AddrSubManager) AddrStream(ctx context.Context, p peer.ID, initial []
|
|||||||
|
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
type addrSub struct {
|
|
||||||
pubch chan ma.Multiaddr
|
|
||||||
lk sync.Mutex
|
|
||||||
buffer []ma.Multiaddr
|
|
||||||
ctx context.Context
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *addrSub) pubAddr(a ma.Multiaddr) {
|
|
||||||
select {
|
|
||||||
case s.pubch <- a:
|
|
||||||
case <-s.ctx.Done():
|
|
||||||
}
|
|
||||||
}
|
|
26
pstoremem/inmem_test.go
Normal file
26
pstoremem/inmem_test.go
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
package pstoremem
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||||
|
"github.com/libp2p/go-libp2p-peerstore/test"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestInMemoryPeerstore(t *testing.T) {
|
||||||
|
test.TestPeerstore(t, func() (pstore.Peerstore, func()) {
|
||||||
|
return NewPeerstore(), nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInMemoryAddrBook(t *testing.T) {
|
||||||
|
test.TestAddrBook(t, func() (pstore.AddrBook, func()) {
|
||||||
|
return NewAddrBook(), nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkInMemoryPeerstore(b *testing.B) {
|
||||||
|
test.BenchmarkPeerstore(b, func() (pstore.Peerstore, func()) {
|
||||||
|
return NewPeerstore(), nil
|
||||||
|
})
|
||||||
|
}
|
93
pstoremem/keybook.go
Normal file
93
pstoremem/keybook.go
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
package pstoremem
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
ic "github.com/libp2p/go-libp2p-crypto"
|
||||||
|
"github.com/libp2p/go-libp2p-peer"
|
||||||
|
|
||||||
|
pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||||
|
)
|
||||||
|
|
||||||
|
type memoryKeyBook struct {
|
||||||
|
sync.RWMutex // same lock. wont happen a ton.
|
||||||
|
pks map[peer.ID]ic.PubKey
|
||||||
|
sks map[peer.ID]ic.PrivKey
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ pstore.KeyBook = (*memoryKeyBook)(nil)
|
||||||
|
|
||||||
|
// noop new, but in the future we may want to do some init work.
|
||||||
|
func NewKeyBook() pstore.KeyBook {
|
||||||
|
return &memoryKeyBook{
|
||||||
|
pks: map[peer.ID]ic.PubKey{},
|
||||||
|
sks: map[peer.ID]ic.PrivKey{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mkb *memoryKeyBook) PeersWithKeys() []peer.ID {
|
||||||
|
mkb.RLock()
|
||||||
|
ps := make([]peer.ID, 0, len(mkb.pks)+len(mkb.sks))
|
||||||
|
for p := range mkb.pks {
|
||||||
|
ps = append(ps, p)
|
||||||
|
}
|
||||||
|
for p := range mkb.sks {
|
||||||
|
if _, found := mkb.pks[p]; !found {
|
||||||
|
ps = append(ps, p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mkb.RUnlock()
|
||||||
|
return ps
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mkb *memoryKeyBook) PubKey(p peer.ID) ic.PubKey {
|
||||||
|
mkb.RLock()
|
||||||
|
pk := mkb.pks[p]
|
||||||
|
mkb.RUnlock()
|
||||||
|
if pk != nil {
|
||||||
|
return pk
|
||||||
|
}
|
||||||
|
pk, err := p.ExtractPublicKey()
|
||||||
|
if err == nil && pk != nil {
|
||||||
|
mkb.Lock()
|
||||||
|
mkb.pks[p] = pk
|
||||||
|
mkb.Unlock()
|
||||||
|
}
|
||||||
|
return pk
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mkb *memoryKeyBook) AddPubKey(p peer.ID, pk ic.PubKey) error {
|
||||||
|
// check it's correct first
|
||||||
|
if !p.MatchesPublicKey(pk) {
|
||||||
|
return errors.New("ID does not match PublicKey")
|
||||||
|
}
|
||||||
|
|
||||||
|
mkb.Lock()
|
||||||
|
mkb.pks[p] = pk
|
||||||
|
mkb.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mkb *memoryKeyBook) PrivKey(p peer.ID) ic.PrivKey {
|
||||||
|
mkb.RLock()
|
||||||
|
sk := mkb.sks[p]
|
||||||
|
mkb.RUnlock()
|
||||||
|
return sk
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mkb *memoryKeyBook) AddPrivKey(p peer.ID, sk ic.PrivKey) error {
|
||||||
|
if sk == nil {
|
||||||
|
return errors.New("sk is nil (PrivKey)")
|
||||||
|
}
|
||||||
|
|
||||||
|
// check it's correct first
|
||||||
|
if !p.MatchesPrivateKey(sk) {
|
||||||
|
return errors.New("ID does not match PrivateKey")
|
||||||
|
}
|
||||||
|
|
||||||
|
mkb.Lock()
|
||||||
|
mkb.sks[p] = sk
|
||||||
|
mkb.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
45
pstoremem/metadata.go
Normal file
45
pstoremem/metadata.go
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
package pstoremem
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-peer"
|
||||||
|
|
||||||
|
pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||||
|
)
|
||||||
|
|
||||||
|
type memoryPeerMetadata struct {
|
||||||
|
// store other data, like versions
|
||||||
|
//ds ds.ThreadSafeDatastore
|
||||||
|
// TODO: use a datastore for this
|
||||||
|
ds map[string]interface{}
|
||||||
|
dslock sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPeerMetadata() pstore.PeerMetadata {
|
||||||
|
return &memoryPeerMetadata{
|
||||||
|
ds: make(map[string]interface{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *memoryPeerMetadata) Put(p peer.ID, key string, val interface{}) error {
|
||||||
|
//dsk := ds.NewKey(string(p) + "/" + key)
|
||||||
|
//return ps.ds.Put(dsk, val)
|
||||||
|
ps.dslock.Lock()
|
||||||
|
defer ps.dslock.Unlock()
|
||||||
|
ps.ds[string(p)+"/"+key] = val
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *memoryPeerMetadata) Get(p peer.ID, key string) (interface{}, error) {
|
||||||
|
//dsk := ds.NewKey(string(p) + "/" + key)
|
||||||
|
//return ps.ds.Get(dsk)
|
||||||
|
|
||||||
|
ps.dslock.Lock()
|
||||||
|
defer ps.dslock.Unlock()
|
||||||
|
i, ok := ps.ds[string(p)+"/"+key]
|
||||||
|
if !ok {
|
||||||
|
return nil, pstore.ErrNotFound
|
||||||
|
}
|
||||||
|
return i, nil
|
||||||
|
}
|
11
pstoremem/peerstore.go
Normal file
11
pstoremem/peerstore.go
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
package pstoremem
|
||||||
|
|
||||||
|
import pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||||
|
|
||||||
|
// NewPeerstore creates an in-memory threadsafe collection of peers.
|
||||||
|
func NewPeerstore() pstore.Peerstore {
|
||||||
|
return pstore.NewPeerstore(
|
||||||
|
NewKeyBook(),
|
||||||
|
NewAddrBook(),
|
||||||
|
NewPeerMetadata())
|
||||||
|
}
|
@ -5,7 +5,7 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
peer "github.com/libp2p/go-libp2p-peer"
|
"github.com/libp2p/go-libp2p-peer"
|
||||||
ks "github.com/whyrusleeping/go-keyspace"
|
ks "github.com/whyrusleeping/go-keyspace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package queue
|
package queue
|
||||||
|
|
||||||
import peer "github.com/libp2p/go-libp2p-peer"
|
import "github.com/libp2p/go-libp2p-peer"
|
||||||
|
|
||||||
// PeerQueue maintains a set of peers ordered according to a metric.
|
// PeerQueue maintains a set of peers ordered according to a metric.
|
||||||
// Implementations of PeerQueue could order peers based on distances along
|
// Implementations of PeerQueue could order peers based on distances along
|
||||||
|
@ -7,7 +7,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
peer "github.com/libp2p/go-libp2p-peer"
|
"github.com/libp2p/go-libp2p-peer"
|
||||||
mh "github.com/multiformats/go-multihash"
|
mh "github.com/multiformats/go-multihash"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -4,7 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
|
|
||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
peer "github.com/libp2p/go-libp2p-peer"
|
"github.com/libp2p/go-libp2p-peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("peerqueue")
|
var log = logging.Logger("peerqueue")
|
||||||
|
280
test/addr_book_suite.go
Normal file
280
test/addr_book_suite.go
Normal file
@ -0,0 +1,280 @@
|
|||||||
|
package test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-peer"
|
||||||
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
|
|
||||||
|
pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||||
|
)
|
||||||
|
|
||||||
|
var addressBookSuite = map[string]func(book pstore.AddrBook) func(*testing.T){
|
||||||
|
"Addresses": testAddresses,
|
||||||
|
"Clear": testClearWorks,
|
||||||
|
"SetNegativeTTLClears": testSetNegativeTTLClears,
|
||||||
|
"UpdateTTLs": testUpdateTTLs,
|
||||||
|
"NilAddrsDontBreak": testNilAddrsDontBreak,
|
||||||
|
"AddressesExpire": testAddressesExpire,
|
||||||
|
}
|
||||||
|
|
||||||
|
type AddrBookFactory func() (pstore.AddrBook, func())
|
||||||
|
|
||||||
|
func TestAddrBook(t *testing.T, factory AddrBookFactory) {
|
||||||
|
for name, test := range addressBookSuite {
|
||||||
|
// Create a new peerstore.
|
||||||
|
ab, closeFunc := factory()
|
||||||
|
|
||||||
|
// Run the test.
|
||||||
|
t.Run(name, test(ab))
|
||||||
|
|
||||||
|
// Cleanup.
|
||||||
|
if closeFunc != nil {
|
||||||
|
closeFunc()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testAddresses(m pstore.AddrBook) func(*testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
||||||
|
id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ")
|
||||||
|
id3 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ6Kn")
|
||||||
|
id4 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Kn")
|
||||||
|
id5 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Km")
|
||||||
|
|
||||||
|
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
|
||||||
|
ma21 := MA(t, "/ip4/2.2.3.2/tcp/1111")
|
||||||
|
ma22 := MA(t, "/ip4/2.2.3.2/tcp/2222")
|
||||||
|
ma31 := MA(t, "/ip4/3.2.3.3/tcp/1111")
|
||||||
|
ma32 := MA(t, "/ip4/3.2.3.3/tcp/2222")
|
||||||
|
ma33 := MA(t, "/ip4/3.2.3.3/tcp/3333")
|
||||||
|
ma41 := MA(t, "/ip4/4.2.3.3/tcp/1111")
|
||||||
|
ma42 := MA(t, "/ip4/4.2.3.3/tcp/2222")
|
||||||
|
ma43 := MA(t, "/ip4/4.2.3.3/tcp/3333")
|
||||||
|
ma44 := MA(t, "/ip4/4.2.3.3/tcp/4444")
|
||||||
|
ma51 := MA(t, "/ip4/5.2.3.3/tcp/1111")
|
||||||
|
ma52 := MA(t, "/ip4/5.2.3.3/tcp/2222")
|
||||||
|
ma53 := MA(t, "/ip4/5.2.3.3/tcp/3333")
|
||||||
|
ma54 := MA(t, "/ip4/5.2.3.3/tcp/4444")
|
||||||
|
ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555")
|
||||||
|
|
||||||
|
ttl := time.Hour
|
||||||
|
m.AddAddr(id1, ma11, ttl)
|
||||||
|
|
||||||
|
m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl)
|
||||||
|
m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl) // idempotency
|
||||||
|
|
||||||
|
m.AddAddr(id3, ma31, ttl)
|
||||||
|
m.AddAddr(id3, ma32, ttl)
|
||||||
|
m.AddAddr(id3, ma33, ttl)
|
||||||
|
m.AddAddr(id3, ma33, ttl) // idempotency
|
||||||
|
m.AddAddr(id3, ma33, ttl)
|
||||||
|
|
||||||
|
m.AddAddrs(id4, []ma.Multiaddr{ma41, ma42, ma43, ma44}, ttl) // multiple
|
||||||
|
|
||||||
|
m.AddAddrs(id5, []ma.Multiaddr{ma21, ma22}, ttl) // clearing
|
||||||
|
m.AddAddrs(id5, []ma.Multiaddr{ma41, ma42, ma43, ma44}, ttl) // clearing
|
||||||
|
m.ClearAddrs(id5)
|
||||||
|
m.AddAddrs(id5, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ttl) // clearing
|
||||||
|
|
||||||
|
// test the Addresses return value
|
||||||
|
testHas(t, []ma.Multiaddr{ma11}, m.Addrs(id1))
|
||||||
|
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
|
||||||
|
testHas(t, []ma.Multiaddr{ma31, ma32, ma33}, m.Addrs(id3))
|
||||||
|
testHas(t, []ma.Multiaddr{ma41, ma42, ma43, ma44}, m.Addrs(id4))
|
||||||
|
testHas(t, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, m.Addrs(id5))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testClearWorks(m pstore.AddrBook) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
||||||
|
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
|
||||||
|
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
|
||||||
|
ma12 := MA(t, "/ip4/2.2.3.2/tcp/2222")
|
||||||
|
ma13 := MA(t, "/ip4/3.2.3.3/tcp/3333")
|
||||||
|
ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444")
|
||||||
|
ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555")
|
||||||
|
|
||||||
|
m.AddAddr(id1, ma11, time.Hour)
|
||||||
|
m.AddAddr(id1, ma12, time.Hour)
|
||||||
|
m.AddAddr(id1, ma13, time.Hour)
|
||||||
|
m.AddAddr(id2, ma24, time.Hour)
|
||||||
|
m.AddAddr(id2, ma25, time.Hour)
|
||||||
|
|
||||||
|
testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1))
|
||||||
|
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
||||||
|
|
||||||
|
m.ClearAddrs(id1)
|
||||||
|
m.ClearAddrs(id2)
|
||||||
|
|
||||||
|
testHas(t, nil, m.Addrs(id1))
|
||||||
|
testHas(t, nil, m.Addrs(id2))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testSetNegativeTTLClears(m pstore.AddrBook) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
||||||
|
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
|
||||||
|
|
||||||
|
m.SetAddr(id1, ma11, time.Hour)
|
||||||
|
|
||||||
|
testHas(t, []ma.Multiaddr{ma11}, m.Addrs(id1))
|
||||||
|
|
||||||
|
m.SetAddr(id1, ma11, -1)
|
||||||
|
|
||||||
|
testHas(t, nil, m.Addrs(id1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
||||||
|
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
|
||||||
|
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
|
||||||
|
ma12 := MA(t, "/ip4/1.2.3.1/tcp/1112")
|
||||||
|
ma21 := MA(t, "/ip4/1.2.3.1/tcp/1121")
|
||||||
|
ma22 := MA(t, "/ip4/1.2.3.1/tcp/1122")
|
||||||
|
|
||||||
|
// Shouldn't panic.
|
||||||
|
m.UpdateAddrs(id1, time.Hour, time.Minute)
|
||||||
|
|
||||||
|
m.SetAddr(id1, ma11, time.Hour)
|
||||||
|
m.SetAddr(id1, ma12, time.Minute)
|
||||||
|
|
||||||
|
// Shouldn't panic.
|
||||||
|
m.UpdateAddrs(id2, time.Hour, time.Minute)
|
||||||
|
|
||||||
|
m.SetAddr(id2, ma21, time.Hour)
|
||||||
|
m.SetAddr(id2, ma22, time.Minute)
|
||||||
|
|
||||||
|
testHas(t, []ma.Multiaddr{ma11, ma12}, m.Addrs(id1))
|
||||||
|
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
|
||||||
|
|
||||||
|
m.UpdateAddrs(id1, time.Hour, time.Second)
|
||||||
|
|
||||||
|
testHas(t, []ma.Multiaddr{ma11, ma12}, m.Addrs(id1))
|
||||||
|
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
|
||||||
|
|
||||||
|
time.Sleep(1200 * time.Millisecond)
|
||||||
|
|
||||||
|
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
||||||
|
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
|
||||||
|
|
||||||
|
m.UpdateAddrs(id2, time.Hour, time.Second)
|
||||||
|
|
||||||
|
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
||||||
|
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
|
||||||
|
|
||||||
|
time.Sleep(1200 * time.Millisecond)
|
||||||
|
|
||||||
|
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
||||||
|
testHas(t, []ma.Multiaddr{ma22}, m.Addrs(id2))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testNilAddrsDontBreak(m pstore.AddrBook) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
||||||
|
m.SetAddr(id1, nil, time.Hour)
|
||||||
|
m.AddAddr(id1, nil, time.Hour)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testAddressesExpire(m pstore.AddrBook) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
||||||
|
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
|
||||||
|
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
|
||||||
|
ma12 := MA(t, "/ip4/2.2.3.2/tcp/2222")
|
||||||
|
ma13 := MA(t, "/ip4/3.2.3.3/tcp/3333")
|
||||||
|
ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444")
|
||||||
|
ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555")
|
||||||
|
|
||||||
|
m.AddAddr(id1, ma11, time.Hour)
|
||||||
|
m.AddAddr(id1, ma12, time.Hour)
|
||||||
|
m.AddAddr(id1, ma13, time.Hour)
|
||||||
|
m.AddAddr(id2, ma24, time.Hour)
|
||||||
|
m.AddAddr(id2, ma25, time.Hour)
|
||||||
|
|
||||||
|
testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1))
|
||||||
|
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
||||||
|
|
||||||
|
m.SetAddr(id1, ma11, 2*time.Hour)
|
||||||
|
m.SetAddr(id1, ma12, 2*time.Hour)
|
||||||
|
m.SetAddr(id1, ma13, 2*time.Hour)
|
||||||
|
m.SetAddr(id2, ma24, 2*time.Hour)
|
||||||
|
m.SetAddr(id2, ma25, 2*time.Hour)
|
||||||
|
|
||||||
|
testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1))
|
||||||
|
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
||||||
|
|
||||||
|
m.SetAddr(id1, ma11, time.Millisecond)
|
||||||
|
<-time.After(time.Millisecond * 5)
|
||||||
|
testHas(t, []ma.Multiaddr{ma12, ma13}, m.Addrs(id1))
|
||||||
|
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
||||||
|
|
||||||
|
m.SetAddr(id1, ma13, time.Millisecond)
|
||||||
|
<-time.After(time.Millisecond * 5)
|
||||||
|
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
||||||
|
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
||||||
|
|
||||||
|
m.SetAddr(id2, ma24, time.Millisecond)
|
||||||
|
<-time.After(time.Millisecond * 5)
|
||||||
|
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
||||||
|
testHas(t, []ma.Multiaddr{ma25}, m.Addrs(id2))
|
||||||
|
|
||||||
|
m.SetAddr(id2, ma25, time.Millisecond)
|
||||||
|
<-time.After(time.Millisecond * 5)
|
||||||
|
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
||||||
|
testHas(t, nil, m.Addrs(id2))
|
||||||
|
|
||||||
|
m.SetAddr(id1, ma12, time.Millisecond)
|
||||||
|
<-time.After(time.Millisecond * 5)
|
||||||
|
testHas(t, nil, m.Addrs(id1))
|
||||||
|
testHas(t, nil, m.Addrs(id2))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func IDS(t *testing.T, ids string) peer.ID {
|
||||||
|
t.Helper()
|
||||||
|
id, err := peer.IDB58Decode(ids)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("id %q is bad: %s", ids, err)
|
||||||
|
}
|
||||||
|
return id
|
||||||
|
}
|
||||||
|
|
||||||
|
func MA(t *testing.T, m string) ma.Multiaddr {
|
||||||
|
t.Helper()
|
||||||
|
maddr, err := ma.NewMultiaddr(m)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return maddr
|
||||||
|
}
|
||||||
|
|
||||||
|
func testHas(t *testing.T, exp, act []ma.Multiaddr) {
|
||||||
|
t.Helper()
|
||||||
|
if len(exp) != len(act) {
|
||||||
|
t.Fatalf("lengths not the same. expected %d, got %d\n", len(exp), len(act))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, a := range exp {
|
||||||
|
found := false
|
||||||
|
|
||||||
|
for _, b := range act {
|
||||||
|
if a.Equal(b) {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !found {
|
||||||
|
t.Fatalf("expected address %s not found", a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package peerstore
|
package test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
320
test/peerstore_suite.go
Normal file
320
test/peerstore_suite.go
Normal file
@ -0,0 +1,320 @@
|
|||||||
|
package test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"sort"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-crypto"
|
||||||
|
"github.com/libp2p/go-libp2p-peer"
|
||||||
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
|
|
||||||
|
pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||||
|
)
|
||||||
|
|
||||||
|
var peerstoreSuite = map[string]func(book pstore.Peerstore) func(*testing.T){
|
||||||
|
"AddrStream": testAddrStream,
|
||||||
|
"GetStreamBeforePeerAdded": testGetStreamBeforePeerAdded,
|
||||||
|
"AddStreamDuplicates": testAddrStreamDuplicates,
|
||||||
|
"PeerstoreProtoStore": testPeerstoreProtoStore,
|
||||||
|
"BasicPeerstore": testBasicPeerstore,
|
||||||
|
}
|
||||||
|
|
||||||
|
type PeerstoreFactory func() (pstore.Peerstore, func())
|
||||||
|
|
||||||
|
func TestPeerstore(t *testing.T, factory PeerstoreFactory) {
|
||||||
|
for name, test := range peerstoreSuite {
|
||||||
|
// Create a new peerstore.
|
||||||
|
ps, closeFunc := factory()
|
||||||
|
|
||||||
|
// Run the test.
|
||||||
|
t.Run(name, test(ps))
|
||||||
|
|
||||||
|
// Cleanup.
|
||||||
|
if closeFunc != nil {
|
||||||
|
closeFunc()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkPeerstore(b *testing.B, factory PeerstoreFactory) {
|
||||||
|
ps, closeFunc := factory()
|
||||||
|
|
||||||
|
b.Run("Peerstore", benchmarkPeerstore(ps))
|
||||||
|
|
||||||
|
if closeFunc != nil {
|
||||||
|
closeFunc()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testAddrStream(ps pstore.Peerstore) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
addrs, pid := getAddrs(t, 100), peer.ID("testpeer")
|
||||||
|
ps.AddAddrs(pid, addrs[:10], time.Hour)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
addrch := ps.AddrStream(ctx, pid)
|
||||||
|
|
||||||
|
// while that subscription is active, publish ten more addrs
|
||||||
|
// this tests that it doesnt hang
|
||||||
|
for i := 10; i < 20; i++ {
|
||||||
|
ps.AddAddr(pid, addrs[i], time.Hour)
|
||||||
|
}
|
||||||
|
|
||||||
|
// now receive them (without hanging)
|
||||||
|
timeout := time.After(time.Second * 10)
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
select {
|
||||||
|
case <-addrch:
|
||||||
|
case <-timeout:
|
||||||
|
t.Fatal("timed out")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// start a second stream
|
||||||
|
ctx2, cancel2 := context.WithCancel(context.Background())
|
||||||
|
addrch2 := ps.AddrStream(ctx2, pid)
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(done)
|
||||||
|
// now send the rest of the addresses
|
||||||
|
for _, a := range addrs[20:80] {
|
||||||
|
ps.AddAddr(pid, a, time.Hour)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// receive some concurrently with the goroutine
|
||||||
|
timeout = time.After(time.Second * 10)
|
||||||
|
for i := 0; i < 40; i++ {
|
||||||
|
select {
|
||||||
|
case <-addrch:
|
||||||
|
case <-timeout:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
<-done
|
||||||
|
|
||||||
|
// receive some more after waiting for that goroutine to complete
|
||||||
|
timeout = time.After(time.Second * 10)
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
select {
|
||||||
|
case <-addrch:
|
||||||
|
case <-timeout:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// now cancel it
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
// now check the *second* subscription. We should see 80 addresses.
|
||||||
|
for i := 0; i < 80; i++ {
|
||||||
|
<-addrch2
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel2()
|
||||||
|
|
||||||
|
// and add a few more addresses it doesnt hang afterwards
|
||||||
|
for _, a := range addrs[80:] {
|
||||||
|
ps.AddAddr(pid, a, time.Hour)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testGetStreamBeforePeerAdded(ps pstore.Peerstore) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
addrs, pid := getAddrs(t, 10), peer.ID("testpeer")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
ach := ps.AddrStream(ctx, pid)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
ps.AddAddr(pid, addrs[i], time.Hour)
|
||||||
|
}
|
||||||
|
|
||||||
|
received := make(map[string]bool)
|
||||||
|
var count int
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
a, ok := <-ach
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("channel shouldnt be closed yet")
|
||||||
|
}
|
||||||
|
if a == nil {
|
||||||
|
t.Fatal("got a nil address, thats weird")
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
if received[a.String()] {
|
||||||
|
t.Fatal("received duplicate address")
|
||||||
|
}
|
||||||
|
received[a.String()] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ach:
|
||||||
|
t.Fatal("shouldnt have received any more addresses")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if count != 10 {
|
||||||
|
t.Fatal("should have received exactly ten addresses, got ", count)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, a := range addrs {
|
||||||
|
if !received[a.String()] {
|
||||||
|
t.Log(received)
|
||||||
|
t.Fatalf("expected to receive address %s but didnt", a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testAddrStreamDuplicates(ps pstore.Peerstore) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
addrs, pid := getAddrs(t, 10), peer.ID("testpeer")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
ach := ps.AddrStream(ctx, pid)
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
ps.AddAddr(pid, addrs[i], time.Hour)
|
||||||
|
ps.AddAddr(pid, addrs[rand.Intn(10)], time.Hour)
|
||||||
|
}
|
||||||
|
|
||||||
|
// make sure that all addresses get processed before context is cancelled
|
||||||
|
time.Sleep(time.Millisecond * 50)
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
received := make(map[string]bool)
|
||||||
|
var count int
|
||||||
|
for a := range ach {
|
||||||
|
if a == nil {
|
||||||
|
t.Fatal("got a nil address, thats weird")
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
if received[a.String()] {
|
||||||
|
t.Fatal("received duplicate address")
|
||||||
|
}
|
||||||
|
received[a.String()] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if count != 10 {
|
||||||
|
t.Fatal("should have received exactly ten addresses")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testPeerstoreProtoStore(ps pstore.Peerstore) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
p1, protos := peer.ID("TESTPEER"), []string{"a", "b", "c", "d"}
|
||||||
|
|
||||||
|
err := ps.AddProtocols(p1, protos...)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err := ps.GetProtocols(p1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(out) != len(protos) {
|
||||||
|
t.Fatal("got wrong number of protocols back")
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Strings(out)
|
||||||
|
for i, p := range protos {
|
||||||
|
if out[i] != p {
|
||||||
|
t.Fatal("got wrong protocol")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
supported, err := ps.SupportsProtocols(p1, "q", "w", "a", "y", "b")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(supported) != 2 {
|
||||||
|
t.Fatal("only expected 2 supported")
|
||||||
|
}
|
||||||
|
|
||||||
|
if supported[0] != "a" || supported[1] != "b" {
|
||||||
|
t.Fatal("got wrong supported array: ", supported)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ps.SetProtocols(p1, "other")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
supported, err = ps.SupportsProtocols(p1, "q", "w", "a", "y", "b")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(supported) != 0 {
|
||||||
|
t.Fatal("none of those protocols should have been supported")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testBasicPeerstore(ps pstore.Peerstore) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
var pids []peer.ID
|
||||||
|
addrs := getAddrs(t, 10)
|
||||||
|
|
||||||
|
for _, a := range addrs {
|
||||||
|
priv, _, _ := crypto.GenerateKeyPair(crypto.RSA, 512)
|
||||||
|
p, _ := peer.IDFromPrivateKey(priv)
|
||||||
|
pids = append(pids, p)
|
||||||
|
ps.AddAddr(p, a, pstore.PermanentAddrTTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
peers := ps.Peers()
|
||||||
|
if len(peers) != 10 {
|
||||||
|
t.Fatal("expected ten peers, got", len(peers))
|
||||||
|
}
|
||||||
|
|
||||||
|
pinfo := ps.PeerInfo(pids[0])
|
||||||
|
if !pinfo.Addrs[0].Equal(addrs[0]) {
|
||||||
|
t.Fatal("stored wrong address")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func benchmarkPeerstore(ps pstore.Peerstore) func(*testing.B) {
|
||||||
|
return func(b *testing.B) {
|
||||||
|
addrs := make(chan *peerpair, 100)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
go addressProducer(ctx, b, addrs)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
pp := <-addrs
|
||||||
|
pid := peer.ID(pp.ID)
|
||||||
|
ps.AddAddr(pid, pp.Addr, pstore.PermanentAddrTTL)
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getAddrs(t *testing.T, n int) []ma.Multiaddr {
|
||||||
|
var addrs []ma.Multiaddr
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", i))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
addrs = append(addrs, a)
|
||||||
|
}
|
||||||
|
return addrs
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package testutil
|
package test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
@ -6,7 +6,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
ci "github.com/libp2p/go-libp2p-crypto"
|
ci "github.com/libp2p/go-libp2p-crypto"
|
||||||
peer "github.com/libp2p/go-libp2p-peer"
|
"github.com/libp2p/go-libp2p-peer"
|
||||||
mh "github.com/multiformats/go-multihash"
|
mh "github.com/multiformats/go-multihash"
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user