mirror of
https://github.com/libp2p/go-libp2p-peerstore.git
synced 2025-03-30 13:50:07 +08:00
move to p2p dir
This commit is contained in:
commit
e41f12f9ea
70
addr/addrsrcs.go
Normal file
70
addr/addrsrcs.go
Normal file
@ -0,0 +1,70 @@
|
||||
// Package addr provides utility functions to handle peer addresses.
|
||||
package addr
|
||||
|
||||
import (
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
)
|
||||
|
||||
// AddrSource is a source of addresses. It allows clients to retrieve
|
||||
// a set of addresses at a last possible moment in time. It is used
|
||||
// to query a set of addresses that may change over time, as a result
|
||||
// of the network changing interfaces or mappings.
|
||||
type Source interface {
|
||||
Addrs() []ma.Multiaddr
|
||||
}
|
||||
|
||||
// CombineSources returns a new AddrSource which is the
|
||||
// concatenation of all input AddrSources:
|
||||
//
|
||||
// combined := CombinedSources(a, b)
|
||||
// combined.Addrs() // append(a.Addrs(), b.Addrs()...)
|
||||
//
|
||||
func CombineSources(srcs ...Source) Source {
|
||||
return combinedAS(srcs)
|
||||
}
|
||||
|
||||
type combinedAS []Source
|
||||
|
||||
func (cas combinedAS) Addrs() []ma.Multiaddr {
|
||||
var addrs []ma.Multiaddr
|
||||
for _, s := range cas {
|
||||
addrs = append(addrs, s.Addrs()...)
|
||||
}
|
||||
return addrs
|
||||
}
|
||||
|
||||
// UniqueSource returns a new AddrSource which omits duplicate
|
||||
// addresses from the inputs:
|
||||
//
|
||||
// unique := UniqueSource(a, b)
|
||||
// unique.Addrs() // append(a.Addrs(), b.Addrs()...)
|
||||
// // but only adds each addr once.
|
||||
//
|
||||
func UniqueSource(srcs ...Source) Source {
|
||||
return uniqueAS(srcs)
|
||||
}
|
||||
|
||||
type uniqueAS []Source
|
||||
|
||||
func (uas uniqueAS) Addrs() []ma.Multiaddr {
|
||||
seen := make(map[string]struct{})
|
||||
var addrs []ma.Multiaddr
|
||||
for _, s := range uas {
|
||||
for _, a := range s.Addrs() {
|
||||
s := a.String()
|
||||
if _, found := seen[s]; !found {
|
||||
addrs = append(addrs, a)
|
||||
seen[s] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
return addrs
|
||||
}
|
||||
|
||||
// Slice is a simple slice of addresses that implements
|
||||
// the AddrSource interface.
|
||||
type Slice []ma.Multiaddr
|
||||
|
||||
func (as Slice) Addrs() []ma.Multiaddr {
|
||||
return as
|
||||
}
|
78
addr/addrsrcs_test.go
Normal file
78
addr/addrsrcs_test.go
Normal file
@ -0,0 +1,78 @@
|
||||
package addr
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
)
|
||||
|
||||
func newAddrOrFatal(t *testing.T, s string) ma.Multiaddr {
|
||||
a, err := ma.NewMultiaddr(s)
|
||||
if err != nil {
|
||||
t.Fatal("error parsing multiaddr", err)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
func newAddrs(t *testing.T, n int) []ma.Multiaddr {
|
||||
addrs := make([]ma.Multiaddr, n)
|
||||
for i := 0; i < n; i++ {
|
||||
s := fmt.Sprintf("/ip4/1.2.3.4/tcp/%d", i)
|
||||
addrs[i] = newAddrOrFatal(t, s)
|
||||
}
|
||||
return addrs
|
||||
}
|
||||
|
||||
func addrSetsSame(a, b []ma.Multiaddr) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
for i, aa := range a {
|
||||
bb := b[i]
|
||||
if !aa.Equal(bb) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func addrSourcesSame(a, b Source) bool {
|
||||
return addrSetsSame(a.Addrs(), b.Addrs())
|
||||
}
|
||||
|
||||
func TestAddrCombine(t *testing.T) {
|
||||
addrs := newAddrs(t, 30)
|
||||
a := Slice(addrs[0:10])
|
||||
b := Slice(addrs[10:20])
|
||||
c := Slice(addrs[20:30])
|
||||
d := CombineSources(a, b, c)
|
||||
if !addrSetsSame(addrs, d.Addrs()) {
|
||||
t.Error("addrs differ")
|
||||
}
|
||||
if !addrSourcesSame(Slice(addrs), d) {
|
||||
t.Error("addrs differ")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddrUnique(t *testing.T) {
|
||||
|
||||
addrs := newAddrs(t, 40)
|
||||
a := Slice(addrs[0:20])
|
||||
b := Slice(addrs[10:30])
|
||||
c := Slice(addrs[20:40])
|
||||
d := CombineSources(a, b, c)
|
||||
e := UniqueSource(a, b, c)
|
||||
if addrSetsSame(addrs, d.Addrs()) {
|
||||
t.Error("addrs same")
|
||||
}
|
||||
if addrSourcesSame(Slice(addrs), d) {
|
||||
t.Error("addrs same")
|
||||
}
|
||||
if !addrSetsSame(addrs, e.Addrs()) {
|
||||
t.Error("addrs differ", addrs, "\n\n", e.Addrs(), "\n\n")
|
||||
}
|
||||
if !addrSourcesSame(Slice(addrs), e) {
|
||||
t.Error("addrs differ", addrs, "\n\n", e.Addrs(), "\n\n")
|
||||
}
|
||||
}
|
188
addr_manager.go
Normal file
188
addr_manager.go
Normal file
@ -0,0 +1,188 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
// TempAddrTTL is the ttl used for a short lived address
|
||||
TempAddrTTL = time.Second * 10
|
||||
|
||||
// ProviderAddrTTL is the TTL of an address we've received from a provider.
|
||||
// This is also a temporary address, but lasts longer. After this expires,
|
||||
// the records we return will require an extra lookup.
|
||||
ProviderAddrTTL = time.Minute * 10
|
||||
|
||||
// RecentlyConnectedAddrTTL is used when we recently connected to a peer.
|
||||
// It means that we are reasonably certain of the peer's address.
|
||||
RecentlyConnectedAddrTTL = time.Minute * 10
|
||||
|
||||
// OwnObservedAddrTTL is used for our own external addresses observed by peers.
|
||||
OwnObservedAddrTTL = time.Minute * 10
|
||||
|
||||
// PermanentAddrTTL is the ttl for a "permanent address" (e.g. bootstrap nodes)
|
||||
// if we haven't shipped you an update to ipfs in 356 days
|
||||
// we probably arent running the same bootstrap nodes...
|
||||
PermanentAddrTTL = time.Hour * 24 * 356
|
||||
|
||||
// ConnectedAddrTTL is the ttl used for the addresses of a peer to whom
|
||||
// we're connected directly. This is basically permanent, as we will
|
||||
// clear them + re-add under a TempAddrTTL after disconnecting.
|
||||
ConnectedAddrTTL = PermanentAddrTTL
|
||||
)
|
||||
|
||||
type expiringAddr struct {
|
||||
Addr ma.Multiaddr
|
||||
TTL time.Time
|
||||
}
|
||||
|
||||
func (e *expiringAddr) ExpiredBy(t time.Time) bool {
|
||||
return t.After(e.TTL)
|
||||
}
|
||||
|
||||
type addrSet map[string]expiringAddr
|
||||
|
||||
// AddrManager manages addresses.
|
||||
// The zero-value is ready to be used.
|
||||
type AddrManager struct {
|
||||
addrmu sync.Mutex // guards addrs
|
||||
addrs map[ID]addrSet
|
||||
}
|
||||
|
||||
// ensures the AddrManager is initialized.
|
||||
// So we can use the zero value.
|
||||
func (mgr *AddrManager) init() {
|
||||
if mgr.addrs == nil {
|
||||
mgr.addrs = make(map[ID]addrSet)
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *AddrManager) Peers() []ID {
|
||||
mgr.addrmu.Lock()
|
||||
defer mgr.addrmu.Unlock()
|
||||
if mgr.addrs == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
pids := make([]ID, 0, len(mgr.addrs))
|
||||
for pid := range mgr.addrs {
|
||||
pids = append(pids, pid)
|
||||
}
|
||||
return pids
|
||||
}
|
||||
|
||||
// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl)
|
||||
func (mgr *AddrManager) AddAddr(p ID, addr ma.Multiaddr, ttl time.Duration) {
|
||||
mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
|
||||
}
|
||||
|
||||
// AddAddrs gives AddrManager addresses to use, with a given ttl
|
||||
// (time-to-live), after which the address is no longer valid.
|
||||
// If the manager has a longer TTL, the operation is a no-op for that address
|
||||
func (mgr *AddrManager) AddAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
||||
mgr.addrmu.Lock()
|
||||
defer mgr.addrmu.Unlock()
|
||||
|
||||
// if ttl is zero, exit. nothing to do.
|
||||
if ttl <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// so zero value can be used
|
||||
mgr.init()
|
||||
|
||||
amap, found := mgr.addrs[p]
|
||||
if !found {
|
||||
amap = make(addrSet)
|
||||
mgr.addrs[p] = amap
|
||||
}
|
||||
|
||||
// only expand ttls
|
||||
exp := time.Now().Add(ttl)
|
||||
for _, addr := range addrs {
|
||||
addrstr := addr.String()
|
||||
a, found := amap[addrstr]
|
||||
if !found || exp.After(a.TTL) {
|
||||
amap[addrstr] = expiringAddr{Addr: addr, TTL: exp}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SetAddr calls mgr.SetAddrs(p, addr, ttl)
|
||||
func (mgr *AddrManager) SetAddr(p ID, addr ma.Multiaddr, ttl time.Duration) {
|
||||
mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
|
||||
}
|
||||
|
||||
// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
|
||||
// This is used when we receive the best estimate of the validity of an address.
|
||||
func (mgr *AddrManager) SetAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
||||
mgr.addrmu.Lock()
|
||||
defer mgr.addrmu.Unlock()
|
||||
|
||||
// so zero value can be used
|
||||
mgr.init()
|
||||
|
||||
amap, found := mgr.addrs[p]
|
||||
if !found {
|
||||
amap = make(addrSet)
|
||||
mgr.addrs[p] = amap
|
||||
}
|
||||
|
||||
exp := time.Now().Add(ttl)
|
||||
for _, addr := range addrs {
|
||||
// re-set all of them for new ttl.
|
||||
addrs := addr.String()
|
||||
|
||||
if ttl > 0 {
|
||||
amap[addrs] = expiringAddr{Addr: addr, TTL: exp}
|
||||
} else {
|
||||
delete(amap, addrs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Addresses returns all known (and valid) addresses for a given
|
||||
func (mgr *AddrManager) Addrs(p ID) []ma.Multiaddr {
|
||||
mgr.addrmu.Lock()
|
||||
defer mgr.addrmu.Unlock()
|
||||
|
||||
// not initialized? nothing to give.
|
||||
if mgr.addrs == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
maddrs, found := mgr.addrs[p]
|
||||
if !found {
|
||||
return nil
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
good := make([]ma.Multiaddr, 0, len(maddrs))
|
||||
var expired []string
|
||||
for s, m := range maddrs {
|
||||
if m.ExpiredBy(now) {
|
||||
expired = append(expired, s)
|
||||
} else {
|
||||
good = append(good, m.Addr)
|
||||
}
|
||||
}
|
||||
|
||||
// clean up the expired ones.
|
||||
for _, s := range expired {
|
||||
delete(maddrs, s)
|
||||
}
|
||||
return good
|
||||
}
|
||||
|
||||
// ClearAddresses removes all previously stored addresses
|
||||
func (mgr *AddrManager) ClearAddrs(p ID) {
|
||||
mgr.addrmu.Lock()
|
||||
defer mgr.addrmu.Unlock()
|
||||
mgr.init()
|
||||
|
||||
mgr.addrs[p] = make(addrSet) // clear what was there before
|
||||
}
|
180
addr_manager_test.go
Normal file
180
addr_manager_test.go
Normal file
@ -0,0 +1,180 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
)
|
||||
|
||||
func IDS(t *testing.T, ids string) ID {
|
||||
id, err := IDB58Decode(ids)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
func MA(t *testing.T, m string) ma.Multiaddr {
|
||||
maddr, err := ma.NewMultiaddr(m)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return maddr
|
||||
}
|
||||
|
||||
func testHas(t *testing.T, exp, act []ma.Multiaddr) {
|
||||
if len(exp) != len(act) {
|
||||
t.Fatal("lengths not the same")
|
||||
}
|
||||
|
||||
for _, a := range exp {
|
||||
found := false
|
||||
|
||||
for _, b := range act {
|
||||
if a.Equal(b) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
t.Fatal("expected address %s not found", a)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddresses(t *testing.T) {
|
||||
|
||||
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
||||
id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ")
|
||||
id3 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ6Kn")
|
||||
id4 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Kn")
|
||||
id5 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Km")
|
||||
|
||||
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
|
||||
ma21 := MA(t, "/ip4/2.2.3.2/tcp/1111")
|
||||
ma22 := MA(t, "/ip4/2.2.3.2/tcp/2222")
|
||||
ma31 := MA(t, "/ip4/3.2.3.3/tcp/1111")
|
||||
ma32 := MA(t, "/ip4/3.2.3.3/tcp/2222")
|
||||
ma33 := MA(t, "/ip4/3.2.3.3/tcp/3333")
|
||||
ma41 := MA(t, "/ip4/4.2.3.3/tcp/1111")
|
||||
ma42 := MA(t, "/ip4/4.2.3.3/tcp/2222")
|
||||
ma43 := MA(t, "/ip4/4.2.3.3/tcp/3333")
|
||||
ma44 := MA(t, "/ip4/4.2.3.3/tcp/4444")
|
||||
ma51 := MA(t, "/ip4/5.2.3.3/tcp/1111")
|
||||
ma52 := MA(t, "/ip4/5.2.3.3/tcp/2222")
|
||||
ma53 := MA(t, "/ip4/5.2.3.3/tcp/3333")
|
||||
ma54 := MA(t, "/ip4/5.2.3.3/tcp/4444")
|
||||
ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555")
|
||||
|
||||
ttl := time.Hour
|
||||
m := AddrManager{}
|
||||
m.AddAddr(id1, ma11, ttl)
|
||||
|
||||
m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl)
|
||||
m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl) // idempotency
|
||||
|
||||
m.AddAddr(id3, ma31, ttl)
|
||||
m.AddAddr(id3, ma32, ttl)
|
||||
m.AddAddr(id3, ma33, ttl)
|
||||
m.AddAddr(id3, ma33, ttl) // idempotency
|
||||
m.AddAddr(id3, ma33, ttl)
|
||||
|
||||
m.AddAddrs(id4, []ma.Multiaddr{ma41, ma42, ma43, ma44}, ttl) // multiple
|
||||
|
||||
m.AddAddrs(id5, []ma.Multiaddr{ma21, ma22}, ttl) // clearing
|
||||
m.AddAddrs(id5, []ma.Multiaddr{ma41, ma42, ma43, ma44}, ttl) // clearing
|
||||
m.ClearAddrs(id5)
|
||||
m.AddAddrs(id5, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ttl) // clearing
|
||||
|
||||
// test the Addresses return value
|
||||
testHas(t, []ma.Multiaddr{ma11}, m.Addrs(id1))
|
||||
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
|
||||
testHas(t, []ma.Multiaddr{ma31, ma32, ma33}, m.Addrs(id3))
|
||||
testHas(t, []ma.Multiaddr{ma41, ma42, ma43, ma44}, m.Addrs(id4))
|
||||
testHas(t, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, m.Addrs(id5))
|
||||
|
||||
}
|
||||
|
||||
func TestAddressesExpire(t *testing.T) {
|
||||
|
||||
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
||||
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
|
||||
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
|
||||
ma12 := MA(t, "/ip4/2.2.3.2/tcp/2222")
|
||||
ma13 := MA(t, "/ip4/3.2.3.3/tcp/3333")
|
||||
ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444")
|
||||
ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555")
|
||||
|
||||
m := AddrManager{}
|
||||
m.AddAddr(id1, ma11, time.Hour)
|
||||
m.AddAddr(id1, ma12, time.Hour)
|
||||
m.AddAddr(id1, ma13, time.Hour)
|
||||
m.AddAddr(id2, ma24, time.Hour)
|
||||
m.AddAddr(id2, ma25, time.Hour)
|
||||
|
||||
testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1))
|
||||
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
||||
|
||||
m.SetAddr(id1, ma11, 2*time.Hour)
|
||||
m.SetAddr(id1, ma12, 2*time.Hour)
|
||||
m.SetAddr(id1, ma13, 2*time.Hour)
|
||||
m.SetAddr(id2, ma24, 2*time.Hour)
|
||||
m.SetAddr(id2, ma25, 2*time.Hour)
|
||||
|
||||
testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1))
|
||||
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
||||
|
||||
m.SetAddr(id1, ma11, time.Millisecond)
|
||||
<-time.After(time.Millisecond)
|
||||
testHas(t, []ma.Multiaddr{ma12, ma13}, m.Addrs(id1))
|
||||
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
||||
|
||||
m.SetAddr(id1, ma13, time.Millisecond)
|
||||
<-time.After(time.Millisecond)
|
||||
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
||||
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
||||
|
||||
m.SetAddr(id2, ma24, time.Millisecond)
|
||||
<-time.After(time.Millisecond)
|
||||
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
||||
testHas(t, []ma.Multiaddr{ma25}, m.Addrs(id2))
|
||||
|
||||
m.SetAddr(id2, ma25, time.Millisecond)
|
||||
<-time.After(time.Millisecond)
|
||||
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
|
||||
testHas(t, nil, m.Addrs(id2))
|
||||
|
||||
m.SetAddr(id1, ma12, time.Millisecond)
|
||||
<-time.After(time.Millisecond)
|
||||
testHas(t, nil, m.Addrs(id1))
|
||||
testHas(t, nil, m.Addrs(id2))
|
||||
}
|
||||
|
||||
func TestClearWorks(t *testing.T) {
|
||||
|
||||
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
||||
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
|
||||
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
|
||||
ma12 := MA(t, "/ip4/2.2.3.2/tcp/2222")
|
||||
ma13 := MA(t, "/ip4/3.2.3.3/tcp/3333")
|
||||
ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444")
|
||||
ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555")
|
||||
|
||||
m := AddrManager{}
|
||||
m.AddAddr(id1, ma11, time.Hour)
|
||||
m.AddAddr(id1, ma12, time.Hour)
|
||||
m.AddAddr(id1, ma13, time.Hour)
|
||||
m.AddAddr(id2, ma24, time.Hour)
|
||||
m.AddAddr(id2, ma25, time.Hour)
|
||||
|
||||
testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1))
|
||||
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
|
||||
|
||||
m.ClearAddrs(id1)
|
||||
m.ClearAddrs(id2)
|
||||
|
||||
testHas(t, nil, m.Addrs(id1))
|
||||
testHas(t, nil, m.Addrs(id2))
|
||||
}
|
63
metrics.go
Normal file
63
metrics.go
Normal file
@ -0,0 +1,63 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// LatencyEWMASmooting governs the decay of the EWMA (the speed
|
||||
// at which it changes). This must be a normalized (0-1) value.
|
||||
// 1 is 100% change, 0 is no change.
|
||||
var LatencyEWMASmoothing = 0.1
|
||||
|
||||
// Metrics is just an object that tracks metrics
|
||||
// across a set of peers.
|
||||
type Metrics interface {
|
||||
|
||||
// RecordLatency records a new latency measurement
|
||||
RecordLatency(ID, time.Duration)
|
||||
|
||||
// LatencyEWMA returns an exponentially-weighted moving avg.
|
||||
// of all measurements of a peer's latency.
|
||||
LatencyEWMA(ID) time.Duration
|
||||
}
|
||||
|
||||
type metrics struct {
|
||||
latmap map[ID]time.Duration
|
||||
latmu sync.RWMutex
|
||||
}
|
||||
|
||||
func NewMetrics() Metrics {
|
||||
return &metrics{
|
||||
latmap: make(map[ID]time.Duration),
|
||||
}
|
||||
}
|
||||
|
||||
// RecordLatency records a new latency measurement
|
||||
func (m *metrics) RecordLatency(p ID, next time.Duration) {
|
||||
nextf := float64(next)
|
||||
s := LatencyEWMASmoothing
|
||||
if s > 1 || s < 0 {
|
||||
s = 0.1 // ignore the knob. it's broken. look, it jiggles.
|
||||
}
|
||||
|
||||
m.latmu.Lock()
|
||||
ewma, found := m.latmap[p]
|
||||
ewmaf := float64(ewma)
|
||||
if !found {
|
||||
m.latmap[p] = next // when no data, just take it as the mean.
|
||||
} else {
|
||||
nextf = ((1.0 - s) * ewmaf) + (s * nextf)
|
||||
m.latmap[p] = time.Duration(nextf)
|
||||
}
|
||||
m.latmu.Unlock()
|
||||
}
|
||||
|
||||
// LatencyEWMA returns an exponentially-weighted moving avg.
|
||||
// of all measurements of a peer's latency.
|
||||
func (m *metrics) LatencyEWMA(p ID) time.Duration {
|
||||
m.latmu.RLock()
|
||||
lat := m.latmap[p]
|
||||
m.latmu.RUnlock()
|
||||
return time.Duration(lat)
|
||||
}
|
40
metrics_test.go
Normal file
40
metrics_test.go
Normal file
@ -0,0 +1,40 @@
|
||||
package peer_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
testutil "github.com/ipfs/go-ipfs/util/testutil"
|
||||
)
|
||||
|
||||
func TestLatencyEWMAFun(t *testing.T) {
|
||||
t.Skip("run it for fun")
|
||||
|
||||
m := peer.NewMetrics()
|
||||
id, err := testutil.RandPeerID()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
mu := 100.0
|
||||
sig := 10.0
|
||||
next := func() time.Duration {
|
||||
mu = (rand.NormFloat64() * sig) + mu
|
||||
return time.Duration(mu)
|
||||
}
|
||||
|
||||
print := func() {
|
||||
fmt.Printf("%3.f %3.f --> %d\n", sig, mu, m.LatencyEWMA(id))
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
m.RecordLatency(id, next())
|
||||
print()
|
||||
}
|
||||
}
|
||||
}
|
180
peer.go
Normal file
180
peer.go
Normal file
@ -0,0 +1,180 @@
|
||||
// package peer implements an object used to represent peers in the ipfs network.
|
||||
package peer
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
b58 "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
|
||||
|
||||
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
|
||||
)
|
||||
|
||||
var log = logging.Logger("peer")
|
||||
|
||||
// ID represents the identity of a peer.
|
||||
type ID string
|
||||
|
||||
// Pretty returns a b58-encoded string of the ID
|
||||
func (id ID) Pretty() string {
|
||||
return IDB58Encode(id)
|
||||
}
|
||||
|
||||
func (id ID) Loggable() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"peerID": id.Pretty(),
|
||||
}
|
||||
}
|
||||
|
||||
// String prints out the peer.
|
||||
//
|
||||
// TODO(brian): ensure correctness at ID generation and
|
||||
// enforce this by only exposing functions that generate
|
||||
// IDs safely. Then any peer.ID type found in the
|
||||
// codebase is known to be correct.
|
||||
func (id ID) String() string {
|
||||
pid := id.Pretty()
|
||||
|
||||
//All sha256 nodes start with Qm
|
||||
//We can skip the Qm to make the peer.ID more useful
|
||||
if strings.HasPrefix(pid, "Qm") {
|
||||
pid = pid[2:]
|
||||
}
|
||||
|
||||
maxRunes := 6
|
||||
if len(pid) < maxRunes {
|
||||
maxRunes = len(pid)
|
||||
}
|
||||
return fmt.Sprintf("<peer.ID %s>", pid[:maxRunes])
|
||||
}
|
||||
|
||||
// MatchesPrivateKey tests whether this ID was derived from sk
|
||||
func (id ID) MatchesPrivateKey(sk ic.PrivKey) bool {
|
||||
return id.MatchesPublicKey(sk.GetPublic())
|
||||
}
|
||||
|
||||
// MatchesPublicKey tests whether this ID was derived from pk
|
||||
func (id ID) MatchesPublicKey(pk ic.PubKey) bool {
|
||||
oid, err := IDFromPublicKey(pk)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return oid == id
|
||||
}
|
||||
|
||||
// IDFromString cast a string to ID type, and validate
|
||||
// the id to make sure it is a multihash.
|
||||
func IDFromString(s string) (ID, error) {
|
||||
if _, err := mh.Cast([]byte(s)); err != nil {
|
||||
return ID(""), err
|
||||
}
|
||||
return ID(s), nil
|
||||
}
|
||||
|
||||
// IDFromBytes cast a string to ID type, and validate
|
||||
// the id to make sure it is a multihash.
|
||||
func IDFromBytes(b []byte) (ID, error) {
|
||||
if _, err := mh.Cast(b); err != nil {
|
||||
return ID(""), err
|
||||
}
|
||||
return ID(b), nil
|
||||
}
|
||||
|
||||
// IDB58Decode returns a b58-decoded Peer
|
||||
func IDB58Decode(s string) (ID, error) {
|
||||
m, err := mh.FromB58String(s)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return ID(m), err
|
||||
}
|
||||
|
||||
// IDB58Encode returns b58-encoded string
|
||||
func IDB58Encode(id ID) string {
|
||||
return b58.Encode([]byte(id))
|
||||
}
|
||||
|
||||
// IDHexDecode returns a b58-decoded Peer
|
||||
func IDHexDecode(s string) (ID, error) {
|
||||
m, err := mh.FromHexString(s)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return ID(m), err
|
||||
}
|
||||
|
||||
// IDHexEncode returns b58-encoded string
|
||||
func IDHexEncode(id ID) string {
|
||||
return hex.EncodeToString([]byte(id))
|
||||
}
|
||||
|
||||
// IDFromPublicKey returns the Peer ID corresponding to pk
|
||||
func IDFromPublicKey(pk ic.PubKey) (ID, error) {
|
||||
b, err := pk.Bytes()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
hash := u.Hash(b)
|
||||
return ID(hash), nil
|
||||
}
|
||||
|
||||
// IDFromPrivateKey returns the Peer ID corresponding to sk
|
||||
func IDFromPrivateKey(sk ic.PrivKey) (ID, error) {
|
||||
return IDFromPublicKey(sk.GetPublic())
|
||||
}
|
||||
|
||||
// Map maps a Peer ID to a struct.
|
||||
type Set map[ID]struct{}
|
||||
|
||||
// PeerInfo is a small struct used to pass around a peer with
|
||||
// a set of addresses (and later, keys?). This is not meant to be
|
||||
// a complete view of the system, but rather to model updates to
|
||||
// the peerstore. It is used by things like the routing system.
|
||||
type PeerInfo struct {
|
||||
ID ID
|
||||
Addrs []ma.Multiaddr
|
||||
}
|
||||
|
||||
func (pi *PeerInfo) MarshalJSON() ([]byte, error) {
|
||||
out := make(map[string]interface{})
|
||||
out["ID"] = IDB58Encode(pi.ID)
|
||||
var addrs []string
|
||||
for _, a := range pi.Addrs {
|
||||
addrs = append(addrs, a.String())
|
||||
}
|
||||
out["Addrs"] = addrs
|
||||
return json.Marshal(out)
|
||||
}
|
||||
|
||||
func (pi *PeerInfo) UnmarshalJSON(b []byte) error {
|
||||
var data map[string]interface{}
|
||||
err := json.Unmarshal(b, &data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pid, err := IDB58Decode(data["ID"].(string))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pi.ID = pid
|
||||
addrs, ok := data["Addrs"].([]interface{})
|
||||
if ok {
|
||||
for _, a := range addrs {
|
||||
pi.Addrs = append(pi.Addrs, ma.StringCast(a.(string)))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IDSlice for sorting peers
|
||||
type IDSlice []ID
|
||||
|
||||
func (es IDSlice) Len() int { return len(es) }
|
||||
func (es IDSlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
|
||||
func (es IDSlice) Less(i, j int) bool { return string(es[i]) < string(es[j]) }
|
163
peer_test.go
Normal file
163
peer_test.go
Normal file
@ -0,0 +1,163 @@
|
||||
package peer_test
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||
. "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
tu "github.com/ipfs/go-ipfs/util/testutil"
|
||||
|
||||
b58 "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
|
||||
)
|
||||
|
||||
var gen1 keyset // generated
|
||||
var gen2 keyset // generated
|
||||
var man keyset // manual
|
||||
|
||||
func init() {
|
||||
if err := gen1.generate(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := gen2.generate(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
skManBytes = strings.Replace(skManBytes, "\n", "", -1)
|
||||
if err := man.load(hpkpMan, skManBytes); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
type keyset struct {
|
||||
sk ic.PrivKey
|
||||
pk ic.PubKey
|
||||
hpk string
|
||||
hpkp string
|
||||
}
|
||||
|
||||
func (ks *keyset) generate() error {
|
||||
var err error
|
||||
ks.sk, ks.pk, err = tu.RandTestKeyPair(512)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bpk, err := ks.pk.Bytes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ks.hpk = string(u.Hash(bpk))
|
||||
ks.hpkp = b58.Encode([]byte(ks.hpk))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ks *keyset) load(hpkp, skBytesStr string) error {
|
||||
skBytes, err := base64.StdEncoding.DecodeString(skBytesStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ks.sk, err = ic.UnmarshalPrivateKey(skBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ks.pk = ks.sk.GetPublic()
|
||||
bpk, err := ks.pk.Bytes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ks.hpk = string(u.Hash(bpk))
|
||||
ks.hpkp = b58.Encode([]byte(ks.hpk))
|
||||
if ks.hpkp != hpkp {
|
||||
return fmt.Errorf("hpkp doesn't match key. %s", hpkp)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestIDMatchesPublicKey(t *testing.T) {
|
||||
|
||||
test := func(ks keyset) {
|
||||
p1, err := IDB58Decode(ks.hpkp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if ks.hpk != string(p1) {
|
||||
t.Error("p1 and hpk differ")
|
||||
}
|
||||
|
||||
if !p1.MatchesPublicKey(ks.pk) {
|
||||
t.Fatal("p1 does not match pk")
|
||||
}
|
||||
|
||||
p2, err := IDFromPublicKey(ks.pk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if p1 != p2 {
|
||||
t.Error("p1 and p2 differ", p1.Pretty(), p2.Pretty())
|
||||
}
|
||||
|
||||
if p2.Pretty() != ks.hpkp {
|
||||
t.Error("hpkp and p2.Pretty differ", ks.hpkp, p2.Pretty())
|
||||
}
|
||||
}
|
||||
|
||||
test(gen1)
|
||||
test(gen2)
|
||||
test(man)
|
||||
}
|
||||
|
||||
func TestIDMatchesPrivateKey(t *testing.T) {
|
||||
|
||||
test := func(ks keyset) {
|
||||
p1, err := IDB58Decode(ks.hpkp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if ks.hpk != string(p1) {
|
||||
t.Error("p1 and hpk differ")
|
||||
}
|
||||
|
||||
if !p1.MatchesPrivateKey(ks.sk) {
|
||||
t.Fatal("p1 does not match sk")
|
||||
}
|
||||
|
||||
p2, err := IDFromPrivateKey(ks.sk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if p1 != p2 {
|
||||
t.Error("p1 and p2 differ", p1.Pretty(), p2.Pretty())
|
||||
}
|
||||
}
|
||||
|
||||
test(gen1)
|
||||
test(gen2)
|
||||
test(man)
|
||||
}
|
||||
|
||||
var hpkpMan = `QmRK3JgmVEGiewxWbhpXLJyjWuGuLeSTMTndA1coMHEy5o`
|
||||
var skManBytes = `
|
||||
CAAS4AQwggJcAgEAAoGBAL7w+Wc4VhZhCdM/+Hccg5Nrf4q9NXWwJylbSrXz/unFS24wyk6pEk0zi3W
|
||||
7li+vSNVO+NtJQw9qGNAMtQKjVTP+3Vt/jfQRnQM3s6awojtjueEWuLYVt62z7mofOhCtj+VwIdZNBo
|
||||
/EkLZ0ETfcvN5LVtLYa8JkXybnOPsLvK+PAgMBAAECgYBdk09HDM7zzL657uHfzfOVrdslrTCj6p5mo
|
||||
DzvCxLkkjIzYGnlPuqfNyGjozkpSWgSUc+X+EGLLl3WqEOVdWJtbM61fewEHlRTM5JzScvwrJ39t7o6
|
||||
CCAjKA0cBWBd6UWgbN/t53RoWvh9HrA2AW5YrT0ZiAgKe9y7EMUaENVJ8QJBAPhpdmb4ZL4Fkm4OKia
|
||||
NEcjzn6mGTlZtef7K/0oRC9+2JkQnCuf6HBpaRhJoCJYg7DW8ZY+AV6xClKrgjBOfERMCQQDExhnzu2
|
||||
dsQ9k8QChBlpHO0TRbZBiQfC70oU31kM1AeLseZRmrxv9Yxzdl8D693NNWS2JbKOXl0kMHHcuGQLMVA
|
||||
kBZ7WvkmPV3aPL6jnwp2pXepntdVnaTiSxJ1dkXShZ/VSSDNZMYKY306EtHrIu3NZHtXhdyHKcggDXr
|
||||
qkBrdgErAkAlpGPojUwemOggr4FD8sLX1ot2hDJyyV7OK2FXfajWEYJyMRL1Gm9Uk1+Un53RAkJneqp
|
||||
JGAzKpyttXBTIDO51AkEA98KTiROMnnU8Y6Mgcvr68/SMIsvCYMt9/mtwSBGgl80VaTQ5Hpaktl6Xbh
|
||||
VUt5Wv0tRxlXZiViCGCD1EtrrwTw==
|
||||
`
|
216
peerstore.go
Normal file
216
peerstore.go
Normal file
@ -0,0 +1,216 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||
|
||||
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
dssync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
)
|
||||
|
||||
const (
|
||||
// AddressTTL is the expiration time of addresses.
|
||||
AddressTTL = time.Hour
|
||||
)
|
||||
|
||||
// Peerstore provides a threadsafe store of Peer related
|
||||
// information.
|
||||
type Peerstore interface {
|
||||
AddrBook
|
||||
KeyBook
|
||||
Metrics
|
||||
|
||||
// Peers returns a list of all peer.IDs in this Peerstore
|
||||
Peers() []ID
|
||||
|
||||
// PeerInfo returns a peer.PeerInfo struct for given peer.ID.
|
||||
// This is a small slice of the information Peerstore has on
|
||||
// that peer, useful to other services.
|
||||
PeerInfo(ID) PeerInfo
|
||||
|
||||
// Get/Put is a simple registry for other peer-related key/value pairs.
|
||||
// if we find something we use often, it should become its own set of
|
||||
// methods. this is a last resort.
|
||||
Get(id ID, key string) (interface{}, error)
|
||||
Put(id ID, key string, val interface{}) error
|
||||
}
|
||||
|
||||
// AddrBook is an interface that fits the new AddrManager. I'm patching
|
||||
// it up in here to avoid changing a ton of the codebase.
|
||||
type AddrBook interface {
|
||||
|
||||
// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl)
|
||||
AddAddr(p ID, addr ma.Multiaddr, ttl time.Duration)
|
||||
|
||||
// AddAddrs gives AddrManager addresses to use, with a given ttl
|
||||
// (time-to-live), after which the address is no longer valid.
|
||||
// If the manager has a longer TTL, the operation is a no-op for that address
|
||||
AddAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration)
|
||||
|
||||
// SetAddr calls mgr.SetAddrs(p, addr, ttl)
|
||||
SetAddr(p ID, addr ma.Multiaddr, ttl time.Duration)
|
||||
|
||||
// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
|
||||
// This is used when we receive the best estimate of the validity of an address.
|
||||
SetAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration)
|
||||
|
||||
// Addresses returns all known (and valid) addresses for a given
|
||||
Addrs(p ID) []ma.Multiaddr
|
||||
|
||||
// ClearAddresses removes all previously stored addresses
|
||||
ClearAddrs(p ID)
|
||||
}
|
||||
|
||||
// KeyBook tracks the Public keys of Peers.
|
||||
type KeyBook interface {
|
||||
PubKey(ID) ic.PubKey
|
||||
AddPubKey(ID, ic.PubKey) error
|
||||
|
||||
PrivKey(ID) ic.PrivKey
|
||||
AddPrivKey(ID, ic.PrivKey) error
|
||||
}
|
||||
|
||||
type keybook struct {
|
||||
pks map[ID]ic.PubKey
|
||||
sks map[ID]ic.PrivKey
|
||||
|
||||
sync.RWMutex // same lock. wont happen a ton.
|
||||
}
|
||||
|
||||
func newKeybook() *keybook {
|
||||
return &keybook{
|
||||
pks: map[ID]ic.PubKey{},
|
||||
sks: map[ID]ic.PrivKey{},
|
||||
}
|
||||
}
|
||||
|
||||
func (kb *keybook) Peers() []ID {
|
||||
kb.RLock()
|
||||
ps := make([]ID, 0, len(kb.pks)+len(kb.sks))
|
||||
for p := range kb.pks {
|
||||
ps = append(ps, p)
|
||||
}
|
||||
for p := range kb.sks {
|
||||
if _, found := kb.pks[p]; !found {
|
||||
ps = append(ps, p)
|
||||
}
|
||||
}
|
||||
kb.RUnlock()
|
||||
return ps
|
||||
}
|
||||
|
||||
func (kb *keybook) PubKey(p ID) ic.PubKey {
|
||||
kb.RLock()
|
||||
pk := kb.pks[p]
|
||||
kb.RUnlock()
|
||||
return pk
|
||||
}
|
||||
|
||||
func (kb *keybook) AddPubKey(p ID, pk ic.PubKey) error {
|
||||
|
||||
// check it's correct first
|
||||
if !p.MatchesPublicKey(pk) {
|
||||
return errors.New("ID does not match PublicKey")
|
||||
}
|
||||
|
||||
kb.Lock()
|
||||
kb.pks[p] = pk
|
||||
kb.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kb *keybook) PrivKey(p ID) ic.PrivKey {
|
||||
kb.RLock()
|
||||
sk := kb.sks[p]
|
||||
kb.RUnlock()
|
||||
return sk
|
||||
}
|
||||
|
||||
func (kb *keybook) AddPrivKey(p ID, sk ic.PrivKey) error {
|
||||
|
||||
if sk == nil {
|
||||
return errors.New("sk is nil (PrivKey)")
|
||||
}
|
||||
|
||||
// check it's correct first
|
||||
if !p.MatchesPrivateKey(sk) {
|
||||
return errors.New("ID does not match PrivateKey")
|
||||
}
|
||||
|
||||
kb.Lock()
|
||||
kb.sks[p] = sk
|
||||
kb.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
type peerstore struct {
|
||||
keybook
|
||||
metrics
|
||||
AddrManager
|
||||
|
||||
// store other data, like versions
|
||||
ds ds.ThreadSafeDatastore
|
||||
}
|
||||
|
||||
// NewPeerstore creates a threadsafe collection of peers.
|
||||
func NewPeerstore() Peerstore {
|
||||
return &peerstore{
|
||||
keybook: *newKeybook(),
|
||||
metrics: *(NewMetrics()).(*metrics),
|
||||
AddrManager: AddrManager{},
|
||||
ds: dssync.MutexWrap(ds.NewMapDatastore()),
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *peerstore) Put(p ID, key string, val interface{}) error {
|
||||
dsk := ds.NewKey(string(p) + "/" + key)
|
||||
return ps.ds.Put(dsk, val)
|
||||
}
|
||||
|
||||
func (ps *peerstore) Get(p ID, key string) (interface{}, error) {
|
||||
dsk := ds.NewKey(string(p) + "/" + key)
|
||||
return ps.ds.Get(dsk)
|
||||
}
|
||||
|
||||
func (ps *peerstore) Peers() []ID {
|
||||
set := map[ID]struct{}{}
|
||||
for _, p := range ps.keybook.Peers() {
|
||||
set[p] = struct{}{}
|
||||
}
|
||||
for _, p := range ps.AddrManager.Peers() {
|
||||
set[p] = struct{}{}
|
||||
}
|
||||
|
||||
pps := make([]ID, 0, len(set))
|
||||
for p := range set {
|
||||
pps = append(pps, p)
|
||||
}
|
||||
return pps
|
||||
}
|
||||
|
||||
func (ps *peerstore) PeerInfo(p ID) PeerInfo {
|
||||
return PeerInfo{
|
||||
ID: p,
|
||||
Addrs: ps.AddrManager.Addrs(p),
|
||||
}
|
||||
}
|
||||
|
||||
func PeerInfos(ps Peerstore, peers []ID) []PeerInfo {
|
||||
pi := make([]PeerInfo, len(peers))
|
||||
for i, p := range peers {
|
||||
pi[i] = ps.PeerInfo(p)
|
||||
}
|
||||
return pi
|
||||
}
|
||||
|
||||
func PeerInfoIDs(pis []PeerInfo) []ID {
|
||||
ps := make([]ID, len(pis))
|
||||
for i, pi := range pis {
|
||||
ps[i] = pi.ID
|
||||
}
|
||||
return ps
|
||||
}
|
101
queue/distance.go
Normal file
101
queue/distance.go
Normal file
@ -0,0 +1,101 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"math/big"
|
||||
"sync"
|
||||
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
ks "github.com/ipfs/go-ipfs/routing/keyspace"
|
||||
)
|
||||
|
||||
// peerMetric tracks a peer and its distance to something else.
|
||||
type peerMetric struct {
|
||||
// the peer
|
||||
peer peer.ID
|
||||
|
||||
// big.Int for XOR metric
|
||||
metric *big.Int
|
||||
}
|
||||
|
||||
// peerMetricHeap implements a heap of peerDistances
|
||||
type peerMetricHeap []*peerMetric
|
||||
|
||||
func (ph peerMetricHeap) Len() int {
|
||||
return len(ph)
|
||||
}
|
||||
|
||||
func (ph peerMetricHeap) Less(i, j int) bool {
|
||||
return -1 == ph[i].metric.Cmp(ph[j].metric)
|
||||
}
|
||||
|
||||
func (ph peerMetricHeap) Swap(i, j int) {
|
||||
ph[i], ph[j] = ph[j], ph[i]
|
||||
}
|
||||
|
||||
func (ph *peerMetricHeap) Push(x interface{}) {
|
||||
item := x.(*peerMetric)
|
||||
*ph = append(*ph, item)
|
||||
}
|
||||
|
||||
func (ph *peerMetricHeap) Pop() interface{} {
|
||||
old := *ph
|
||||
n := len(old)
|
||||
item := old[n-1]
|
||||
*ph = old[0 : n-1]
|
||||
return item
|
||||
}
|
||||
|
||||
// distancePQ implements heap.Interface and PeerQueue
|
||||
type distancePQ struct {
|
||||
// from is the Key this PQ measures against
|
||||
from ks.Key
|
||||
|
||||
// heap is a heap of peerDistance items
|
||||
heap peerMetricHeap
|
||||
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func (pq *distancePQ) Len() int {
|
||||
pq.Lock()
|
||||
defer pq.Unlock()
|
||||
return len(pq.heap)
|
||||
}
|
||||
|
||||
func (pq *distancePQ) Enqueue(p peer.ID) {
|
||||
pq.Lock()
|
||||
defer pq.Unlock()
|
||||
|
||||
distance := ks.XORKeySpace.Key([]byte(p)).Distance(pq.from)
|
||||
|
||||
heap.Push(&pq.heap, &peerMetric{
|
||||
peer: p,
|
||||
metric: distance,
|
||||
})
|
||||
}
|
||||
|
||||
func (pq *distancePQ) Dequeue() peer.ID {
|
||||
pq.Lock()
|
||||
defer pq.Unlock()
|
||||
|
||||
if len(pq.heap) < 1 {
|
||||
panic("called Dequeue on an empty PeerQueue")
|
||||
// will panic internally anyway, but we can help debug here
|
||||
}
|
||||
|
||||
o := heap.Pop(&pq.heap)
|
||||
p := o.(*peerMetric)
|
||||
return p.peer
|
||||
}
|
||||
|
||||
// NewXORDistancePQ returns a PeerQueue which maintains its peers sorted
|
||||
// in terms of their distances to each other in an XORKeySpace (i.e. using
|
||||
// XOR as a metric of distance).
|
||||
func NewXORDistancePQ(fromKey key.Key) PeerQueue {
|
||||
return &distancePQ{
|
||||
from: ks.XORKeySpace.Key([]byte(fromKey)),
|
||||
heap: peerMetricHeap{},
|
||||
}
|
||||
}
|
18
queue/interface.go
Normal file
18
queue/interface.go
Normal file
@ -0,0 +1,18 @@
|
||||
package queue
|
||||
|
||||
import peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
|
||||
// PeerQueue maintains a set of peers ordered according to a metric.
|
||||
// Implementations of PeerQueue could order peers based on distances along
|
||||
// a KeySpace, latency measurements, trustworthiness, reputation, etc.
|
||||
type PeerQueue interface {
|
||||
|
||||
// Len returns the number of items in PeerQueue
|
||||
Len() int
|
||||
|
||||
// Enqueue adds this node to the queue.
|
||||
Enqueue(peer.ID)
|
||||
|
||||
// Dequeue retrieves the highest (smallest int) priority node
|
||||
Dequeue() peer.ID
|
||||
}
|
141
queue/queue_test.go
Normal file
141
queue/queue_test.go
Normal file
@ -0,0 +1,141 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestQueue(t *testing.T) {
|
||||
|
||||
p1 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") // these aren't valid, because need to hex-decode.
|
||||
p2 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32") // these aren't valid, because need to hex-decode.
|
||||
p3 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33") // these aren't valid, because need to hex-decode.
|
||||
p4 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34") // these aren't valid, because need to hex-decode.
|
||||
p5 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") // these aren't valid, because need to hex-decode.
|
||||
// but they work.
|
||||
|
||||
// these are the peer.IDs' XORKeySpace Key values:
|
||||
// [228 47 151 130 156 102 222 232 218 31 132 94 170 208 80 253 120 103 55 35 91 237 48 157 81 245 57 247 66 150 9 40]
|
||||
// [26 249 85 75 54 49 25 30 21 86 117 62 85 145 48 175 155 194 210 216 58 14 241 143 28 209 129 144 122 28 163 6]
|
||||
// [78 135 26 216 178 181 224 181 234 117 2 248 152 115 255 103 244 34 4 152 193 88 9 225 8 127 216 158 226 8 236 246]
|
||||
// [125 135 124 6 226 160 101 94 192 57 39 12 18 79 121 140 190 154 147 55 44 83 101 151 63 255 94 179 51 203 241 51]
|
||||
|
||||
pq := NewXORDistancePQ(key.Key("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31"))
|
||||
pq.Enqueue(p3)
|
||||
pq.Enqueue(p1)
|
||||
pq.Enqueue(p2)
|
||||
pq.Enqueue(p4)
|
||||
pq.Enqueue(p5)
|
||||
pq.Enqueue(p1)
|
||||
|
||||
// should come out as: p1, p4, p3, p2
|
||||
|
||||
if d := pq.Dequeue(); d != p1 && d != p5 {
|
||||
t.Error("ordering failed")
|
||||
}
|
||||
|
||||
if d := pq.Dequeue(); d != p1 && d != p5 {
|
||||
t.Error("ordering failed")
|
||||
}
|
||||
|
||||
if d := pq.Dequeue(); d != p1 && d != p5 {
|
||||
t.Error("ordering failed")
|
||||
}
|
||||
|
||||
if pq.Dequeue() != p4 {
|
||||
t.Error("ordering failed")
|
||||
}
|
||||
|
||||
if pq.Dequeue() != p3 {
|
||||
t.Error("ordering failed")
|
||||
}
|
||||
|
||||
if pq.Dequeue() != p2 {
|
||||
t.Error("ordering failed")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func newPeerTime(t time.Time) peer.ID {
|
||||
s := fmt.Sprintf("hmmm time: %v", t)
|
||||
h := u.Hash([]byte(s))
|
||||
return peer.ID(h)
|
||||
}
|
||||
|
||||
func TestSyncQueue(t *testing.T) {
|
||||
tickT := time.Microsecond * 50
|
||||
max := 5000
|
||||
consumerN := 10
|
||||
countsIn := make([]int, consumerN*2)
|
||||
countsOut := make([]int, consumerN)
|
||||
|
||||
if testing.Short() {
|
||||
max = 1000
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
pq := NewXORDistancePQ(key.Key("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31"))
|
||||
cq := NewChanQueue(ctx, pq)
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
produce := func(p int) {
|
||||
defer wg.Done()
|
||||
|
||||
tick := time.Tick(tickT)
|
||||
for i := 0; i < max; i++ {
|
||||
select {
|
||||
case tim := <-tick:
|
||||
countsIn[p]++
|
||||
cq.EnqChan <- newPeerTime(tim)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
consume := func(c int) {
|
||||
defer wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-cq.DeqChan:
|
||||
countsOut[c]++
|
||||
if countsOut[c] >= max*2 {
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// make n * 2 producers and n consumers
|
||||
for i := 0; i < consumerN; i++ {
|
||||
wg.Add(3)
|
||||
go produce(i)
|
||||
go produce(consumerN + i)
|
||||
go consume(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
sum := func(ns []int) int {
|
||||
total := 0
|
||||
for _, n := range ns {
|
||||
total += n
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
if sum(countsIn) != sum(countsOut) {
|
||||
t.Errorf("didnt get all of them out: %d/%d", sum(countsOut), sum(countsIn))
|
||||
}
|
||||
}
|
84
queue/sync.go
Normal file
84
queue/sync.go
Normal file
@ -0,0 +1,84 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
|
||||
)
|
||||
|
||||
var log = logging.Logger("peerqueue")
|
||||
|
||||
// ChanQueue makes any PeerQueue synchronizable through channels.
|
||||
type ChanQueue struct {
|
||||
Queue PeerQueue
|
||||
EnqChan chan<- peer.ID
|
||||
DeqChan <-chan peer.ID
|
||||
}
|
||||
|
||||
// NewChanQueue creates a ChanQueue by wrapping pq.
|
||||
func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue {
|
||||
cq := &ChanQueue{Queue: pq}
|
||||
cq.process(ctx)
|
||||
return cq
|
||||
}
|
||||
|
||||
func (cq *ChanQueue) process(ctx context.Context) {
|
||||
// construct the channels here to be able to use them bidirectionally
|
||||
enqChan := make(chan peer.ID)
|
||||
deqChan := make(chan peer.ID)
|
||||
|
||||
cq.EnqChan = enqChan
|
||||
cq.DeqChan = deqChan
|
||||
|
||||
go func() {
|
||||
log.Debug("processing")
|
||||
defer log.Debug("closed")
|
||||
defer close(deqChan)
|
||||
|
||||
var next peer.ID
|
||||
var item peer.ID
|
||||
var more bool
|
||||
|
||||
for {
|
||||
if cq.Queue.Len() == 0 {
|
||||
// log.Debug("wait for enqueue")
|
||||
select {
|
||||
case next, more = <-enqChan:
|
||||
if !more {
|
||||
return
|
||||
}
|
||||
// log.Debug("got", next)
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
} else {
|
||||
next = cq.Queue.Dequeue()
|
||||
// log.Debug("peek", next)
|
||||
}
|
||||
|
||||
select {
|
||||
case item, more = <-enqChan:
|
||||
if !more {
|
||||
if cq.Queue.Len() > 0 {
|
||||
return // we're done done.
|
||||
}
|
||||
enqChan = nil // closed, so no use.
|
||||
}
|
||||
// log.Debug("got", item)
|
||||
cq.Queue.Enqueue(item)
|
||||
cq.Queue.Enqueue(next) // order may have changed.
|
||||
next = ""
|
||||
|
||||
case deqChan <- next:
|
||||
// log.Debug("dequeued", next)
|
||||
next = ""
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}()
|
||||
}
|
Loading…
Reference in New Issue
Block a user