1
0
mirror of https://github.com/libp2p/go-libp2p-core.git synced 2025-04-17 15:40:37 +08:00

genesis of go-libp2p-core.

This commit is contained in:
Raúl Kripalani 2019-04-16 13:34:58 +01:00
parent 7f05a8a4f5
commit 1e60799a3d
44 changed files with 3548 additions and 0 deletions

85
connmgr/connmgr.go Normal file
View File

@ -0,0 +1,85 @@
package connmgr
import (
"context"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
// ConnManager tracks connections to peers, and allows consumers to associate metadata
// with each peer.
//
// It enables connections to be trimmed based on implementation-defined heuristics.
type ConnManager interface {
// TagPeer tags a peer with a string, associating a weight with the tag.
TagPeer(peer.ID, string, int)
// Untag removes the tagged value from the peer.
UntagPeer(p peer.ID, tag string)
// GetTagInfo returns the metadata associated with the peer,
// or nil if no metadata has been recorded for the peer.
GetTagInfo(p peer.ID) *TagInfo
// TrimOpenConns terminates open connections based on an implementation-defined
// heuristic.
TrimOpenConns(ctx context.Context)
// Notifee returns an implementation that can be called back to inform of
// opened and closed connections.
Notifee() network.Notifiee
// Protect protects a peer from having its connection(s) pruned.
//
// Tagging allows different parts of the system to manage protections without interfering with one another.
//
// Calls to Protect() with the same tag are idempotent. They are not refcounted, so after multiple calls
// to Protect() with the same tag, a single Unprotect() call bearing the same tag will revoke the protection.
Protect(id peer.ID, tag string)
// Unprotect removes a protection that may have been placed on a peer, under the specified tag.
//
// The return value indicates whether the peer continues to be protected after this call, by way of a different tag.
// See notes on Protect() for more info.
Unprotect(id peer.ID, tag string) (protected bool)
}
// TagInfo stores metadata associated with a peer.
type TagInfo struct {
FirstSeen time.Time
Value int
// Tags maps tag ids to the numerical values.
Tags map[string]int
// Conns maps connection ids (such as remote multiaddr) to their creation time.
Conns map[string]time.Time
}
type NullConnMgr struct{}
var _ ConnManager = (*NullConnMgr)(nil)
func (_ NullConnMgr) TagPeer(peer.ID, string, int) {}
func (_ NullConnMgr) UntagPeer(peer.ID, string) {}
func (_ NullConnMgr) GetTagInfo(peer.ID) *TagInfo { return &TagInfo{} }
func (_ NullConnMgr) TrimOpenConns(context.Context) {}
func (_ NullConnMgr) Notifee() network.Notifiee { return &cmNotifee{} }
func (_ NullConnMgr) Protect(peer.ID, string) {}
func (_ NullConnMgr) Unprotect(peer.ID, string) bool { return false }
type cmNotifee struct{}
var _ network.Notifiee = (*cmNotifee)(nil)
func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {}
func (nn *cmNotifee) Disconnected(n network.Network, c network.Conn) {}
func (nn *cmNotifee) Listen(n network.Network, addr ma.Multiaddr) {}
func (nn *cmNotifee) ListenClose(n network.Network, addr ma.Multiaddr) {}
func (nn *cmNotifee) OpenedStream(network.Network, network.Stream) {}
func (nn *cmNotifee) ClosedStream(network.Network, network.Stream) {}

48
discovery/discovery.go Normal file
View File

@ -0,0 +1,48 @@
package discovery
import (
"context"
"time"
"github.com/libp2p/go-libp2p-core/peer"
)
// Advertiser is an interface for advertising services
type Advertiser interface {
// Advertise advertises a service
Advertise(ctx context.Context, ns string, opts ...DiscoveryOpt) (time.Duration, error)
}
// Discoverer is an interface for peer discovery
type Discoverer interface {
// FindPeers discovers peers providing a service
FindPeers(ctx context.Context, ns string, opts ...DiscoveryOpt) (<-chan peer.Info, error)
}
// Discovery is an interface that combines service advertisement and peer discovery
type Discovery interface {
Advertiser
Discoverer
}
// DiscoveryOpt is a single discovery option.
type DiscoveryOpt func(opts *DiscoveryOpts) error
// DiscoveryOpts is a set of discovery options.
type DiscoveryOpts struct {
Ttl time.Duration
Limit int
// Other (implementation-specific) options
Other map[interface{}]interface{}
}
// Apply applies the given options to this DiscoveryOpts
func (opts *DiscoveryOpts) Apply(options ...DiscoveryOpt) error {
for _, o := range options {
if err := o(opts); err != nil {
return err
}
}
return nil
}

12
go.mod Normal file
View File

@ -0,0 +1,12 @@
module github.com/libp2p/go-libp2p-core
go 1.12
require (
github.com/coreos/go-semver v0.3.0
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8
github.com/libp2p/go-libp2p-crypto v0.0.1
github.com/mr-tron/base58 v1.1.1
github.com/multiformats/go-multiaddr v0.0.2
github.com/multiformats/go-multihash v0.0.1
)

63
go.sum Normal file
View File

@ -0,0 +1,63 @@
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495 h1:6IyqGr3fnd0tM3YxipK27TUskaOVUjU2nG45yzwcQKY=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/libp2p/go-libp2p-crypto v0.0.1 h1:JNQd8CmoGTohO/akqrH16ewsqZpci2CbgYH/LmYl8gw=
github.com/libp2p/go-libp2p-crypto v0.0.1/go.mod h1:yJkNyDmO341d5wwXxDUGO0LykUVT72ImHNUqh5D/dBE=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.1 h1:OJIdWOWYe2l5PQNgimGtuwHY8nDskvJ5vvs//YnzRLs=
github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/multiformats/go-multiaddr v0.0.2 h1:RBysRCv5rv3FWlhKWKoXv8tnsCUpEpIZpCmqAGZos2s=
github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multihash v0.0.1 h1:HHwN1K12I+XllBCrqKnhX949Orn4oawPkegHMu2vDqQ=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b h1:+/WWzjwW6gidDJnMKWLKLX1gxn7irUTF1fLpQovfQ5M=
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190219092855-153ac476189d h1:Z0Ahzd7HltpJtjAHHxX8QFP3j1yYgiuvjbjRzDj/KH0=
golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

42
helpers/match.go Normal file
View File

@ -0,0 +1,42 @@
package helpers
import (
"strings"
"github.com/coreos/go-semver/semver"
"github.com/libp2p/go-libp2p-core/protocol"
)
// MultistreamSemverMatcher returns a matcher function for a given base protocol.
// The matcher function will return a boolean indicating whether a protocol ID
// matches the base protocol. A given protocol ID matches the base protocol if
// the IDs are the same and if the semantic version of the base protocol is the
// same or higher than that of the protocol ID provided.
// TODO
func MultistreamSemverMatcher(base protocol.ID) (func(string) bool, error) {
parts := strings.Split(string(base), "/")
vers, err := semver.NewVersion(parts[len(parts)-1])
if err != nil {
return nil, err
}
return func(check string) bool {
chparts := strings.Split(check, "/")
if len(chparts) != len(parts) {
return false
}
for i, v := range chparts[:len(chparts)-1] {
if parts[i] != v {
return false
}
}
chvers, err := semver.NewVersion(chparts[len(chparts)-1])
if err != nil {
return false
}
return vers.Major == chvers.Major && vers.Minor >= chvers.Minor
}, nil
}

33
helpers/match_test.go Normal file
View File

@ -0,0 +1,33 @@
package helpers
import (
"testing"
)
func TestSemverMatching(t *testing.T) {
m, err := MultistreamSemverMatcher("/testing/4.3.5")
if err != nil {
t.Fatal(err)
}
cases := map[string]bool{
"/testing/4.3.0": true,
"/testing/4.3.7": true,
"/testing/4.3.5": true,
"/testing/4.2.7": true,
"/testing/4.0.0": true,
"/testing/5.0.0": false,
"/cars/dogs/4.3.5": false,
"/foo/1.0.0": false,
"": false,
"dogs": false,
"/foo": false,
"/foo/1.1.1.1": false,
}
for p, ok := range cases {
if m(p) != ok {
t.Fatalf("expected %s to be %t", p, ok)
}
}
}

56
helpers/stream_util.go Normal file
View File

@ -0,0 +1,56 @@
package helpers
import (
"errors"
"io"
"time"
"github.com/libp2p/go-libp2p-core/network"
)
// EOFTimeout is the maximum amount of time to wait to successfully observe an
// EOF on the stream. Defaults to 60 seconds.
var EOFTimeout = time.Second * 60
// ErrExpectedEOF is returned when we read data while expecting an EOF.
var ErrExpectedEOF = errors.New("read data when expecting EOF")
// FullClose closes the stream and waits to read an EOF from the other side.
//
// * If it reads any data *before* the EOF, it resets the stream.
// * If it doesn't read an EOF within EOFTimeout, it resets the stream.
//
// You'll likely want to invoke this as `go FullClose(stream)` to close the
// stream in the background.
func FullClose(s network.Stream) error {
if err := s.Close(); err != nil {
s.Reset()
return err
}
return AwaitEOF(s)
}
// AwaitEOF waits for an EOF on the given stream, returning an error if that
// fails. It waits at most EOFTimeout (defaults to 1 minute) after which it
// resets the stream.
func AwaitEOF(s network.Stream) error {
// So we don't wait forever
s.SetDeadline(time.Now().Add(EOFTimeout))
// We *have* to observe the EOF. Otherwise, we leak the stream.
// Now, technically, we should do this *before*
// returning from SendMessage as the message
// hasn't really been sent yet until we see the
// EOF but we don't actually *know* what
// protocol the other side is speaking.
n, err := s.Read([]byte{0})
if n > 0 || err == nil {
s.Reset()
return ErrExpectedEOF
}
if err != io.EOF {
s.Reset()
return err
}
return nil
}

151
helpers/stream_util_test.go Normal file
View File

@ -0,0 +1,151 @@
package helpers_test
import (
"errors"
"io"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/helpers"
network "github.com/libp2p/go-libp2p-core/network"
)
var errCloseFailed = errors.New("close failed")
var errWriteFailed = errors.New("write failed")
var errReadFailed = errors.New("read failed")
type stream struct {
network.Stream
data []byte
failRead, failWrite, failClose bool
reset bool
}
func (s *stream) Reset() error {
s.reset = true
return nil
}
func (s *stream) Close() error {
if s.failClose {
return errCloseFailed
}
return nil
}
func (s *stream) SetDeadline(t time.Time) error {
s.SetReadDeadline(t)
s.SetWriteDeadline(t)
return nil
}
func (s *stream) SetReadDeadline(t time.Time) error {
return nil
}
func (s *stream) SetWriteDeadline(t time.Time) error {
return nil
}
func (s *stream) Write(b []byte) (int, error) {
if s.failWrite {
return 0, errWriteFailed
}
return len(b), nil
}
func (s *stream) Read(b []byte) (int, error) {
var err error
if s.failRead {
err = errReadFailed
}
if len(s.data) == 0 {
if err == nil {
err = io.EOF
}
return 0, err
}
n := copy(b, s.data)
s.data = s.data[n:]
return n, err
}
func TestNormal(t *testing.T) {
var s stream
if err := helpers.FullClose(&s); err != nil {
t.Fatal(err)
}
if s.reset {
t.Fatal("stream should not have been reset")
}
}
func TestFailRead(t *testing.T) {
var s stream
s.failRead = true
if helpers.FullClose(&s) != errReadFailed {
t.Fatal("expected read to fail with:", errReadFailed)
}
if !s.reset {
t.Fatal("expected stream to be reset")
}
}
func TestFailClose(t *testing.T) {
var s stream
s.failClose = true
if helpers.FullClose(&s) != errCloseFailed {
t.Fatal("expected close to fail with:", errCloseFailed)
}
if !s.reset {
t.Fatal("expected stream to be reset")
}
}
func TestFailWrite(t *testing.T) {
var s stream
s.failWrite = true
if err := helpers.FullClose(&s); err != nil {
t.Fatal(err)
}
if s.reset {
t.Fatal("stream should not have been reset")
}
}
func TestReadDataOne(t *testing.T) {
var s stream
s.data = []byte{0}
if err := helpers.FullClose(&s); err != helpers.ErrExpectedEOF {
t.Fatal("expected:", helpers.ErrExpectedEOF)
}
if !s.reset {
t.Fatal("stream have been reset")
}
}
func TestReadDataMany(t *testing.T) {
var s stream
s.data = []byte{0, 1, 2, 3}
if err := helpers.FullClose(&s); err != helpers.ErrExpectedEOF {
t.Fatal("expected:", helpers.ErrExpectedEOF)
}
if !s.reset {
t.Fatal("stream have been reset")
}
}
func TestReadDataError(t *testing.T) {
var s stream
s.data = []byte{0, 1, 2, 3}
s.failRead = true
if err := helpers.FullClose(&s); err != helpers.ErrExpectedEOF {
t.Fatal("expected:", helpers.ErrExpectedEOF)
}
if !s.reset {
t.Fatal("stream have been reset")
}
}

11
host/helpers.go Normal file
View File

@ -0,0 +1,11 @@
package host
import "github.com/libp2p/go-libp2p-core/peer"
// InfoFromHost returns a peer.Info struct with the Host's ID and all of its Addrs.
func InfoFromHost(h Host) *peer.Info {
return &peer.Info{
ID: h.ID(),
Addrs: h.Addrs(),
}
}

68
host/host.go Normal file
View File

@ -0,0 +1,68 @@
package host
import (
"context"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
ma "github.com/multiformats/go-multiaddr"
)
// Host is an object participating in a p2p network, which
// implements protocols or provides services. It handles
// requests like a Server, and issues requests like a Client.
// It is called Host because it is both Server and Client (and Peer
// may be confusing).
type Host interface {
// ID returns the (local) peer.ID associated with this Host
ID() peer.ID
// Peerstore returns the Host's repository of Peer Addresses and Keys.
Peerstore() peerstore.Peerstore
// Returns the listen addresses of the Host
Addrs() []ma.Multiaddr
// Networks returns the Network interface of the Host
Network() network.Network
// Mux returns the Mux multiplexing incoming streams to protocol handlers
Mux() protocol.Switch
// Connect ensures there is a connection between this host and the peer with
// given peer.ID. Connect will absorb the addresses in pi into its internal
// peerstore. If there is not an active connection, Connect will issue a
// h.Network.Dial, and block until a connection is open, or an error is
// returned. // TODO: Relay + NAT.
Connect(ctx context.Context, pi peer.Info) error
// SetStreamHandler sets the protocol handler on the Host's Mux.
// This is equivalent to:
// host.Mux().SetHandler(proto, handler)
// (Threadsafe)
SetStreamHandler(pid protocol.ID, handler network.StreamHandler)
// SetStreamHandlerMatch sets the protocol handler on the Host's Mux
// using a matching function for protocol selection.
SetStreamHandlerMatch(protocol.ID, func(string) bool, network.StreamHandler)
// RemoveStreamHandler removes a handler on the mux that was set by
// SetStreamHandler
RemoveStreamHandler(pid protocol.ID)
// NewStream opens a new stream to given peer p, and writes a p2p/protocol
// header with given ProtocolID. If there is no connection to p, attempts
// to create one. If ProtocolID is "", writes no header.
// (Threadsafe)
NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error)
// Close shuts down the host, its Network, and services.
Close() error
// ConnManager returns this hosts connection manager
ConnManager() connmgr.ConnManager
}

82
metrics/bw_stats.go Normal file
View File

@ -0,0 +1,82 @@
package metrics
import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
type Stats struct {
TotalIn int64
TotalOut int64
RateIn float64
RateOut float64
}
type BandwidthCounter struct {
totalIn Meter
totalOut Meter
protocolIn MeterRegistry
protocolOut MeterRegistry
peerIn MeterRegistry
peerOut MeterRegistry
}
func NewBandwidthCounter() *BandwidthCounter {
return new(BandwidthCounter)
}
func (bwc *BandwidthCounter) LogSentMessage(size int64) {
bwc.totalOut.Mark(uint64(size))
}
func (bwc *BandwidthCounter) LogRecvMessage(size int64) {
bwc.totalIn.Mark(uint64(size))
}
func (bwc *BandwidthCounter) LogSentMessageStream(size int64, proto protocol.ID, p peer.ID) {
bwc.protocolOut.Get(string(proto)).Mark(uint64(size))
bwc.peerOut.Get(string(p)).Mark(uint64(size))
}
func (bwc *BandwidthCounter) LogRecvMessageStream(size int64, proto protocol.ID, p peer.ID) {
bwc.protocolIn.Get(string(proto)).Mark(uint64(size))
bwc.peerIn.Get(string(p)).Mark(uint64(size))
}
func (bwc *BandwidthCounter) GetBandwidthForPeer(p peer.ID) (out Stats) {
inSnap := bwc.peerIn.Get(string(p)).Snapshot()
outSnap := bwc.peerOut.Get(string(p)).Snapshot()
return Stats{
TotalIn: int64(inSnap.Total),
TotalOut: int64(outSnap.Total),
RateIn: inSnap.Rate,
RateOut: outSnap.Rate,
}
}
func (bwc *BandwidthCounter) GetBandwidthForProtocol(proto protocol.ID) (out Stats) {
inSnap := bwc.protocolIn.Get(string(proto)).Snapshot()
outSnap := bwc.protocolOut.Get(string(proto)).Snapshot()
return Stats{
TotalIn: int64(inSnap.Total),
TotalOut: int64(outSnap.Total),
RateIn: inSnap.Rate,
RateOut: outSnap.Rate,
}
}
func (bwc *BandwidthCounter) GetBandwidthTotals() (out Stats) {
inSnap := bwc.totalIn.Snapshot()
outSnap := bwc.totalOut.Snapshot()
return Stats{
TotalIn: int64(inSnap.Total),
TotalOut: int64(outSnap.Total),
RateIn: inSnap.Rate,
RateOut: outSnap.Rate,
}
}

175
metrics/flow_test.go Normal file
View File

@ -0,0 +1,175 @@
package metrics
import (
"math"
"sync"
"testing"
"time"
)
func TestBasic(t *testing.T) {
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
ticker := time.NewTicker(40 * time.Millisecond)
defer ticker.Stop()
m := new(Meter)
for i := 0; i < 300; i++ {
m.Mark(1000)
<-ticker.C
}
actual := m.Snapshot()
if !approxEq(actual.Rate, 25000, 500) {
t.Errorf("expected rate 25000 (±500), got %f", actual.Rate)
}
for i := 0; i < 200; i++ {
m.Mark(200)
<-ticker.C
}
// Adjusts
actual = m.Snapshot()
if !approxEq(actual.Rate, 5000, 200) {
t.Errorf("expected rate 5000 (±200), got %f", actual.Rate)
}
// Let it settle.
time.Sleep(2 * time.Second)
// get the right total
actual = m.Snapshot()
if actual.Total != 340000 {
t.Errorf("expected total %d, got %d", 340000, actual.Total)
}
}()
}
wg.Wait()
}
func TestShared(t *testing.T) {
var wg sync.WaitGroup
wg.Add(20 * 21)
for i := 0; i < 20; i++ {
m := new(Meter)
for j := 0; j < 20; j++ {
go func() {
defer wg.Done()
ticker := time.NewTicker(40 * time.Millisecond)
defer ticker.Stop()
for i := 0; i < 300; i++ {
m.Mark(50)
<-ticker.C
}
for i := 0; i < 200; i++ {
m.Mark(10)
<-ticker.C
}
}()
}
go func() {
defer wg.Done()
time.Sleep(40 * 300 * time.Millisecond)
actual := m.Snapshot()
if !approxEq(actual.Rate, 25000, 250) {
t.Errorf("expected rate 25000 (±250), got %f", actual.Rate)
}
time.Sleep(40 * 200 * time.Millisecond)
// Adjusts
actual = m.Snapshot()
if !approxEq(actual.Rate, 5000, 50) {
t.Errorf("expected rate 5000 (±50), got %f", actual.Rate)
}
// Let it settle.
time.Sleep(2 * time.Second)
// get the right total
actual = m.Snapshot()
if actual.Total != 340000 {
t.Errorf("expected total %d, got %d", 340000, actual.Total)
}
}()
}
wg.Wait()
}
func TestUnregister(t *testing.T) {
var wg sync.WaitGroup
wg.Add(100 * 2)
pause := make(chan struct{})
for i := 0; i < 100; i++ {
m := new(Meter)
go func() {
defer wg.Done()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for i := 0; i < 40; i++ {
m.Mark(1)
<-ticker.C
}
<-pause
time.Sleep(2 * time.Second)
for i := 0; i < 40; i++ {
m.Mark(2)
<-ticker.C
}
}()
go func() {
defer wg.Done()
time.Sleep(40 * 100 * time.Millisecond)
actual := m.Snapshot()
if !approxEq(actual.Rate, 10, 1) {
t.Errorf("expected rate 10 (±1), got %f", actual.Rate)
}
<-pause
actual = m.Snapshot()
if actual.Total != 40 {
t.Errorf("expected total 4000, got %d", actual.Total)
}
time.Sleep(2*time.Second + 40*100*time.Millisecond)
actual = m.Snapshot()
if !approxEq(actual.Rate, 20, 4) {
t.Errorf("expected rate 20 (±4), got %f", actual.Rate)
}
time.Sleep(2 * time.Second)
actual = m.Snapshot()
if actual.Total != 120 {
t.Errorf("expected total 120, got %d", actual.Total)
}
}()
}
time.Sleep(60 * time.Second)
globalSweeper.mutex.Lock()
if len(globalSweeper.meters) != 0 {
t.Errorf("expected all sweepers to be unregistered: %d", len(globalSweeper.meters))
}
globalSweeper.mutex.Unlock()
close(pause)
wg.Wait()
globalSweeper.mutex.Lock()
if len(globalSweeper.meters) != 100 {
t.Errorf("expected all sweepers to be registered: %d", len(globalSweeper.meters))
}
globalSweeper.mutex.Unlock()
}
func approxEq(a, b, err float64) bool {
return math.Abs(a-b) < err
}

16
metrics/interface.go Normal file
View File

@ -0,0 +1,16 @@
package metrics
import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
type Reporter interface {
LogSentMessage(int64)
LogRecvMessage(int64)
LogSentMessageStream(int64, protocol.ID, peer.ID)
LogRecvMessageStream(int64, protocol.ID, peer.ID)
GetBandwidthForPeer(peer.ID) Stats
GetBandwidthForProtocol(protocol.ID) Stats
GetBandwidthTotals() Stats
}

44
metrics/meter.go Normal file
View File

@ -0,0 +1,44 @@
package metrics
import (
"fmt"
"sync/atomic"
)
// Snapshot is a rate/total snapshot.
type Snapshot struct {
Rate float64
Total uint64
}
func (s Snapshot) String() string {
return fmt.Sprintf("%d (%f/s)", s.Total, s.Rate)
}
// Meter is a meter for monitoring a flow.
type Meter struct {
accumulator uint64
// Take lock.
snapshot Snapshot
}
// Mark updates the total.
func (m *Meter) Mark(count uint64) {
if count > 0 && atomic.AddUint64(&m.accumulator, count) == count {
// I'm the first one to bump this above 0.
// Register it.
globalSweeper.Register(m)
}
}
// Snapshot gets a consistent snapshot of the total and rate.
func (m *Meter) Snapshot() Snapshot {
globalSweeper.mutex.RLock()
defer globalSweeper.mutex.RUnlock()
return m.snapshot
}
func (m *Meter) String() string {
return m.Snapshot().String()
}

34
metrics/meter_test.go Normal file
View File

@ -0,0 +1,34 @@
package metrics
import (
"fmt"
"math"
"time"
)
func ExampleMeter() {
meter := new(Meter)
t := time.NewTicker(100 * time.Millisecond)
for i := 0; i < 100; i++ {
<-t.C
meter.Mark(30)
}
// Get the current rate. This will be accurate *now* but not after we
// sleep (because we calculate it using EWMA).
rate := meter.Snapshot().Rate
// Sleep 2 seconds to allow the total to catch up. We snapshot every
// second so the total may not yet be accurate.
time.Sleep(2 * time.Second)
// Get the current total.
total := meter.Snapshot().Total
fmt.Printf("%d (%d/s)\n", total, roundTens(rate))
// Output: 3000 (300/s)
}
func roundTens(x float64) int64 {
return int64(math.Floor(x/10+0.5)) * 10
}

134
metrics/metrics_test.go Normal file
View File

@ -0,0 +1,134 @@
package metrics
import (
"fmt"
"math"
"sync"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
func BenchmarkBandwidthCounter(b *testing.B) {
b.StopTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
bwc := NewBandwidthCounter()
round(bwc, b)
}
}
func round(bwc *BandwidthCounter, b *testing.B) {
start := make(chan struct{})
var wg sync.WaitGroup
wg.Add(10000)
for i := 0; i < 1000; i++ {
p := peer.ID(fmt.Sprintf("peer-%d", i))
for j := 0; j < 10; j++ {
proto := protocol.ID(fmt.Sprintf("bitswap-%d", j))
go func() {
defer wg.Done()
<-start
for i := 0; i < 1000; i++ {
bwc.LogSentMessage(100)
bwc.LogSentMessageStream(100, proto, p)
time.Sleep(1 * time.Millisecond)
}
}()
}
}
b.StartTimer()
close(start)
wg.Wait()
b.StopTimer()
}
// Allow 7% errors for bw calculations.
const acceptableError = 0.07
func TestBandwidthCounter(t *testing.T) {
bwc := NewBandwidthCounter()
start := make(chan struct{})
var wg sync.WaitGroup
wg.Add(200)
for i := 0; i < 100; i++ {
p := peer.ID(fmt.Sprintf("peer-%d", i))
for j := 0; j < 2; j++ {
proto := protocol.ID(fmt.Sprintf("proto-%d", j))
go func() {
defer wg.Done()
<-start
t := time.NewTicker(100 * time.Millisecond)
defer t.Stop()
for i := 0; i < 40; i++ {
bwc.LogSentMessage(100)
bwc.LogRecvMessage(50)
bwc.LogSentMessageStream(100, proto, p)
bwc.LogRecvMessageStream(50, proto, p)
<-t.C
}
}()
}
}
close(start)
time.Sleep(2*time.Second + 100*time.Millisecond)
for i := 0; i < 100; i++ {
stats := bwc.GetBandwidthForPeer(peer.ID(fmt.Sprintf("peer-%d", i)))
assertApproxEq(t, 2000, stats.RateOut)
assertApproxEq(t, 1000, stats.RateIn)
}
for i := 0; i < 2; i++ {
stats := bwc.GetBandwidthForProtocol(protocol.ID(fmt.Sprintf("proto-%d", i)))
assertApproxEq(t, 100000, stats.RateOut)
assertApproxEq(t, 50000, stats.RateIn)
}
{
stats := bwc.GetBandwidthTotals()
assertApproxEq(t, 200000, stats.RateOut)
assertApproxEq(t, 100000, stats.RateIn)
}
wg.Wait()
time.Sleep(1 * time.Second)
for i := 0; i < 100; i++ {
stats := bwc.GetBandwidthForPeer(peer.ID(fmt.Sprintf("peer-%d", i)))
assertEq(t, 8000, stats.TotalOut)
assertEq(t, 4000, stats.TotalIn)
}
for i := 0; i < 2; i++ {
stats := bwc.GetBandwidthForProtocol(protocol.ID(fmt.Sprintf("proto-%d", i)))
assertEq(t, 400000, stats.TotalOut)
assertEq(t, 200000, stats.TotalIn)
}
{
stats := bwc.GetBandwidthTotals()
assertEq(t, 800000, stats.TotalOut)
assertEq(t, 400000, stats.TotalIn)
}
}
func assertEq(t *testing.T, expected, actual int64) {
if expected != actual {
t.Errorf("expected %d, got %d", expected, actual)
}
}
func assertApproxEq(t *testing.T, expected, actual float64) {
margin := expected * acceptableError
if !(math.Abs(expected-actual) <= margin) {
t.Errorf("expected %f (±%f), got %f", expected, margin, actual)
}
}

35
metrics/registry.go Normal file
View File

@ -0,0 +1,35 @@
package metrics
import (
"sync"
)
// MeterRegistry is a registry for named meters.
type MeterRegistry struct {
meters sync.Map
}
// Get gets (or creates) a meter by name.
func (r *MeterRegistry) Get(name string) *Meter {
if m, ok := r.meters.Load(name); ok {
return m.(*Meter)
}
m, _ := r.meters.LoadOrStore(name, new(Meter))
return m.(*Meter)
}
// Remove removes the named meter from the registry.
//
// Note: The only reason to do this is to save a bit of memory. Unused meters
// don't consume any CPU (after they go idle).
func (r *MeterRegistry) Remove(name string) {
r.meters.Delete(name)
}
// ForEach calls the passed function for each registered meter.
func (r *MeterRegistry) ForEach(iterFunc func(string, *Meter)) {
r.meters.Range(func(k, v interface{}) bool {
iterFunc(k.(string), v.(*Meter))
return true
})
}

77
metrics/registry_test.go Normal file
View File

@ -0,0 +1,77 @@
package metrics
import (
"testing"
"time"
)
func TestRegistry(t *testing.T) {
r := new(MeterRegistry)
m1 := r.Get("first")
m2 := r.Get("second")
m1.Mark(10)
m2.Mark(30)
time.Sleep(2 * time.Second)
if total := r.Get("first").Snapshot().Total; total != 10 {
t.Errorf("expected first total to be 10, got %d", total)
}
if total := r.Get("second").Snapshot().Total; total != 30 {
t.Errorf("expected second total to be 30, got %d", total)
}
expectedMeters := map[string]*Meter{
"first": m1,
"second": m2,
}
r.ForEach(func(n string, m *Meter) {
if expectedMeters[n] != m {
t.Errorf("wrong meter '%s'", n)
}
delete(expectedMeters, n)
})
if len(expectedMeters) != 0 {
t.Errorf("missing meters: '%v'", expectedMeters)
}
r.Remove("first")
found := false
r.ForEach(func(n string, m *Meter) {
if n != "second" {
t.Errorf("found unexpected meter: %s", n)
return
}
if found {
t.Error("found meter twice")
}
found = true
})
if !found {
t.Errorf("didn't find second meter")
}
m3 := r.Get("first")
if m3 == m1 {
t.Error("should have gotten a new meter")
}
if total := m3.Snapshot().Total; total != 0 {
t.Errorf("expected first total to now be 0, got %d", total)
}
expectedMeters = map[string]*Meter{
"first": m3,
"second": m2,
}
r.ForEach(func(n string, m *Meter) {
if expectedMeters[n] != m {
t.Errorf("wrong meter '%s'", n)
}
delete(expectedMeters, n)
})
if len(expectedMeters) != 0 {
t.Errorf("missing meters: '%v'", expectedMeters)
}
}

153
metrics/sweeper.go Normal file
View File

@ -0,0 +1,153 @@
package metrics
import (
"math"
"sync"
"sync/atomic"
"time"
)
// IdleRate the rate at which we declare a meter idle (and stop tracking it
// until it's re-registered).
//
// The default ensures that 1 event every ~30s will keep the meter from going
// idle.
var IdleRate = 1e-13
// Alpha for EWMA of 1s
var alpha = 1 - math.Exp(-1.0)
// The global sweeper.
var globalSweeper sweeper
type sweeper struct {
sweepOnce sync.Once
meters []*Meter
mutex sync.RWMutex
lastUpdateTime time.Time
registerChannel chan *Meter
}
func (sw *sweeper) start() {
sw.registerChannel = make(chan *Meter, 16)
go sw.run()
}
func (sw *sweeper) run() {
for m := range sw.registerChannel {
sw.register(m)
sw.runActive()
}
}
func (sw *sweeper) register(m *Meter) {
// Add back the snapshot total. If we unregistered this
// one, we set it to zero.
atomic.AddUint64(&m.accumulator, m.snapshot.Total)
sw.meters = append(sw.meters, m)
}
func (sw *sweeper) runActive() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
sw.lastUpdateTime = time.Now()
for len(sw.meters) > 0 {
// Scale back allocation.
if len(sw.meters)*2 < cap(sw.meters) {
newMeters := make([]*Meter, len(sw.meters))
copy(newMeters, sw.meters)
sw.meters = newMeters
}
select {
case <-ticker.C:
sw.update()
case m := <-sw.registerChannel:
sw.register(m)
}
}
sw.meters = nil
// Till next time.
}
func (sw *sweeper) update() {
sw.mutex.Lock()
defer sw.mutex.Unlock()
now := time.Now()
tdiff := now.Sub(sw.lastUpdateTime)
if tdiff <= 0 {
return
}
sw.lastUpdateTime = now
timeMultiplier := float64(time.Second) / float64(tdiff)
newLen := len(sw.meters)
for i, m := range sw.meters {
total := atomic.LoadUint64(&m.accumulator)
instant := timeMultiplier * float64(total-m.snapshot.Total)
if m.snapshot.Rate == 0 {
m.snapshot.Rate = instant
} else {
m.snapshot.Rate += alpha * (instant - m.snapshot.Rate)
}
m.snapshot.Total = total
// This is equivalent to one zeros, then one, then 30 zeros.
// We'll consider that to be "idle".
if m.snapshot.Rate > IdleRate {
continue
}
// Ok, so we are idle...
// Mark this as idle by zeroing the accumulator.
swappedTotal := atomic.SwapUint64(&m.accumulator, 0)
// So..., are we really idle?
if swappedTotal > total {
// Not so idle...
// Now we need to make sure this gets re-registered.
// First, add back what we removed. If we can do this
// fast enough, we can put it back before anyone
// notices.
currentTotal := atomic.AddUint64(&m.accumulator, swappedTotal)
// Did we make it?
if currentTotal == swappedTotal {
// Yes! Nobody noticed, move along.
continue
}
// No. Someone noticed and will (or has) put back into
// the registration channel.
//
// Remove the snapshot total, it'll get added back on
// registration.
//
// `^uint64(total - 1)` is the two's complement of
// `total`. It's the "correct" way to subtract
// atomically in go.
atomic.AddUint64(&m.accumulator, ^uint64(m.snapshot.Total-1))
}
// Reset the rate, keep the total.
m.snapshot.Rate = 0
newLen--
sw.meters[i] = sw.meters[newLen]
}
// trim the meter list
for i := newLen; i < len(sw.meters); i++ {
sw.meters[i] = nil
}
sw.meters = sw.meters[:newLen]
}
func (sw *sweeper) Register(m *Meter) {
sw.sweepOnce.Do(sw.start)
sw.registerChannel <- m
}

56
mux/muxer.go Normal file
View File

@ -0,0 +1,56 @@
package mux
import (
"errors"
"io"
"net"
"time"
)
// ErrReset is returned when reading or writing on a reset stream.
var ErrReset = errors.New("stream reset")
// Stream is a bidirectional io pipe within a connection.
type MuxStream interface {
io.Reader
io.Writer
// Close closes the stream for writing. Reading will still work (that
// is, the remote side can still write).
io.Closer
// Reset closes both ends of the stream. Use this to tell the remote
// side to hang up and go away.
Reset() error
SetDeadline(time.Time) error
SetReadDeadline(time.Time) error
SetWriteDeadline(time.Time) error
}
// NoOpHandler do nothing. Resets streams as soon as they are opened.
var NoOpHandler = func(s MuxStream) { s.Reset() }
// MuxedConn is a stream-multiplexing connection to a remote peer.
type MuxedConn interface {
// Close closes the stream muxer and the the underlying net.Conn.
io.Closer
// IsClosed returns whether a connection is fully closed, so it can
// be garbage collected.
IsClosed() bool
// OpenStream creates a new stream.
OpenStream() (MuxStream, error)
// AcceptStream accepts a stream opened by the other side.
AcceptStream() (MuxStream, error)
}
// Transport constructs go-stream-muxer compatible connections.
type Multiplexer interface {
// NewConn constructs a new connection
// TODO rename to Wrap / Multiplex
NewConn(c net.Conn, isServer bool) (MuxedConn, error)
}

227
network/network.go Normal file
View File

@ -0,0 +1,227 @@
package network
import (
"context"
"errors"
"io"
"github.com/jbenet/goprocess"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
ic "github.com/libp2p/go-libp2p-crypto"
ma "github.com/multiformats/go-multiaddr"
)
// MessageSizeMax is a soft (recommended) maximum for network messages.
// One can write more, as the interface is a stream. But it is useful
// to bunch it up into multiple read/writes when the whole message is
// a single, large serialized object.
const MessageSizeMax = 1 << 22 // 4 MB
// Stream represents a bidirectional channel between two agents in
// a libp2p network. "agent" is as granular as desired, potentially
// being a "request -> reply" pair, or whole protocols.
//
// Streams are backed by a multiplexer underneath the hood.
type Stream interface {
mux.MuxStream
Protocol() protocol.ID
SetProtocol(id protocol.ID)
// Stat returns metadata pertaining to this stream.
Stat() Stat
// Conn returns the connection this stream is part of.
Conn() Conn
}
// Direction represents which peer in a stream initiated a connection.
type Direction int
const (
// DirUnknown is the default direction.
DirUnknown Direction = iota
// DirInbound is for when the remote peer initiated a connection.
DirInbound
// DirOutbound is for when the local peer initiated a connection.
DirOutbound
)
// Stat stores metadata pertaining to a given Stream/Conn.
type Stat struct {
Direction Direction
Extra map[interface{}]interface{}
}
// StreamHandler is the type of function used to listen for
// streams opened by the remote side.
type StreamHandler func(Stream)
// ConnSecurity is the interface that one can mix into a connection interface to
// give it the security methods.
type ConnSecurity interface {
// LocalPeer returns our peer ID
LocalPeer() peer.ID
// LocalPrivateKey returns our private key
LocalPrivateKey() ic.PrivKey
// RemotePeer returns the peer ID of the remote peer.
RemotePeer() peer.ID
// RemotePublicKey returns the public key of the remote peer.
RemotePublicKey() ic.PubKey
}
// ConnMultiaddrs is an interface mixin for connection types that provide multiaddr
// addresses for the endpoints.
type ConnMultiaddrs interface {
// LocalMultiaddr returns the local Multiaddr associated
// with this connection
LocalMultiaddr() ma.Multiaddr
// RemoteMultiaddr returns the remote Multiaddr associated
// with this connection
RemoteMultiaddr() ma.Multiaddr
}
// Conn is a connection to a remote peer. It multiplexes streams.
// Usually there is no need to use a Conn directly, but it may
// be useful to get information about the peer on the other side:
// stream.Conn().RemotePeer()
type Conn interface {
io.Closer
ConnSecurity
ConnMultiaddrs
// NewStream constructs a new Stream over this conn.
NewStream() (Stream, error)
// GetStreams returns all open streams over this conn.
GetStreams() []Stream
// Stat stores metadata pertaining to this conn.
Stat() Stat
}
// ConnHandler is the type of function used to listen for
// connections opened by the remote side.
type ConnHandler func(Conn)
// Network is the interface used to connect to the outside world.
// It dials and listens for connections. it uses a Swarm to pool
// connnections (see swarm pkg, and peerstream.Swarm). Connections
// are encrypted with a TLS-like protocol.
type Network interface {
Dialer
io.Closer
// SetStreamHandler sets the handler for new streams opened by the
// remote side. This operation is threadsafe.
SetStreamHandler(StreamHandler)
// SetConnHandler sets the handler for new connections opened by the
// remote side. This operation is threadsafe.
SetConnHandler(ConnHandler)
// NewStream returns a new stream to given peer p.
// If there is no connection to p, attempts to create one.
NewStream(context.Context, peer.ID) (Stream, error)
// Listen tells the network to start listening on given multiaddrs.
Listen(...ma.Multiaddr) error
// ListenAddresses returns a list of addresses at which this network listens.
ListenAddresses() []ma.Multiaddr
// InterfaceListenAddresses returns a list of addresses at which this network
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
InterfaceListenAddresses() ([]ma.Multiaddr, error)
// Process returns the network's Process
Process() goprocess.Process
}
// There are no addresses associated with a peer when they were needed.
var ErrNoRemoteAddrs = errors.New("no remote addresses")
// ErrNoConn is returned when attempting to open a stream to a peer with the NoDial
// option and no usable connection is available.
var ErrNoConn = errors.New("no usable connection to peer")
// Dialer represents a service that can dial out to peers
// (this is usually just a Network, but other services may not need the whole
// stack, and thus it becomes easier to mock)
type Dialer interface {
// Peerstore returns the internal peerstore
// This is useful to tell the dialer about a new address for a peer.
// Or use one of the public keys found out over the network.
Peerstore() peerstore.Peerstore
// LocalPeer returns the local peer associated with this network
LocalPeer() peer.ID
// DialPeer establishes a connection to a given peer
DialPeer(context.Context, peer.ID) (Conn, error)
// ClosePeer closes the connection to a given peer
ClosePeer(peer.ID) error
// Connectedness returns a state signaling connection capabilities
Connectedness(peer.ID) Connectedness
// Peers returns the peers connected
Peers() []peer.ID
// Conns returns the connections in this Netowrk
Conns() []Conn
// ConnsToPeer returns the connections in this Netowrk for given peer.
ConnsToPeer(p peer.ID) []Conn
// Notify/StopNotify register and unregister a notifiee for signals
Notify(Notifiee)
StopNotify(Notifiee)
}
// Connectedness signals the capacity for a connection with a given node.
// It is used to signal to services and other peers whether a node is reachable.
type Connectedness int
const (
// NotConnected means no connection to peer, and no extra information (default)
NotConnected Connectedness = iota
// Connected means has an open, live connection to peer
Connected
// CanConnect means recently connected to peer, terminated gracefully
CanConnect
// CannotConnect means recently attempted connecting but failed to connect.
// (should signal "made effort, failed")
CannotConnect
)
// Notifiee is an interface for an object wishing to receive
// notifications from a Network.
type Notifiee interface {
Listen(Network, ma.Multiaddr) // called when network starts listening on an addr
ListenClose(Network, ma.Multiaddr) // called when network stops listening on an addr
Connected(Network, Conn) // called when a connection opened
Disconnected(Network, Conn) // called when a connection closed
OpenedStream(Network, Stream) // called when a stream opened
ClosedStream(Network, Stream) // called when a stream closed
// TODO
// PeerConnected(Network, peer.ID) // called when a peer connected
// PeerDisconnected(Network, peer.ID) // called when a peer disconnected
}

61
network/notifiee.go Normal file
View File

@ -0,0 +1,61 @@
package network
import ma "github.com/multiformats/go-multiaddr"
// NotifyBundle implements Notifiee by calling any of the functions set on it,
// and nop'ing if they are unset. This is the easy way to register for
// notifications.
type NotifyBundle struct {
ListenF func(Network, ma.Multiaddr)
ListenCloseF func(Network, ma.Multiaddr)
ConnectedF func(Network, Conn)
DisconnectedF func(Network, Conn)
OpenedStreamF func(Network, Stream)
ClosedStreamF func(Network, Stream)
}
var _ Notifiee = (*NotifyBundle)(nil)
// Listen calls ListenF if it is not null.
func (nb *NotifyBundle) Listen(n Network, a ma.Multiaddr) {
if nb.ListenF != nil {
nb.ListenF(n, a)
}
}
// ListenClose calls ListenCloseF if it is not null.
func (nb *NotifyBundle) ListenClose(n Network, a ma.Multiaddr) {
if nb.ListenCloseF != nil {
nb.ListenCloseF(n, a)
}
}
// Connected calls ConnectedF if it is not null.
func (nb *NotifyBundle) Connected(n Network, c Conn) {
if nb.ConnectedF != nil {
nb.ConnectedF(n, c)
}
}
// Disconnected calls DisconnectedF if it is not null.
func (nb *NotifyBundle) Disconnected(n Network, c Conn) {
if nb.DisconnectedF != nil {
nb.DisconnectedF(n, c)
}
}
// OpenedStream calls OpenedStreamF if it is not null.
func (nb *NotifyBundle) OpenedStream(n Network, s Stream) {
if nb.OpenedStreamF != nil {
nb.OpenedStreamF(n, s)
}
}
// ClosedStream calls ClosedStreamF if it is not null.
func (nb *NotifyBundle) ClosedStream(n Network, s Stream) {
if nb.ClosedStreamF != nil {
nb.ClosedStreamF(n, s)
}
}

123
network/notifiee_test.go Normal file
View File

@ -0,0 +1,123 @@
package network
import (
"testing"
ma "github.com/multiformats/go-multiaddr"
)
func TestListen(T *testing.T) {
var notifee NotifyBundle
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234")
if err != nil {
T.Fatal("unexpected multiaddr error")
}
notifee.Listen(nil, addr)
called := false
notifee.ListenF = func(Network, ma.Multiaddr) {
called = true
}
if called {
T.Fatal("called should be false")
}
notifee.Listen(nil, addr)
if !called {
T.Fatal("Listen should have been called")
}
}
func TestListenClose(T *testing.T) {
var notifee NotifyBundle
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234")
if err != nil {
T.Fatal("unexpected multiaddr error")
}
notifee.ListenClose(nil, addr)
called := false
notifee.ListenCloseF = func(Network, ma.Multiaddr) {
called = true
}
if called {
T.Fatal("called should be false")
}
notifee.ListenClose(nil, addr)
if !called {
T.Fatal("ListenClose should have been called")
}
}
func TestConnected(T *testing.T) {
var notifee NotifyBundle
notifee.Connected(nil, nil)
called := false
notifee.ConnectedF = func(Network, Conn) {
called = true
}
if called {
T.Fatal("called should be false")
}
notifee.Connected(nil, nil)
if !called {
T.Fatal("Connected should have been called")
}
}
func TestDisconnected(T *testing.T) {
var notifee NotifyBundle
notifee.Disconnected(nil, nil)
called := false
notifee.DisconnectedF = func(Network, Conn) {
called = true
}
if called {
T.Fatal("called should be false")
}
notifee.Disconnected(nil, nil)
if !called {
T.Fatal("Disconnected should have been called")
}
}
func TestOpenedStream(T *testing.T) {
var notifee NotifyBundle
notifee.OpenedStream(nil, nil)
called := false
notifee.OpenedStreamF = func(Network, Stream) {
called = true
}
if called {
T.Fatal("called should be false")
}
notifee.OpenedStream(nil, nil)
if !called {
T.Fatal("OpenedStream should have been called")
}
}
func TestClosedStream(T *testing.T) {
var notifee NotifyBundle
notifee.ClosedStream(nil, nil)
called := false
notifee.ClosedStreamF = func(Network, Stream) {
called = true
}
if called {
T.Fatal("called should be false")
}
notifee.ClosedStream(nil, nil)
if !called {
T.Fatal("ClosedStream should have been called")
}
}

23
network/options.go Normal file
View File

@ -0,0 +1,23 @@
package network
import "context"
type noDialCtxKey struct{}
var noDial = noDialCtxKey{}
// WithNoDial constructs a new context with an option that instructs the network
// to not attempt a new dial when opening a stream.
func WithNoDial(ctx context.Context, reason string) context.Context {
return context.WithValue(ctx, noDial, reason)
}
// GetNoDial returns true if the no dial option is set in the context.
func GetNoDial(ctx context.Context) (nodial bool, reason string) {
v := ctx.Value(noDial)
if v != nil {
return true, v.(string)
}
return false, ""
}

29
network/timeouts.go Normal file
View File

@ -0,0 +1,29 @@
package network
import (
"context"
"time"
)
// DialPeerTimeout is the default timeout for a single call to `DialPeer`. When
// there are multiple concurrent calls to `DialPeer`, this timeout will apply to
// each independently.
var DialPeerTimeout = 60 * time.Second
type dialPeerTimeoutCtxKey struct{}
// GetDialPeerTimeout returns the current DialPeer timeout (or the default).
func GetDialPeerTimeout(ctx context.Context) time.Duration {
if to, ok := ctx.Value(dialPeerTimeoutCtxKey{}).(time.Duration); ok {
return to
}
return DialPeerTimeout
}
// WithDialPeerTimeout returns a new context with the DialPeer timeout applied.
//
// This timeout overrides the default DialPeerTimeout and applies per-dial
// independently.
func WithDialPeerTimeout(ctx context.Context, timeout time.Duration) context.Context {
return context.WithValue(ctx, dialPeerTimeoutCtxKey{}, timeout)
}

40
network/timeouts_test.go Normal file
View File

@ -0,0 +1,40 @@
package network
import (
"context"
"testing"
"time"
)
func TestDefaultTimeout(t *testing.T) {
ctx := context.Background()
dur := GetDialPeerTimeout(ctx)
if dur != DialPeerTimeout {
t.Fatal("expected default peer timeout")
}
}
func TestNonDefaultTimeout(t *testing.T) {
customTimeout := time.Duration(1)
ctx := context.WithValue(
context.Background(),
dialPeerTimeoutCtxKey{},
customTimeout,
)
dur := GetDialPeerTimeout(ctx)
if dur != customTimeout {
t.Fatal("peer timeout doesn't match set timeout")
}
}
func TestSettingTimeout(t *testing.T) {
customTimeout := time.Duration(1)
ctx := WithDialPeerTimeout(
context.Background(),
customTimeout,
)
dur := GetDialPeerTimeout(ctx)
if dur != customTimeout {
t.Fatal("peer timeout doesn't match set timeout")
}
}

80
peer/info.go Normal file
View File

@ -0,0 +1,80 @@
package peer
import (
"fmt"
"strings"
ma "github.com/multiformats/go-multiaddr"
)
// PeerInfo is a small struct used to pass around a peer with
// a set of addresses (and later, keys?).
type Info struct {
ID ID
Addrs []ma.Multiaddr
}
var _ fmt.Stringer = Info{}
func (pi Info) String() string {
return fmt.Sprintf("{%v: %v}", pi.ID, pi.Addrs)
}
var ErrInvalidAddr = fmt.Errorf("invalid p2p multiaddr")
func InfoFromP2pAddr(m ma.Multiaddr) (*Info, error) {
if m == nil {
return nil, ErrInvalidAddr
}
// make sure it's an IPFS addr
parts := ma.Split(m)
if len(parts) < 1 {
return nil, ErrInvalidAddr
}
// TODO(lgierth): we shouldn't assume /ipfs is the last part
ipfspart := parts[len(parts)-1]
if ipfspart.Protocols()[0].Code != ma.P_IPFS {
return nil, ErrInvalidAddr
}
// make sure the /ipfs value parses as a peer.ID
peerIdParts := strings.Split(ipfspart.String(), "/")
peerIdStr := peerIdParts[len(peerIdParts)-1]
id, err := IDB58Decode(peerIdStr)
if err != nil {
return nil, err
}
// we might have received just an /ipfs part, which means there's no addr.
var addrs []ma.Multiaddr
if len(parts) > 1 {
addrs = append(addrs, ma.Join(parts[:len(parts)-1]...))
}
return &Info{
ID: id,
Addrs: addrs,
}, nil
}
func InfoToP2pAddrs(pi *Info) ([]ma.Multiaddr, error) {
var addrs []ma.Multiaddr
tpl := "/" + ma.ProtocolWithCode(ma.P_IPFS).Name + "/"
for _, addr := range pi.Addrs {
p2paddr, err := ma.NewMultiaddr(tpl + IDB58Encode(pi.ID))
if err != nil {
return nil, err
}
addrs = append(addrs, addr.Encapsulate(p2paddr))
}
return addrs, nil
}
func (pi *Info) Loggable() map[string]interface{} {
return map[string]interface{}{
"peerID": pi.ID.Pretty(),
"addrs": pi.Addrs,
}
}

38
peer/info_serde.go Normal file
View File

@ -0,0 +1,38 @@
package peer
import (
"encoding/json"
ma "github.com/multiformats/go-multiaddr"
)
func (pi Info) MarshalJSON() ([]byte, error) {
out := make(map[string]interface{})
out["ID"] = pi.ID.Pretty()
var addrs []string
for _, a := range pi.Addrs {
addrs = append(addrs, a.String())
}
out["Addrs"] = addrs
return json.Marshal(out)
}
func (pi *Info) UnmarshalJSON(b []byte) error {
var data map[string]interface{}
err := json.Unmarshal(b, &data)
if err != nil {
return err
}
pid, err := IDB58Decode(data["ID"].(string))
if err != nil {
return err
}
pi.ID = pid
addrs, ok := data["Addrs"].([]interface{})
if ok {
for _, a := range addrs {
pi.Addrs = append(pi.Addrs, ma.StringCast(a.(string)))
}
}
return nil
}

181
peer/peer.go Normal file
View File

@ -0,0 +1,181 @@
// Package peer implements an object used to represent peers in the libp2p network.
package peer
import (
"encoding/hex"
"errors"
"fmt"
ic "github.com/libp2p/go-libp2p-crypto"
b58 "github.com/mr-tron/base58/base58"
mh "github.com/multiformats/go-multihash"
)
var (
// ErrEmptyPeerID is an error for empty peer ID.
ErrEmptyPeerID = errors.New("empty peer ID")
// ErrNoPublicKey is an error for peer IDs that don't embed public keys
ErrNoPublicKey = errors.New("public key is not embedded in peer ID")
)
// AdvancedEnableInlining enables automatically inlining keys shorter than
// 42 bytes into the peer ID (using the "identity" multihash function).
//
// WARNING: This flag will likely be set to false in the future and eventually
// be removed in favor of using a hash function specified by the key itself.
// See: https://github.com/libp2p/specs/issues/138
//
// DO NOT change this flag unless you know what you're doing.
//
// This currently defaults to true for backwards compatibility but will likely
// be set to false by default when an upgrade path is determined.
var AdvancedEnableInlining = true
const maxInlineKeyLength = 42
// ID is a libp2p peer identity.
type ID string
// Pretty returns a b58-encoded string of the ID
func (id ID) Pretty() string {
return IDB58Encode(id)
}
// Loggable returns a pretty peerID string in loggable JSON format
func (id ID) Loggable() map[string]interface{} {
return map[string]interface{}{
"peerID": id.Pretty(),
}
}
func (id ID) String() string {
return id.Pretty()
}
// String prints out the peer.
//
// TODO(brian): ensure correctness at ID generation and
// enforce this by only exposing functions that generate
// IDs safely. Then any peer.ID type found in the
// codebase is known to be correct.
func (id ID) ShortString() string {
pid := id.Pretty()
if len(pid) <= 10 {
return fmt.Sprintf("<peer.ID %s>", pid)
}
return fmt.Sprintf("<peer.ID %s*%s>", pid[:2], pid[len(pid)-6:])
}
// MatchesPrivateKey tests whether this ID was derived from sk
func (id ID) MatchesPrivateKey(sk ic.PrivKey) bool {
return id.MatchesPublicKey(sk.GetPublic())
}
// MatchesPublicKey tests whether this ID was derived from pk
func (id ID) MatchesPublicKey(pk ic.PubKey) bool {
oid, err := IDFromPublicKey(pk)
if err != nil {
return false
}
return oid == id
}
// ExtractPublicKey attempts to extract the public key from an ID
//
// This method returns ErrNoPublicKey if the peer ID looks valid but it can't extract
// the public key.
func (id ID) ExtractPublicKey() (ic.PubKey, error) {
decoded, err := mh.Decode([]byte(id))
if err != nil {
return nil, err
}
if decoded.Code != mh.ID {
return nil, ErrNoPublicKey
}
pk, err := ic.UnmarshalPublicKey(decoded.Digest)
if err != nil {
return nil, err
}
return pk, nil
}
// Validate check if ID is empty or not
func (id ID) Validate() error {
if id == ID("") {
return ErrEmptyPeerID
}
return nil
}
// IDFromString cast a string to ID type, and validate
// the id to make sure it is a multihash.
func IDFromString(s string) (ID, error) {
if _, err := mh.Cast([]byte(s)); err != nil {
return ID(""), err
}
return ID(s), nil
}
// IDFromBytes cast a string to ID type, and validate
// the id to make sure it is a multihash.
func IDFromBytes(b []byte) (ID, error) {
if _, err := mh.Cast(b); err != nil {
return ID(""), err
}
return ID(b), nil
}
// IDB58Decode returns a b58-decoded Peer
func IDB58Decode(s string) (ID, error) {
m, err := mh.FromB58String(s)
if err != nil {
return "", err
}
return ID(m), err
}
// IDB58Encode returns b58-encoded string
func IDB58Encode(id ID) string {
return b58.Encode([]byte(id))
}
// IDHexDecode returns a hex-decoded Peer
func IDHexDecode(s string) (ID, error) {
m, err := mh.FromHexString(s)
if err != nil {
return "", err
}
return ID(m), err
}
// IDHexEncode returns hex-encoded string
func IDHexEncode(id ID) string {
return hex.EncodeToString([]byte(id))
}
// IDFromPublicKey returns the Peer ID corresponding to pk
func IDFromPublicKey(pk ic.PubKey) (ID, error) {
b, err := pk.Bytes()
if err != nil {
return "", err
}
var alg uint64 = mh.SHA2_256
if AdvancedEnableInlining && len(b) <= maxInlineKeyLength {
alg = mh.ID
}
hash, _ := mh.Sum(b, alg, -1)
return ID(hash), nil
}
// IDFromPrivateKey returns the Peer ID corresponding to sk
func IDFromPrivateKey(sk ic.PrivKey) (ID, error) {
return IDFromPublicKey(sk.GetPublic())
}
// IDSlice for sorting peers
type IDSlice []ID
func (es IDSlice) Len() int { return len(es) }
func (es IDSlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
func (es IDSlice) Less(i, j int) bool { return string(es[i]) < string(es[j]) }

75
peer/peer_serde.go Normal file
View File

@ -0,0 +1,75 @@
// This file contains Protobuf and JSON serialization/deserialization methods for peer IDs.
package peer
import (
"encoding"
"encoding/json"
)
// Interface assertions commented out to avoid introducing hard dependencies to protobuf.
// var _ proto.Marshaler = (*ID)(nil)
// var _ proto.Unmarshaler = (*ID)(nil)
var _ json.Marshaler = (*ID)(nil)
var _ json.Unmarshaler = (*ID)(nil)
var _ encoding.BinaryMarshaler = (*ID)(nil)
var _ encoding.BinaryUnmarshaler = (*ID)(nil)
var _ encoding.TextMarshaler = (*ID)(nil)
var _ encoding.TextUnmarshaler = (*ID)(nil)
func (id ID) Marshal() ([]byte, error) {
return []byte(id), nil
}
// BinaryMarshal returns the byte representation of the peer ID.
func (id ID) MarshalBinary() ([]byte, error) {
return id.Marshal()
}
func (id ID) MarshalTo(data []byte) (n int, err error) {
return copy(data, []byte(id)), nil
}
func (id *ID) Unmarshal(data []byte) (err error) {
*id, err = IDFromBytes(data)
return err
}
// BinaryUnmarshal sets the ID from its binary representation.
func (id *ID) UnmarshalBinary(data []byte) error {
return id.Unmarshal(data)
}
// Implements Gogo's proto.Sizer, but we omit the compile-time assertion to avoid introducing a hard
// dependency on gogo.
func (id ID) Size() int {
return len([]byte(id))
}
func (id ID) MarshalJSON() ([]byte, error) {
return json.Marshal(IDB58Encode(id))
}
func (id *ID) UnmarshalJSON(data []byte) (err error) {
var v string
if err = json.Unmarshal(data, &v); err != nil {
return err
}
*id, err = IDB58Decode(v)
return err
}
// TextMarshal returns the text encoding of the ID.
func (id ID) MarshalText() ([]byte, error) {
return []byte(IDB58Encode(id)), nil
}
// TextUnmarshal restores the ID from its text encoding.
func (id *ID) UnmarshalText(data []byte) error {
pid, err := IDB58Decode(string(data))
if err != nil {
return err
}
*id = pid
return nil
}

83
peer/peer_serde_test.go Normal file
View File

@ -0,0 +1,83 @@
package peer_test
import (
"testing"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peer/test"
)
func TestPeerSerdePB(t *testing.T) {
id, err := testutil.RandPeerID()
if err != nil {
t.Fatal(err)
}
b, err := id.Marshal()
if err != nil {
t.Fatal(err)
}
var id2 peer.ID
if err = id2.Unmarshal(b); err != nil {
t.Fatal(err)
}
if id != id2 {
t.Error("expected equal ids in circular serde test")
}
}
func TestPeerSerdeJSON(t *testing.T) {
id, err := testutil.RandPeerID()
if err != nil {
t.Fatal(err)
}
b, err := id.MarshalJSON()
if err != nil {
t.Fatal(err)
}
var id2 peer.ID
if err = id2.UnmarshalJSON(b); err != nil {
t.Fatal(err)
}
if id != id2 {
t.Error("expected equal ids in circular serde test")
}
}
func TestBinaryMarshaler(t *testing.T) {
id, err := testutil.RandPeerID()
if err != nil {
t.Fatal(err)
}
b, err := id.MarshalBinary()
if err != nil {
t.Fatal(err)
}
var id2 peer.ID
if err = id2.UnmarshalBinary(b); err != nil {
t.Fatal(err)
}
if id != id2 {
t.Error("expected equal ids in circular serde test")
}
}
func TestTextMarshaler(t *testing.T) {
id, err := testutil.RandPeerID()
if err != nil {
t.Fatal(err)
}
b, err := id.MarshalText()
if err != nil {
t.Fatal(err)
}
var id2 peer.ID
if err = id2.UnmarshalText(b); err != nil {
t.Fatal(err)
}
if id != id2 {
t.Error("expected equal ids in circular serde test")
}
}

242
peer/peer_test.go Normal file
View File

@ -0,0 +1,242 @@
package peer_test
import (
"crypto/rand"
"encoding/base64"
"fmt"
"strings"
"testing"
ic "github.com/libp2p/go-libp2p-crypto"
tu "github.com/libp2p/go-libp2p-core/peer/test"
mh "github.com/multiformats/go-multihash"
b58 "github.com/mr-tron/base58/base58"
)
var gen1 keyset // generated
var gen2 keyset // generated
var man keyset // manual
func hash(b []byte) []byte {
h, _ := mh.Sum(b, mh.SHA2_256, -1)
return []byte(h)
}
func init() {
if err := gen1.generate(); err != nil {
panic(err)
}
if err := gen2.generate(); err != nil {
panic(err)
}
skManBytes = strings.Replace(skManBytes, "\n", "", -1)
if err := man.load(hpkpMan, skManBytes); err != nil {
panic(err)
}
}
type keyset struct {
sk ic.PrivKey
pk ic.PubKey
hpk string
hpkp string
}
func (ks *keyset) generate() error {
var err error
ks.sk, ks.pk, err = tu.RandTestKeyPair(512)
if err != nil {
return err
}
bpk, err := ks.pk.Bytes()
if err != nil {
return err
}
ks.hpk = string(hash(bpk))
ks.hpkp = b58.Encode([]byte(ks.hpk))
return nil
}
func (ks *keyset) load(hpkp, skBytesStr string) error {
skBytes, err := base64.StdEncoding.DecodeString(skBytesStr)
if err != nil {
return err
}
ks.sk, err = ic.UnmarshalPrivateKey(skBytes)
if err != nil {
return err
}
ks.pk = ks.sk.GetPublic()
bpk, err := ks.pk.Bytes()
if err != nil {
return err
}
ks.hpk = string(hash(bpk))
ks.hpkp = b58.Encode([]byte(ks.hpk))
if ks.hpkp != hpkp {
return fmt.Errorf("hpkp doesn't match key. %s", hpkp)
}
return nil
}
func TestIDMatchesPublicKey(t *testing.T) {
test := func(ks keyset) {
p1, err := IDB58Decode(ks.hpkp)
if err != nil {
t.Fatal(err)
}
if ks.hpk != string(p1) {
t.Error("p1 and hpk differ")
}
if !p1.MatchesPublicKey(ks.pk) {
t.Fatal("p1 does not match pk")
}
p2, err := IDFromPublicKey(ks.pk)
if err != nil {
t.Fatal(err)
}
if p1 != p2 {
t.Error("p1 and p2 differ", p1.Pretty(), p2.Pretty())
}
if p2.Pretty() != ks.hpkp {
t.Error("hpkp and p2.Pretty differ", ks.hpkp, p2.Pretty())
}
}
test(gen1)
test(gen2)
test(man)
}
func TestIDMatchesPrivateKey(t *testing.T) {
test := func(ks keyset) {
p1, err := IDB58Decode(ks.hpkp)
if err != nil {
t.Fatal(err)
}
if ks.hpk != string(p1) {
t.Error("p1 and hpk differ")
}
if !p1.MatchesPrivateKey(ks.sk) {
t.Fatal("p1 does not match sk")
}
p2, err := IDFromPrivateKey(ks.sk)
if err != nil {
t.Fatal(err)
}
if p1 != p2 {
t.Error("p1 and p2 differ", p1.Pretty(), p2.Pretty())
}
}
test(gen1)
test(gen2)
test(man)
}
func TestPublicKeyExtraction(t *testing.T) {
t.Skip("disabled until libp2p/go-libp2p-crypto#51 is fixed")
// Happy path
_, originalPub, err := ic.GenerateEd25519Key(rand.Reader)
if err != nil {
t.Fatal(err)
}
id, err := IDFromPublicKey(originalPub)
if err != nil {
t.Fatal(err)
}
extractedPub, err := id.ExtractPublicKey()
if err != nil {
t.Fatal(err)
}
if extractedPub == nil {
t.Fatal("failed to extract public key")
}
if !originalPub.Equals(extractedPub) {
t.Fatal("extracted public key doesn't match")
}
// Test invalid multihash (invariant of the type of public key)
pk, err := ID("").ExtractPublicKey()
if err == nil {
t.Fatal("expected an error")
}
if pk != nil {
t.Fatal("expected a nil public key")
}
// Shouldn't work for, e.g. RSA keys (too large)
_, rsaPub, err := ic.GenerateKeyPair(ic.RSA, 2048)
if err != nil {
t.Fatal(err)
}
rsaId, err := IDFromPublicKey(rsaPub)
if err != nil {
t.Fatal(err)
}
extractedRsaPub, err := rsaId.ExtractPublicKey()
if err != ErrNoPublicKey {
t.Fatal(err)
}
if extractedRsaPub != nil {
t.Fatal("expected to fail to extract public key from rsa ID")
}
}
func TestValidate(t *testing.T) {
// Empty peer ID invalidates
err := ID("").Validate()
if err == nil {
t.Error("expected error")
} else if err != ErrEmptyPeerID {
t.Error("expected error message: " + ErrEmptyPeerID.Error())
}
// Non-empty peer ID validates
p, err := tu.RandPeerID()
if err != nil {
t.Fatal(err)
}
err = p.Validate()
if err != nil {
t.Error("expected nil, but found " + err.Error())
}
}
var hpkpMan = `QmRK3JgmVEGiewxWbhpXLJyjWuGuLeSTMTndA1coMHEy5o`
var skManBytes = `
CAAS4AQwggJcAgEAAoGBAL7w+Wc4VhZhCdM/+Hccg5Nrf4q9NXWwJylbSrXz/unFS24wyk6pEk0zi3W
7li+vSNVO+NtJQw9qGNAMtQKjVTP+3Vt/jfQRnQM3s6awojtjueEWuLYVt62z7mofOhCtj+VwIdZNBo
/EkLZ0ETfcvN5LVtLYa8JkXybnOPsLvK+PAgMBAAECgYBdk09HDM7zzL657uHfzfOVrdslrTCj6p5mo
DzvCxLkkjIzYGnlPuqfNyGjozkpSWgSUc+X+EGLLl3WqEOVdWJtbM61fewEHlRTM5JzScvwrJ39t7o6
CCAjKA0cBWBd6UWgbN/t53RoWvh9HrA2AW5YrT0ZiAgKe9y7EMUaENVJ8QJBAPhpdmb4ZL4Fkm4OKia
NEcjzn6mGTlZtef7K/0oRC9+2JkQnCuf6HBpaRhJoCJYg7DW8ZY+AV6xClKrgjBOfERMCQQDExhnzu2
dsQ9k8QChBlpHO0TRbZBiQfC70oU31kM1AeLseZRmrxv9Yxzdl8D693NNWS2JbKOXl0kMHHcuGQLMVA
kBZ7WvkmPV3aPL6jnwp2pXepntdVnaTiSxJ1dkXShZ/VSSDNZMYKY306EtHrIu3NZHtXhdyHKcggDXr
qkBrdgErAkAlpGPojUwemOggr4FD8sLX1ot2hDJyyV7OK2FXfajWEYJyMRL1Gm9Uk1+Un53RAkJneqp
JGAzKpyttXBTIDO51AkEA98KTiROMnnU8Y6Mgcvr68/SMIsvCYMt9/mtwSBGgl80VaTQ5Hpaktl6Xbh
VUt5Wv0tRxlXZiViCGCD1EtrrwTw==
`

72
peer/set/peerset.go Normal file
View File

@ -0,0 +1,72 @@
package peerset
import (
"sync"
"github.com/libp2p/go-libp2p-core/peer"
)
// PeerSet is a threadsafe set of peers
type PeerSet struct {
ps map[peer.ID]struct{}
lk sync.RWMutex
size int
}
func New() *PeerSet {
ps := new(PeerSet)
ps.ps = make(map[peer.ID]struct{})
ps.size = -1
return ps
}
func NewLimited(size int) *PeerSet {
ps := new(PeerSet)
ps.ps = make(map[peer.ID]struct{})
ps.size = size
return ps
}
func (ps *PeerSet) Add(p peer.ID) {
ps.lk.Lock()
ps.ps[p] = struct{}{}
ps.lk.Unlock()
}
func (ps *PeerSet) Contains(p peer.ID) bool {
ps.lk.RLock()
_, ok := ps.ps[p]
ps.lk.RUnlock()
return ok
}
func (ps *PeerSet) Size() int {
ps.lk.RLock()
defer ps.lk.RUnlock()
return len(ps.ps)
}
// TryAdd Attempts to add the given peer into the set.
// This operation can fail for one of two reasons:
// 1) The given peer is already in the set
// 2) The number of peers in the set is equal to size
func (ps *PeerSet) TryAdd(p peer.ID) bool {
var success bool
ps.lk.Lock()
if _, ok := ps.ps[p]; !ok && (len(ps.ps) < ps.size || ps.size == -1) {
success = true
ps.ps[p] = struct{}{}
}
ps.lk.Unlock()
return success
}
func (ps *PeerSet) Peers() []peer.ID {
ps.lk.Lock()
out := make([]peer.ID, 0, len(ps.ps))
for p, _ := range ps.ps {
out = append(out, p)
}
ps.lk.Unlock()
return out
}

40
peer/test/utils.go Normal file
View File

@ -0,0 +1,40 @@
package testutil
import (
"io"
"math/rand"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p-core/peer"
ci "github.com/libp2p/go-libp2p-crypto"
mh "github.com/multiformats/go-multihash"
)
var generatedPairs int64 = 0
func RandPeerID() (peer.ID, error) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
buf := make([]byte, 16)
if _, err := io.ReadFull(r, buf); err != nil {
return "", err
}
h, _ := mh.Sum(buf, mh.SHA2_256, -1)
return peer.ID(h), nil
}
func RandTestKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) {
seed := time.Now().UnixNano()
// workaround for low time resolution
seed += atomic.AddInt64(&generatedPairs, 1) << 32
r := rand.New(rand.NewSource(seed))
return ci.GenerateKeyPairWithReader(ci.RSA, bits, r)
}
func SeededTestKeyPair(seed int64) (ci.PrivKey, ci.PubKey, error) {
r := rand.New(rand.NewSource(seed))
return ci.GenerateKeyPairWithReader(ci.RSA, 512, r)
}

157
peerstore/peerstore.go Normal file
View File

@ -0,0 +1,157 @@
package peerstore
import (
"context"
"errors"
"io"
"math"
"time"
"github.com/libp2p/go-libp2p-core/peer"
ic "github.com/libp2p/go-libp2p-crypto"
ma "github.com/multiformats/go-multiaddr"
)
var ErrNotFound = errors.New("item not found")
var (
// AddressTTL is the expiration time of addresses.
AddressTTL = time.Hour
// TempAddrTTL is the ttl used for a short lived address
TempAddrTTL = time.Second * 10
// ProviderAddrTTL is the TTL of an address we've received from a provider.
// This is also a temporary address, but lasts longer. After this expires,
// the records we return will require an extra lookup.
ProviderAddrTTL = time.Minute * 10
// RecentlyConnectedAddrTTL is used when we recently connected to a peer.
// It means that we are reasonably certain of the peer's address.
RecentlyConnectedAddrTTL = time.Minute * 10
// OwnObservedAddrTTL is used for our own external addresses observed by peers.
OwnObservedAddrTTL = time.Minute * 10
)
// Permanent TTLs (distinct so we can distinguish between them, constant as they
// are, in fact, permanent)
const (
// PermanentAddrTTL is the ttl for a "permanent address" (e.g. bootstrap nodes).
PermanentAddrTTL = math.MaxInt64 - iota
// ConnectedAddrTTL is the ttl used for the addresses of a peer to whom
// we're connected directly. This is basically permanent, as we will
// clear them + re-add under a TempAddrTTL after disconnecting.
ConnectedAddrTTL
)
// Peerstore provides a threadsafe store of Peer related
// information.
type Peerstore interface {
io.Closer
AddrBook
KeyBook
PeerMetadata
Metrics
// PeerInfo returns a peer.PeerInfo struct for given peer.ID.
// This is a small slice of the information Peerstore has on
// that peer, useful to other services.
PeerInfo(peer.ID) peer.Info
GetProtocols(peer.ID) ([]string, error)
AddProtocols(peer.ID, ...string) error
SetProtocols(peer.ID, ...string) error
SupportsProtocols(peer.ID, ...string) ([]string, error)
// Peers returns all of the peer IDs stored across all inner stores.
Peers() peer.IDSlice
}
// PeerMetadata can handle values of any type. Serializing values is
// up to the implementation. Dynamic type introspection may not be
// supported, in which case explicitly enlisting types in the
// serializer may be required.
//
// Refer to the docs of the underlying implementation for more
// information.
type PeerMetadata interface {
// Get/Put is a simple registry for other peer-related key/value pairs.
// if we find something we use often, it should become its own set of
// methods. this is a last resort.
Get(p peer.ID, key string) (interface{}, error)
Put(p peer.ID, key string, val interface{}) error
}
// AddrBook holds the multiaddrs of peers.
type AddrBook interface {
// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl)
AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration)
// AddAddrs gives this AddrBook addresses to use, with a given ttl
// (time-to-live), after which the address is no longer valid.
// If the manager has a longer TTL, the operation is a no-op for that address
AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration)
// SetAddr calls mgr.SetAddrs(p, addr, ttl)
SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration)
// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
// This is used when we receive the best estimate of the validity of an address.
SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration)
// UpdateAddrs updates the addresses associated with the given peer that have
// the given oldTTL to have the given newTTL.
UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration)
// Addresses returns all known (and valid) addresses for a given peer
Addrs(p peer.ID) []ma.Multiaddr
// AddrStream returns a channel that gets all addresses for a given
// peer sent on it. If new addresses are added after the call is made
// they will be sent along through the channel as well.
AddrStream(context.Context, peer.ID) <-chan ma.Multiaddr
// ClearAddresses removes all previously stored addresses
ClearAddrs(p peer.ID)
// PeersWithAddrs returns all of the peer IDs stored in the AddrBook
PeersWithAddrs() peer.IDSlice
}
// KeyBook tracks the keys of Peers.
type KeyBook interface {
// PubKey stores the public key of a peer.
PubKey(peer.ID) ic.PubKey
// AddPubKey stores the public key of a peer.
AddPubKey(peer.ID, ic.PubKey) error
// PrivKey returns the private key of a peer, if known. Generally this might only be our own
// private key, see
// https://discuss.libp2p.io/t/what-is-the-purpose-of-having-map-peer-id-privatekey-in-peerstore/74.
PrivKey(peer.ID) ic.PrivKey
// AddPrivKey stores the private key of a peer.
AddPrivKey(peer.ID, ic.PrivKey) error
// PeersWithKeys returns all the peer IDs stored in the KeyBook
PeersWithKeys() peer.IDSlice
}
// Metrics is just an object that tracks metrics
// across a set of peers.
type Metrics interface {
// RecordLatency records a new latency measurement
RecordLatency(peer.ID, time.Duration)
// LatencyEWMA returns an exponentially-weighted moving avg.
// of all measurements of a peer's latency.
LatencyEWMA(peer.ID) time.Duration
}

29
pnet/error.go Normal file
View File

@ -0,0 +1,29 @@
package pnet
// Error is error type for ease of detecting PNet errors
type Error interface {
IsPNetError() bool
}
// NewError creates new Error
func NewError(err string) error {
return pnetErr("privnet: " + err)
}
// IsPNetError checks if given error is PNet Error
func IsPNetError(err error) bool {
v, ok := err.(Error)
return ok && v.IsPNetError()
}
type pnetErr string
var _ Error = (*pnetErr)(nil)
func (p pnetErr) Error() string {
return string(p)
}
func (pnetErr) IsPNetError() bool {
return true
}

20
pnet/error_test.go Normal file
View File

@ -0,0 +1,20 @@
package pnet
import (
"errors"
"testing"
)
func TestIsPnetErr(t *testing.T) {
err := NewError("test")
if err.Error() != "privnet: test" {
t.Fatalf("expected 'privnet: test' got '%s'", err.Error())
}
if !IsPNetError(err) {
t.Fatal("expected the pnetErr to be detected by IsPnetError")
}
if IsPNetError(errors.New("not pnet error")) {
t.Fatal("expected generic error not to be pnetError")
}
}

38
pnet/pnet.go Normal file
View File

@ -0,0 +1,38 @@
package pnet
import (
"net"
"os"
)
// EnvKey defines environment variable name for forcing usage of PNet in libp2p
// When environment variable of this name is set to "1" the ForcePrivateNetwork
// variable will be set to true.
const EnvKey = "LIBP2P_FORCE_PNET"
// ForcePrivateNetwork is boolean variable that forces usage of PNet in libp2p
// Setting this variable to true or setting LIBP2P_FORCE_PNET environment variable
// to true will make libp2p to require private network protector.
// If no network protector is provided and this variable is set to true libp2p will
// refuse to connect.
var ForcePrivateNetwork = false
func init() {
ForcePrivateNetwork = os.Getenv(EnvKey) == "1"
}
// ErrNotInPrivateNetwork is an error that should be returned by libp2p when it
// tries to dial with ForcePrivateNetwork set and no PNet Protector
var ErrNotInPrivateNetwork = NewError("private network was not configured but" +
" is enforced by the environment")
// Protector interface is a way for private network implementation to be transparent in
// libp2p. It is created by implementation and use by libp2p-conn to secure connections
// so they can be only established with selected number of peers.
type Protector interface {
// Wraps passed connection to protect it
Protect(net.Conn) (net.Conn, error)
// Returns key fingerprint that is safe to expose
Fingerprint() []byte
}

9
protocol/id.go Normal file
View File

@ -0,0 +1,9 @@
package protocol
// ID is an identifier used to write protocol headers in streams.
type ID string
// These are reserved protocol.IDs.
const (
TestingID ID = "/p2p/_testing"
)

27
protocol/protocol.go Normal file
View File

@ -0,0 +1,27 @@
package protocol
import (
"io"
)
// HandlerFunc is a user-provided function used by the Router to
// handle a protocol/stream.
type HandlerFunc = func(protocol string, rwc io.ReadWriteCloser) error
type Router interface {
AddHandler(protocol string, handler HandlerFunc)
AddHandlerWithFunc(protocol string, match func(string) bool, handler HandlerFunc)
RemoveHandler(protocol string)
Protocols() []string
}
type Negotiator interface {
NegotiateLazy(rwc io.ReadWriteCloser) (io.ReadWriteCloser, string, HandlerFunc, error)
Negotiate(rwc io.ReadWriteCloser) (string, HandlerFunc, error)
Handle(rwc io.ReadWriteCloser) error
}
type Switch interface {
Router
Negotiator
}

25
sec/conn.go Normal file
View File

@ -0,0 +1,25 @@
package sec
import (
"context"
"net"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
)
// SecureConn is an authenticated, encrypted connection.
type SecureConn interface {
net.Conn
network.ConnSecurity
}
// A SecureTransport turns inbound and outbound unauthenticated,
// plain-text, native connections into authenticated, encrypted connections.
type SecureTransport interface {
// SecureInbound secures an inbound connection.
SecureInbound(ctx context.Context, insecure net.Conn) (SecureConn, error)
// SecureOutbound secures an outbound connection.
SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (SecureConn, error)
}

86
sec/insecure/insecure.go Normal file
View File

@ -0,0 +1,86 @@
package insecure
import (
"context"
"net"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec"
ci "github.com/libp2p/go-libp2p-crypto"
)
// ID is the multistream-select protocol ID that should be used when identifying
// this security transport.
const ID = "/plaintext/1.0.0"
// Transport is a no-op stream security transport. It provides no
// security and simply wraps connections in blank
type Transport struct {
id peer.ID
}
// New constructs a new insecure transport.
func New(id peer.ID) *Transport {
return &Transport{
id: id,
}
}
// LocalPeer returns the transports local peer ID.
func (t *Transport) LocalPeer() peer.ID {
return t.id
}
// LocalPrivateKey returns nil. This transport is not secure.
func (t *Transport) LocalPrivateKey() ci.PrivKey {
return nil
}
// SecureInbound *pretends to secure* an outbound connection to the given peer.
func (t *Transport) SecureInbound(ctx context.Context, insecure net.Conn) (sec.SecureConn, error) {
return &Conn{
Conn: insecure,
local: t.id,
}, nil
}
// SecureOutbound *pretends to secure* an outbound connection to the given peer.
func (t *Transport) SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, error) {
return &Conn{
Conn: insecure,
local: t.id,
remote: p,
}, nil
}
// Conn is the connection type returned by the insecure transport.
type Conn struct {
net.Conn
local peer.ID
remote peer.ID
}
// LocalPeer returns the local peer ID.
func (ic *Conn) LocalPeer() peer.ID {
return ic.local
}
// RemotePeer returns the remote peer ID if we initiated the dial. Otherwise, it
// returns "" (because this connection isn't actually secure).
func (ic *Conn) RemotePeer() peer.ID {
return ic.remote
}
// RemotePublicKey returns nil. This connection is not secure
func (ic *Conn) RemotePublicKey() ci.PubKey {
return nil
}
// LocalPrivateKey returns nil. This connection is not secure.
func (ic *Conn) LocalPrivateKey() ci.PrivKey {
return nil
}
var _ sec.SecureTransport = (*Transport)(nil)
var _ sec.SecureConn = (*Conn)(nil)

373
sec/test/ttest.go Normal file
View File

@ -0,0 +1,373 @@
package test
import (
"bytes"
"context"
"io"
"net"
"sync"
"testing"
"time"
"math/rand"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec"
)
var Subtests = map[string]func(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID){
"RW": SubtestRW,
"Keys": SubtestKeys,
"WrongPeer": SubtestWrongPeer,
"Stream": SubtestStream,
"CancelHandshakeInbound": SubtestCancelHandshakeInbound,
"CancelHandshakeOutbound": SubtestCancelHandshakeOutbound,
}
var TestMessage = []byte("hello world!")
var TestStreamLen int64 = 1024 * 1024
var TestSeed int64 = 1812
func SubtestAll(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
for n, f := range Subtests {
t.Run(n, func(t *testing.T) {
f(t, at, bt, ap, bp)
f(t, bt, at, bp, ap)
})
}
}
func randStream() io.Reader {
return &io.LimitedReader{
R: rand.New(rand.NewSource(TestSeed)),
N: TestStreamLen,
}
}
func testWriteSustain(t *testing.T, c sec.SecureConn) {
source := randStream()
n := int64(0)
for {
coppied, err := io.CopyN(c, source, int64(rand.Intn(8000)))
n += coppied
switch err {
case io.EOF:
if n != TestStreamLen {
t.Fatal("incorrect random stream length")
}
return
case nil:
default:
t.Fatal(err)
}
}
}
func testReadSustain(t *testing.T, c sec.SecureConn) {
expected := randStream()
total := 0
ebuf := make([]byte, 1024)
abuf := make([]byte, 1024)
for {
n, err := c.Read(abuf)
if err != nil {
t.Fatal(err)
}
total += n
_, err = io.ReadFull(expected, ebuf[:n])
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(abuf[:n], ebuf[:n]) {
t.Fatal("bytes not equal")
}
if total == int(TestStreamLen) {
return
}
}
}
func testWrite(t *testing.T, c sec.SecureConn) {
n, err := c.Write(TestMessage)
if err != nil {
t.Fatal(err)
}
if n != len(TestMessage) {
t.Errorf("wrote %d bytes, expected to write %d bytes", n, len(TestMessage))
}
}
func testRead(t *testing.T, c sec.SecureConn) {
buf := make([]byte, 100)
n, err := c.Read(buf)
if err != nil {
t.Fatal(err)
}
if n != len(TestMessage) {
t.Errorf("wrote %d bytes, expected to write %d bytes", n, len(TestMessage))
}
if !bytes.Equal(buf[:n], TestMessage) {
t.Errorf("received bad test message: %s", string(buf[:n]))
}
}
func testWriteFail(t *testing.T, c sec.SecureConn) {
n, err := c.Write(TestMessage)
if n != 0 || err == nil {
t.Error("shouldn't have been able to write to a closed conn")
}
}
func testReadFail(t *testing.T, c sec.SecureConn) {
buf := make([]byte, len(TestMessage))
n, err := c.Read(buf)
if n != 0 || err == nil {
t.Error("shouldn't have been able to write to a closed conn")
}
}
func testEOF(t *testing.T, c sec.SecureConn) {
buf := make([]byte, 100)
n, err := c.Read(buf)
if n != 0 {
t.Errorf("didn't expect to read any bytes, read: %d", n)
}
if err != io.EOF {
t.Errorf("expected read to fail with EOF, got: %s", err)
}
}
func SubtestRW(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a, b := net.Pipe()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
c, err := at.SecureInbound(ctx, a)
if err != nil {
a.Close()
t.Fatal(err)
}
if c.LocalPeer() != ap {
t.Errorf("expected local peer %s, got %s", ap, c.LocalPeer())
}
testWrite(t, c)
testRead(t, c)
c.Close()
testWriteFail(t, c)
testReadFail(t, c)
}()
go func() {
defer wg.Done()
c, err := bt.SecureOutbound(ctx, b, ap)
if err != nil {
b.Close()
t.Fatal(err)
}
if c.RemotePeer() != ap {
t.Errorf("expected remote peer %s, got %s", ap, c.RemotePeer())
}
if c.LocalPeer() != bp {
t.Errorf("expected local peer %s, got %s", bp, c.LocalPeer())
}
testRead(t, c)
testWrite(t, c)
testEOF(t, c)
testWriteFail(t, c)
c.Close()
}()
wg.Wait()
}
func SubtestKeys(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a, b := net.Pipe()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
c, err := at.SecureInbound(ctx, a)
if err != nil {
a.Close()
t.Fatal(err)
}
defer c.Close()
if c.RemotePeer() != bp {
t.Errorf("expected remote peer %s, got remote peer %s", bp, c.RemotePeer())
}
if c.LocalPeer() != ap {
t.Errorf("expected local peer %s, got local peer %s", ap, c.LocalPeer())
}
if !c.LocalPeer().MatchesPrivateKey(c.LocalPrivateKey()) {
t.Error("local private key mismatch")
}
if !c.RemotePeer().MatchesPublicKey(c.RemotePublicKey()) {
t.Error("local private key mismatch")
}
}()
go func() {
defer wg.Done()
c, err := bt.SecureOutbound(ctx, b, ap)
if err != nil {
b.Close()
t.Fatal(err)
}
defer c.Close()
if c.RemotePeer() != ap {
t.Errorf("expected remote peer %s, got remote peer %s", ap, c.RemotePeer())
}
if c.LocalPeer() != bp {
t.Errorf("expected local peer %s, got local peer %s", bp, c.LocalPeer())
}
if !c.LocalPeer().MatchesPrivateKey(c.LocalPrivateKey()) {
t.Error("local private key mismatch")
}
if !c.RemotePeer().MatchesPublicKey(c.RemotePublicKey()) {
t.Error("local private key mismatch")
}
c.Close()
}()
wg.Wait()
}
func SubtestWrongPeer(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a, b := net.Pipe()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
defer a.Close()
_, err := at.SecureInbound(ctx, a)
if err == nil {
t.Fatal("conection should have failed")
}
}()
go func() {
defer wg.Done()
defer b.Close()
_, err := bt.SecureOutbound(ctx, b, bp)
if err == nil {
t.Fatal("connection should have failed")
}
}()
wg.Wait()
}
func SubtestCancelHandshakeOutbound(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
a, b := net.Pipe()
defer a.Close()
defer b.Close()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := at.SecureOutbound(ctx, a, ap)
if err == nil {
t.Fatal("connection should have failed")
}
}()
time.Sleep(time.Millisecond)
cancel()
wg.Add(1)
go func() {
defer wg.Done()
_, err := bt.SecureInbound(ctx, b)
if err == nil {
t.Fatal("connection should have failed")
}
}()
wg.Wait()
}
func SubtestCancelHandshakeInbound(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
a, b := net.Pipe()
defer a.Close()
defer b.Close()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := at.SecureInbound(ctx, a)
if err == nil {
t.Fatal("connection should have failed")
}
}()
time.Sleep(time.Millisecond)
cancel()
wg.Add(1)
go func() {
defer wg.Done()
_, err := bt.SecureOutbound(ctx, b, bp)
if err == nil {
t.Fatal("connection should have failed")
}
}()
wg.Wait()
}
func SubtestStream(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a, b := net.Pipe()
defer a.Close()
defer b.Close()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
c, err := at.SecureInbound(ctx, a)
if err != nil {
t.Fatal(err)
}
var swg sync.WaitGroup
swg.Add(2)
go func() {
defer swg.Done()
testWriteSustain(t, c)
}()
go func() {
defer swg.Done()
testReadSustain(t, c)
}()
swg.Wait()
c.Close()
}()
go func() {
defer wg.Done()
c, err := bt.SecureOutbound(ctx, b, ap)
if err != nil {
t.Fatal(err)
}
io.Copy(c, c)
c.Close()
}()
wg.Wait()
}

95
transport/transport.go Normal file
View File

@ -0,0 +1,95 @@
package transport
import (
"context"
"net"
"time"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
// DialTimeout is the maximum duration a Dial is allowed to take.
// This includes the time between dialing the raw network connection,
// protocol selection as well the handshake, if applicable.
var DialTimeout = 60 * time.Second
// AcceptTimeout is the maximum duration an Accept is allowed to take.
// This includes the time between accepting the raw network connection,
// protocol selection as well as the handshake, if applicable.
var AcceptTimeout = 60 * time.Second
// UpgradedConn is a connection that is is an extension of the net.Conn interface that provides multiaddr
// information, and an accessor for the transport used to create the conn
type UpgradedConn interface {
mux.MuxedConn
network.ConnSecurity
network.ConnMultiaddrs
// Transport returns the transport to which this connection belongs.
Transport() Transport
}
// Transport represents any device by which you can connect to and accept
// connections from other peers. The built-in transports provided are TCP and UTP
// but many more can be implemented, sctp, audio signals, sneakernet, UDT, a
// network of drones carrying usb flash drives, and so on.
type Transport interface {
// Dial dials a remote peer. It should try to reuse local listener
// addresses if possible but it may choose not to.
Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (UpgradedConn, error)
// CanDial returns true if this transport knows how to dial the given
// multiaddr.
//
// Returning true does not guarantee that dialing this multiaddr will
// succeed. This function should *only* be used to preemptively filter
// out addresses that we can't dial.
CanDial(addr ma.Multiaddr) bool
// Listen listens on the passed multiaddr.
Listen(laddr ma.Multiaddr) (Listener, error)
// Protocol returns the set of protocols handled by this transport.
//
// See the Network interface for an explanation of how this is used.
Protocols() []int
// Proxy returns true if this is a proxy transport.
//
// See the Network interface for an explanation of how this is used.
// TODO: Make this a part of the go-multiaddr protocol instead?
Proxy() bool
}
// Listener is an interface closely resembling the net.Listener interface. The
// only real difference is that Accept() returns Conn's of the type in this
// package, and also exposes a Multiaddr method as opposed to a regular Addr
// method
type Listener interface {
Accept() (UpgradedConn, error)
Close() error
Addr() net.Addr
Multiaddr() ma.Multiaddr
}
// Network is an inet.Network with methods for managing transports.
type TransportBasedNetwork interface {
network.Network
// AddTransport adds a transport to this Network.
//
// When dialing, this Network will iterate over the protocols in the
// remote multiaddr and pick the first protocol registered with a proxy
// transport, if any. Otherwise, it'll pick the transport registered to
// handle the last protocol in the multiaddr.
//
// When listening, this Network will iterate over the protocols in the
// local multiaddr and pick the *last* protocol registered with a proxy
// transport, if any. Otherwise, it'll pick the transport registered to
// handle the last protocol in the multiaddr.
AddTransport(t Transport) error
}