Merge pull request #1 from ipfs/feat/addr-stream

Add peerstore method to subscribe to new addresses
Jeromy Johnson 2016-05-31 10:42:36 -07:00
commit 0089910775
6 changed files with 408 additions and 0 deletions
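For orientation, here is a minimal usage sketch of the new subscription API (not part of this change). The constructor and method names (NewPeerstore, AddAddr, AddrStream) come from the tests in this diff; the import path for the peer package and the example addresses are assumptions.

package main

import (
	"fmt"
	"time"

	peer "github.com/ipfs/go-libp2p-peer" // assumed import path, based on this repo's layout
	ma "github.com/jbenet/go-multiaddr"
	"golang.org/x/net/context"
)

func main() {
	ps := peer.NewPeerstore()
	pid := peer.ID("example-peer") // hypothetical peer ID, for illustration only

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Subscribe before any addresses are known; addresses added later are
	// delivered on the same channel.
	addrCh := ps.AddrStream(ctx, pid)

	a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001")
	if err != nil {
		panic(err)
	}
	ps.AddAddr(pid, a, time.Hour)

	select {
	case got := <-addrCh:
		fmt.Println("learned address:", got)
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for an address")
	}
}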

addr/sorting.go Normal file

@@ -0,0 +1,62 @@
package addr
import (
"bytes"
ma "github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-multiaddr-net"
mafmt "github.com/whyrusleeping/mafmt"
)
func isFDCostlyTransport(a ma.Multiaddr) bool {
return mafmt.TCP.Matches(a)
}
type AddrList []ma.Multiaddr
func (al AddrList) Len() int {
return len(al)
}
func (al AddrList) Swap(i, j int) {
al[i], al[j] = al[j], al[i]
}
func (al AddrList) Less(i, j int) bool {
a := al[i]
b := al[j]
// dial localhost addresses next, they should fail immediately
lba := manet.IsIPLoopback(a)
lbb := manet.IsIPLoopback(b)
if lba {
if !lbb {
return true
}
}
// dial utp and similar 'non-fd-consuming' addresses first
fda := isFDCostlyTransport(a)
fdb := isFDCostlyTransport(b)
if !fda {
if fdb {
return true
}
// if neither consume fd's, assume equal ordering
return false
}
// if 'b' doesnt take a file descriptor
if !fdb {
return false
}
// if 'b' is loopback and both take file descriptors
if lbb {
return false
}
// for the rest, just sort by bytes
return bytes.Compare(a.Bytes(), b.Bytes()) > 0
}

addr/sorting_test.go Normal file

@@ -0,0 +1,32 @@
package addr
import (
"sort"
"testing"
)
func TestAddressSorting(t *testing.T) {
u1 := newAddrOrFatal(t, "/ip4/152.12.23.53/udp/1234/utp")
u2l := newAddrOrFatal(t, "/ip4/127.0.0.1/udp/1234/utp")
local := newAddrOrFatal(t, "/ip4/127.0.0.1/tcp/1234")
norm := newAddrOrFatal(t, "/ip4/6.5.4.3/tcp/1234")
l := AddrList{local, u1, u2l, norm}
sort.Sort(l)
if !l[0].Equal(u2l) {
t.Fatal("expected utp local addr to be sorted first: ", l[0])
}
if !l[1].Equal(u1) {
t.Fatal("expected utp addr to be sorted second")
}
if !l[2].Equal(local) {
t.Fatal("expected tcp localhost addr thid")
}
if !l[3].Equal(norm) {
t.Fatal("expected normal addr last")
}
}


@@ -1,10 +1,13 @@
package peer
import (
"sort"
"sync"
"time"
addr "github.com/ipfs/go-libp2p-peer/addr"
ma "github.com/jbenet/go-multiaddr"
"golang.org/x/net/context"
)
const (
@@ -51,6 +54,8 @@ type addrSet map[string]expiringAddr
type AddrManager struct {
addrmu sync.Mutex // guards addrs
addrs map[ID]addrSet
addrSubs map[ID][]*addrSub
}
// ensures the AddrManager is initialized.
@@ -59,6 +64,9 @@ func (mgr *AddrManager) init() {
if mgr.addrs == nil {
mgr.addrs = make(map[ID]addrSet)
}
if mgr.addrSubs == nil {
mgr.addrSubs = make(map[ID][]*addrSub)
}
}
func (mgr *AddrManager) Peers() []ID {
@@ -101,6 +109,8 @@ func (mgr *AddrManager) AddAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration)
mgr.addrs[p] = amap
}
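// grab the current subscribers for this peer; they are notified below for any address that is new or had its TTL extended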
subs := mgr.addrSubs[p]
// only expand ttls
exp := time.Now().Add(ttl)
for _, addr := range addrs {
@@ -113,6 +123,10 @@ func (mgr *AddrManager) AddAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration)
a, found := amap[addrstr]
if !found || exp.After(a.TTL) {
amap[addrstr] = expiringAddr{Addr: addr, TTL: exp}
for _, sub := range subs {
sub.pubAddr(addr)
}
}
}
}
@@ -137,6 +151,8 @@ func (mgr *AddrManager) SetAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration)
mgr.addrs[p] = amap
}
subs := mgr.addrSubs[p]
exp := time.Now().Add(ttl)
for _, addr := range addrs {
if addr == nil {
@@ -148,6 +164,10 @@ func (mgr *AddrManager) SetAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration)
if ttl > 0 {
amap[addrs] = expiringAddr{Addr: addr, TTL: exp}
for _, sub := range subs {
sub.pubAddr(addr)
}
} else {
delete(amap, addrs)
}
@@ -195,3 +215,99 @@ func (mgr *AddrManager) ClearAddrs(p ID) {
mgr.addrs[p] = make(addrSet) // clear what was there before
}
func (mgr *AddrManager) removeSub(p ID, s *addrSub) {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
subs := mgr.addrSubs[p]
var filtered []*addrSub
for _, v := range subs {
if v != s {
filtered = append(filtered, v)
}
}
mgr.addrSubs[p] = filtered
}
type addrSub struct {
pubch chan ma.Multiaddr
lk sync.Mutex
buffer []ma.Multiaddr
ctx context.Context
}
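// pubAddr delivers one address to the subscriber's stream goroutine, giving up if the subscription's context is cancelled first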
func (s *addrSub) pubAddr(a ma.Multiaddr) {
select {
case s.pubch <- a:
case <-s.ctx.Done():
}
}
func (mgr *AddrManager) AddrStream(ctx context.Context, p ID) <-chan ma.Multiaddr {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
mgr.init()
sub := &addrSub{pubch: make(chan ma.Multiaddr), ctx: ctx}
out := make(chan ma.Multiaddr)
mgr.addrSubs[p] = append(mgr.addrSubs[p], sub)
baseaddrset := mgr.addrs[p]
var initial []ma.Multiaddr
for _, a := range baseaddrset {
initial = append(initial, a.Addr)
}
sort.Sort(addr.AddrList(initial))
go func(buffer []ma.Multiaddr) {
defer close(out)
sent := make(map[string]bool)
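// outch stays nil while there is nothing queued; sending on a nil channel blocks forever, which disables that select case until 'next' is set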
var outch chan ma.Multiaddr
for _, a := range buffer {
sent[a.String()] = true
}
var next ma.Multiaddr
if len(buffer) > 0 {
next = buffer[0]
buffer = buffer[1:]
outch = out
}
for {
select {
case outch <- next:
if len(buffer) > 0 {
next = buffer[0]
buffer = buffer[1:]
} else {
outch = nil
next = nil
}
case naddr := <-sub.pubch:
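// drop addresses that were already delivered to this subscriber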
if sent[naddr.String()] {
continue
}
sent[naddr.String()] = true
if next == nil {
next = naddr
outch = out
} else {
buffer = append(buffer, naddr)
}
case <-ctx.Done():
mgr.removeSub(p, sub)
return
}
}
}(initial)
return out
}


@@ -54,6 +54,18 @@
"hash": "QmUEUu1CM8bxBJxc3ZLojAi8evhTr4byQogWstABet79oY",
"name": "go-libp2p-crypto",
"version": "1.0.2"
},
{
"author": "jbenet",
"hash": "QmYp8PC6b9M3UY4awdHkdRUim68KpGSmRmz27bANHteen6",
"name": "go-multiaddr-net",
"version": "1.1.0"
},
{
"author": "whyrusleeping",
"hash": "QmeLQ13LftT9XhNn22piZc3GP56fGqhijuL5Y8KdUaRn1g",
"name": "mafmt",
"version": "1.1.1"
}
],
"gxVersion": "0.4.0",


@@ -10,6 +10,7 @@ import (
//ds "github.com/jbenet/go-datastore"
//dssync "github.com/jbenet/go-datastore/sync"
ma "github.com/jbenet/go-multiaddr"
"golang.org/x/net/context"
)
const (
@@ -61,6 +62,11 @@ type AddrBook interface {
// Addresses returns all known (and valid) addresses for a given peer
Addrs(p ID) []ma.Multiaddr
// AddrStream returns a channel that gets all addresses for a given
// peer sent on it. If new addresses are added after the call is made
// they will be sent along through the channel as well.
AddrStream(context.Context, ID) <-chan ma.Multiaddr
// ClearAddresses removes all previously stored addresses
ClearAddrs(p ID)
}
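To make the contract above concrete, here is a hedged sketch of a consumer written against the AddrBook interface (not part of this change; the helper name firstAddr is illustrative). It relies on the behaviour of the addr manager above: already-known addresses are sent first, later additions follow, and the channel is closed once the subscription's context is cancelled.

package peer

import (
	"fmt"

	ma "github.com/jbenet/go-multiaddr"
	"golang.org/x/net/context"
)

// firstAddr waits for the first address the AddrBook reports for p, or
// gives up when ctx is cancelled.
func firstAddr(ctx context.Context, ab AddrBook, p ID) (ma.Multiaddr, error) {
	ch := ab.AddrStream(ctx, p)
	select {
	case a, ok := <-ch:
		if !ok {
			return nil, fmt.Errorf("address stream for %s closed before any address arrived", p)
		}
		return a, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}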

peerstore_test.go Normal file

@@ -0,0 +1,180 @@
package peer
import (
"fmt"
"math/rand"
"testing"
"time"
ma "github.com/jbenet/go-multiaddr"
"golang.org/x/net/context"
)
func getAddrs(t *testing.T, n int) []ma.Multiaddr {
var addrs []ma.Multiaddr
for i := 0; i < n; i++ {
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", i))
if err != nil {
t.Fatal(err)
}
addrs = append(addrs, a)
}
return addrs
}
func TestAddrStream(t *testing.T) {
addrs := getAddrs(t, 100)
pid := ID("testpeer")
ps := NewPeerstore()
ps.AddAddrs(pid, addrs[:10], time.Hour)
ctx, cancel := context.WithCancel(context.Background())
addrch := ps.AddrStream(ctx, pid)
// while that subscription is active, publish ten more addrs
// this tests that it doesnt hang
for i := 10; i < 20; i++ {
ps.AddAddr(pid, addrs[i], time.Hour)
}
// now receive them (without hanging)
timeout := time.After(time.Second * 10)
for i := 0; i < 20; i++ {
select {
case <-addrch:
case <-timeout:
t.Fatal("timed out")
}
}
done := make(chan struct{})
go func() {
defer close(done)
// now send the rest of the addresses
for _, a := range addrs[20:80] {
ps.AddAddr(pid, a, time.Hour)
}
}()
// receive some concurrently with the goroutine
timeout = time.After(time.Second * 10)
for i := 0; i < 40; i++ {
select {
case <-addrch:
case <-timeout:
}
}
<-done
// receive some more after waiting for that goroutine to complete
timeout = time.After(time.Second * 10)
for i := 0; i < 20; i++ {
select {
case <-addrch:
case <-timeout:
}
}
// now cancel the subscription, then add a few more addresses to check that it doesn't hang afterwards
cancel()
for _, a := range addrs[80:] {
ps.AddAddr(pid, a, time.Hour)
}
}
func TestGetStreamBeforePeerAdded(t *testing.T) {
addrs := getAddrs(t, 10)
pid := ID("testpeer")
ps := NewPeerstore()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ach := ps.AddrStream(ctx, pid)
for i := 0; i < 10; i++ {
ps.AddAddr(pid, addrs[i], time.Hour)
}
received := make(map[string]bool)
var count int
for i := 0; i < 10; i++ {
a, ok := <-ach
if !ok {
t.Fatal("channel shouldnt be closed yet")
}
if a == nil {
t.Fatal("got a nil address, thats weird")
}
count++
if received[a.String()] {
t.Fatal("received duplicate address")
}
received[a.String()] = true
}
select {
case <-ach:
t.Fatal("shouldnt have received any more addresses")
default:
}
if count != 10 {
t.Fatal("should have received exactly ten addresses, got ", count)
}
for _, a := range addrs {
if !received[a.String()] {
t.Log(received)
t.Fatalf("expected to receive address %s but didnt", a)
}
}
}
func TestAddrStreamDuplicates(t *testing.T) {
addrs := getAddrs(t, 10)
pid := ID("testpeer")
ps := NewPeerstore()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ach := ps.AddrStream(ctx, pid)
go func() {
for i := 0; i < 10; i++ {
ps.AddAddr(pid, addrs[i], time.Hour)
ps.AddAddr(pid, addrs[rand.Intn(10)], time.Hour)
}
// make sure that all addresses get processed before context is cancelled
time.Sleep(time.Millisecond * 50)
cancel()
}()
received := make(map[string]bool)
var count int
for a := range ach {
if a == nil {
t.Fatal("got a nil address, thats weird")
}
count++
if received[a.String()] {
t.Fatal("received duplicate address")
}
received[a.String()] = true
}
if count != 10 {
t.Fatal("should have received exactly ten addresses")
}
}