1
0
mirror of https://github.com/libp2p/go-libp2p-core.git synced 2025-04-27 17:00:21 +08:00

absorb transport tests.

This commit is contained in:
Raúl Kripalani 2019-04-17 13:11:20 +01:00
parent 40efb03084
commit c9a30b9fbe
11 changed files with 840 additions and 41 deletions

5
go.mod
View File

@ -4,10 +4,11 @@ require (
github.com/coreos/go-semver v0.3.0
github.com/ipfs/go-cid v0.0.1
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8
github.com/kr/pretty v0.1.0 // indirect
github.com/libp2p/go-libp2p-crypto v0.0.1
github.com/libp2p/go-libp2p-peer v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.0.2
github.com/mr-tron/base58 v1.1.1
github.com/multiformats/go-multiaddr v0.0.2
github.com/multiformats/go-multihash v0.0.1
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)

39
go.sum
View File

@ -1,5 +1,3 @@
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
@ -14,32 +12,17 @@ github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmf
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495 h1:6IyqGr3fnd0tM3YxipK27TUskaOVUjU2nG45yzwcQKY=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ=
github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-cid v0.0.1 h1:GBjWPktLnNyX0JiQCNFpUuUSoMw5KMyqrsejHYlILBE=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
@ -52,16 +35,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=
github.com/libp2p/go-libp2p-crypto v0.0.1 h1:JNQd8CmoGTohO/akqrH16ewsqZpci2CbgYH/LmYl8gw=
github.com/libp2p/go-libp2p-crypto v0.0.1/go.mod h1:yJkNyDmO341d5wwXxDUGO0LykUVT72ImHNUqh5D/dBE=
github.com/libp2p/go-libp2p-peer v0.0.1/go.mod h1:nXQvOBbwVqoP+T5Y5nCjeH4sP9IX/J0AMzcDUVruVoo=
github.com/libp2p/go-libp2p-peer v0.1.0 h1:9D1St1vqXRkeAhNdDtpt8AivS1bhzA6yH+YWrVXWcWI=
github.com/libp2p/go-libp2p-peer v0.1.0/go.mod h1:nXQvOBbwVqoP+T5Y5nCjeH4sP9IX/J0AMzcDUVruVoo=
github.com/libp2p/go-libp2p-peerstore v0.0.2 h1:Lirt3A1Oq11jszJ4SPNBo8chNv61UWXE538KUEGxTVk=
github.com/libp2p/go-libp2p-peerstore v0.0.2/go.mod h1:RabLyPVJLuNQ+GFyoEkfi8H4Ti6k/HtZJ7YKgtSq+20=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ=
@ -71,11 +46,8 @@ github.com/mr-tron/base58 v1.1.1 h1:OJIdWOWYe2l5PQNgimGtuwHY8nDskvJ5vvs//YnzRLs=
github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.0.2 h1:RBysRCv5rv3FWlhKWKoXv8tnsCUpEpIZpCmqAGZos2s=
github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr-dns v0.0.1/go.mod h1:9kWcqw/Pj6FwxAwW38n/9403szc57zJPs45fmnznu3Q=
github.com/multiformats/go-multiaddr-net v0.0.1/go.mod h1:nw6HSxNmCIQH27XPGBuX+d1tnvM7ihcFwHMSstNAVUU=
github.com/multiformats/go-multibase v0.0.1 h1:PN9/v21eLywrFWdFNsFKaU04kLJzuYzmrJR+ubhT9qA=
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multihash v0.0.1 h1:HHwN1K12I+XllBCrqKnhX949Orn4oawPkegHMu2vDqQ=
@ -83,21 +55,11 @@ github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKT
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
github.com/whyrusleeping/mafmt v1.2.8/go.mod h1:faQJFPbLSxzD9xpA02ttW/tS9vZykNvXwGvqIpk20FA=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b h1:+/WWzjwW6gidDJnMKWLKLX1gxn7irUTF1fLpQovfQ5M=
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190219092855-153ac476189d h1:Z0Ahzd7HltpJtjAHHxX8QFP3j1yYgiuvjbjRzDj/KH0=
@ -106,7 +68,6 @@ golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpbl
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=

7
helpers/race.go Normal file
View File

@ -0,0 +1,7 @@
package helpers
// WithRace returns whether the binary was compiled
// with the race flag on.
func WithRace() bool {
return withRace
}

9
helpers/race_test.go Normal file
View File

@ -0,0 +1,9 @@
package helpers
import (
"testing"
)
func TestWithRace(t *testing.T) {
t.Logf("WithRace() is %v\n", WithRace())
}

5
helpers/race_with.go Normal file
View File

@ -0,0 +1,5 @@
// +build race
package helpers
const withRace = true

5
helpers/race_without.go Normal file
View File

@ -0,0 +1,5 @@
// +build !race
package helpers
const withRace = false

479
transport/test/stream.go Normal file
View File

@ -0,0 +1,479 @@
package test
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"runtime/debug"
"strconv"
"sync"
"testing"
"time"
crand "crypto/rand"
mrand "math/rand"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
)
// VerboseDebugging can be set to true to enable verbose debug logging in the
// stream stress tests.
var VerboseDebugging = false
var randomness []byte
var StressTestTimeout = 1 * time.Minute
func init() {
// read 1MB of randomness
randomness = make([]byte, 1<<20)
if _, err := crand.Read(randomness); err != nil {
panic(err)
}
if timeout := os.Getenv("TEST_STRESS_TIMEOUT_MS"); timeout != "" {
if v, err := strconv.ParseInt(timeout, 10, 32); err == nil {
StressTestTimeout = time.Duration(v) * time.Millisecond
}
}
}
type Options struct {
connNum int
streamNum int
msgNum int
msgMin int
msgMax int
}
func fullClose(t *testing.T, s mux.MuxStream) {
if err := s.Close(); err != nil {
t.Error(err)
s.Reset()
return
}
b, err := ioutil.ReadAll(s)
if err != nil {
t.Error(err)
}
if len(b) != 0 {
t.Error("expected to be done reading")
}
}
func randBuf(size int) []byte {
n := len(randomness) - size
if size < 1 {
panic(fmt.Errorf("requested too large buffer (%d). max is %d", size, len(randomness)))
}
start := mrand.Intn(n)
return randomness[start : start+size]
}
func checkErr(t *testing.T, err error) {
t.Helper()
if err != nil {
debug.PrintStack()
// TODO: not safe to call in parallel
t.Fatal(err)
}
}
func debugLog(t *testing.T, s string, args ...interface{}) {
if VerboseDebugging {
t.Logf(s, args...)
}
}
func echoStream(t *testing.T, s mux.MuxStream) {
defer s.Close()
// echo everything
var err error
if VerboseDebugging {
t.Logf("accepted stream")
_, err = io.Copy(&logWriter{t, s}, s)
t.Log("closing stream")
} else {
_, err = io.Copy(s, s) // echo everything
}
if err != nil {
t.Error(err)
}
}
type logWriter struct {
t *testing.T
W io.Writer
}
func (lw *logWriter) Write(buf []byte) (int, error) {
lw.t.Logf("logwriter: writing %d bytes", len(buf))
return lw.W.Write(buf)
}
func goServe(t *testing.T, l transport.Listener) (done func()) {
closed := make(chan struct{}, 1)
go func() {
for {
c, err := l.Accept()
if err != nil {
select {
case <-closed:
return // closed naturally.
default:
checkErr(t, err)
}
}
debugLog(t, "accepted connection")
go func() {
for {
str, err := c.AcceptStream()
if err != nil {
break
}
go echoStream(t, str)
}
}()
}
}()
return func() {
closed <- struct{}{}
}
}
func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID, opt Options) {
msgsize := 1 << 11
errs := make(chan error, 0) // dont block anything.
rateLimitN := 5000 // max of 5k funcs, because -race has 8k max.
rateLimitChan := make(chan struct{}, rateLimitN)
for i := 0; i < rateLimitN; i++ {
rateLimitChan <- struct{}{}
}
rateLimit := func(f func()) {
<-rateLimitChan
f()
rateLimitChan <- struct{}{}
}
writeStream := func(s mux.MuxStream, bufs chan<- []byte) {
debugLog(t, "writeStream %p, %d msgNum", s, opt.msgNum)
for i := 0; i < opt.msgNum; i++ {
buf := randBuf(msgsize)
bufs <- buf
debugLog(t, "%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.msgNum, buf[:3])
if _, err := s.Write(buf); err != nil {
errs <- fmt.Errorf("s.Write(buf): %s", err)
continue
}
}
}
readStream := func(s mux.MuxStream, bufs <-chan []byte) {
debugLog(t, "readStream %p, %d msgNum", s, opt.msgNum)
buf2 := make([]byte, msgsize)
i := 0
for buf1 := range bufs {
i++
debugLog(t, "%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3])
if _, err := io.ReadFull(s, buf2); err != nil {
errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err)
debugLog(t, "%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3])
continue
}
if !bytes.Equal(buf1, buf2) {
errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])
}
}
}
openStreamAndRW := func(c mux.MuxedConn) {
debugLog(t, "openStreamAndRW %p, %d opt.msgNum", c, opt.msgNum)
s, err := c.OpenStream()
if err != nil {
errs <- fmt.Errorf("Failed to create NewStream: %s", err)
return
}
bufs := make(chan []byte, opt.msgNum)
go func() {
writeStream(s, bufs)
close(bufs)
}()
readStream(s, bufs)
fullClose(t, s)
}
openConnAndRW := func() {
debugLog(t, "openConnAndRW")
l, err := ta.Listen(maddr)
checkErr(t, err)
done := goServe(t, l)
defer done()
c, err := tb.Dial(context.Background(), l.Multiaddr(), peerA)
checkErr(t, err)
// serve the outgoing conn, because some muxers assume
// that we _always_ call serve. (this is an error?)
go func() {
debugLog(t, "serving connection")
for {
str, err := c.AcceptStream()
if err != nil {
break
}
go echoStream(t, str)
}
}()
var wg sync.WaitGroup
for i := 0; i < opt.streamNum; i++ {
wg.Add(1)
go rateLimit(func() {
defer wg.Done()
openStreamAndRW(c)
})
}
wg.Wait()
c.Close()
}
openConnsAndRW := func() {
debugLog(t, "openConnsAndRW, %d conns", opt.connNum)
var wg sync.WaitGroup
for i := 0; i < opt.connNum; i++ {
wg.Add(1)
go rateLimit(func() {
defer wg.Done()
openConnAndRW()
})
}
wg.Wait()
}
go func() {
openConnsAndRW()
close(errs) // done
}()
for err := range errs {
t.Error(err)
}
}
func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
l, err := ta.Listen(maddr)
checkErr(t, err)
defer l.Close()
count := 10000
workers := 5
if helpers.WithRace() {
// the race detector can only deal with 8128 simultaneous goroutines, so let's make sure we don't go overboard.
count = 1000
}
var (
connA, connB transport.UpgradedConn
)
accepted := make(chan error, 1)
go func() {
var err error
connA, err = l.Accept()
accepted <- err
}()
connB, err = tb.Dial(context.Background(), l.Multiaddr(), peerA)
checkErr(t, err)
checkErr(t, <-accepted)
defer func() {
if connA != nil {
connA.Close()
}
if connB != nil {
connB.Close()
}
}()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < workers; j++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < count; i++ {
s, err := connA.OpenStream()
if err != nil {
t.Error(err)
return
}
wg.Add(1)
go func() {
defer wg.Done()
fullClose(t, s)
}()
}
}()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < count*workers; i++ {
str, err := connB.AcceptStream()
if err != nil {
break
}
wg.Add(1)
go func() {
defer wg.Done()
fullClose(t, str)
}()
}
}()
timeout := time.After(StressTestTimeout)
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-timeout:
t.Fatal("timed out receiving streams")
case <-done:
}
}
func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
l, err := ta.Listen(maddr)
checkErr(t, err)
done := make(chan struct{}, 2)
go func() {
muxa, err := l.Accept()
checkErr(t, err)
s, err := muxa.OpenStream()
if err != nil {
panic(err)
}
// Some transports won't open the stream until we write. That's
// fine.
s.Write([]byte("foo"))
time.Sleep(time.Millisecond * 50)
_, err = s.Write([]byte("bar"))
if err == nil {
t.Error("should have failed to write")
}
s.Close()
done <- struct{}{}
}()
muxb, err := tb.Dial(context.Background(), l.Multiaddr(), peerA)
checkErr(t, err)
go func() {
str, err := muxb.AcceptStream()
checkErr(t, err)
str.Reset()
done <- struct{}{}
}()
<-done
<-done
}
func SubtestStress1Conn1Stream1Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
SubtestStress(t, ta, tb, maddr, peerA, Options{
connNum: 1,
streamNum: 1,
msgNum: 1,
msgMax: 100,
msgMin: 100,
})
}
func SubtestStress1Conn1Stream100Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
SubtestStress(t, ta, tb, maddr, peerA, Options{
connNum: 1,
streamNum: 1,
msgNum: 100,
msgMax: 100,
msgMin: 100,
})
}
func SubtestStress1Conn100Stream100Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
SubtestStress(t, ta, tb, maddr, peerA, Options{
connNum: 1,
streamNum: 100,
msgNum: 100,
msgMax: 100,
msgMin: 100,
})
}
func SubtestStress50Conn10Stream50Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
SubtestStress(t, ta, tb, maddr, peerA, Options{
connNum: 50,
streamNum: 10,
msgNum: 50,
msgMax: 100,
msgMin: 100,
})
}
func SubtestStress1Conn1000Stream10Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
SubtestStress(t, ta, tb, maddr, peerA, Options{
connNum: 1,
streamNum: 1000,
msgNum: 10,
msgMax: 100,
msgMin: 100,
})
}
func SubtestStress1Conn100Stream100Msg10MB(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
SubtestStress(t, ta, tb, maddr, peerA, Options{
connNum: 1,
streamNum: 100,
msgNum: 100,
msgMax: 10000,
msgMin: 1000,
})
}

287
transport/test/transport.go Normal file
View File

@ -0,0 +1,287 @@
package test
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"sync"
"testing"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
)
var testData = []byte("this is some test data")
type streamAndConn struct {
stream mux.MuxStream
conn transport.UpgradedConn
}
func SubtestProtocols(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
rawIPAddr, _ := ma.NewMultiaddr("/ip4/1.2.3.4")
if ta.CanDial(rawIPAddr) || tb.CanDial(rawIPAddr) {
t.Error("nothing should be able to dial raw IP")
}
tprotos := make(map[int]bool)
for _, p := range ta.Protocols() {
tprotos[p] = true
}
if !ta.Proxy() {
protos := maddr.Protocols()
proto := protos[len(protos)-1]
if !tprotos[proto.Code] {
t.Errorf("transport should have reported that it supports protocol '%s' (%d)", proto.Name, proto.Code)
}
} else {
found := false
for _, proto := range maddr.Protocols() {
if tprotos[proto.Code] {
found = true
break
}
}
if !found {
t.Errorf("didn't find any matching proxy protocols in maddr: %s", maddr)
}
}
}
func SubtestBasic(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
list, err := ta.Listen(maddr)
if err != nil {
t.Fatal(err)
}
defer list.Close()
var (
connA, connB transport.UpgradedConn
done = make(chan struct{})
)
defer func() {
<-done
if connA != nil {
connA.Close()
}
if connB != nil {
connB.Close()
}
}()
go func() {
defer close(done)
var err error
connB, err = list.Accept()
if err != nil {
t.Error(err)
return
}
s, err := connB.AcceptStream()
if err != nil {
t.Error(err)
return
}
buf, err := ioutil.ReadAll(s)
if err != nil {
t.Error(err)
return
}
if !bytes.Equal(testData, buf) {
t.Errorf("expected %s, got %s", testData, buf)
}
n, err := s.Write(testData)
if err != nil {
t.Error(err)
return
}
if n != len(testData) {
t.Error(err)
return
}
err = s.Close()
if err != nil {
t.Error(err)
}
}()
if !tb.CanDial(list.Multiaddr()) {
t.Error("CanDial should have returned true")
}
connA, err = tb.Dial(ctx, list.Multiaddr(), peerA)
if err != nil {
t.Fatal(err)
}
s, err := connA.OpenStream()
if err != nil {
t.Fatal(err)
}
n, err := s.Write(testData)
if err != nil {
t.Fatal(err)
return
}
if n != len(testData) {
t.Fatalf("failed to write enough data (a->b)")
return
}
err = s.Close()
if err != nil {
t.Fatal(err)
return
}
buf, err := ioutil.ReadAll(s)
if err != nil {
t.Fatal(err)
return
}
if !bytes.Equal(testData, buf) {
t.Errorf("expected %s, got %s", testData, buf)
}
}
func SubtestPingPong(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
streams := 100
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
list, err := ta.Listen(maddr)
if err != nil {
t.Fatal(err)
}
defer list.Close()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
c, err := list.Accept()
if err != nil {
t.Error(err)
return
}
defer c.Close()
var sWg sync.WaitGroup
for i := 0; i < streams; i++ {
s, err := c.AcceptStream()
if err != nil {
t.Error(err)
return
}
sWg.Add(1)
go func() {
defer sWg.Done()
data, err := ioutil.ReadAll(s)
if err != nil {
s.Reset()
t.Error(err)
return
}
if !bytes.HasPrefix(data, testData) {
t.Errorf("expected %q to have prefix %q", string(data), string(testData))
}
n, err := s.Write(data)
if err != nil {
s.Reset()
t.Error(err)
return
}
if n != len(data) {
s.Reset()
t.Error(err)
return
}
s.Close()
}()
}
sWg.Wait()
}()
if !tb.CanDial(list.Multiaddr()) {
t.Error("CanDial should have returned true")
}
c, err := tb.Dial(ctx, list.Multiaddr(), peerA)
if err != nil {
t.Fatal(err)
}
defer c.Close()
for i := 0; i < streams; i++ {
s, err := c.OpenStream()
if err != nil {
t.Error(err)
continue
}
wg.Add(1)
go func(i int) {
defer wg.Done()
data := []byte(fmt.Sprintf("%s - %d", testData, i))
n, err := s.Write(data)
if err != nil {
s.Reset()
t.Error(err)
return
}
if n != len(data) {
s.Reset()
t.Error("failed to write enough data (a->b)")
return
}
s.Close()
ret, err := ioutil.ReadAll(s)
if err != nil {
s.Reset()
t.Error(err)
return
}
if !bytes.Equal(data, ret) {
t.Errorf("expected %q, got %q", string(data), string(ret))
}
}(i)
}
wg.Wait()
}
func SubtestCancel(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
list, err := ta.Listen(maddr)
if err != nil {
t.Fatal(err)
}
defer list.Close()
ctx, cancel := context.WithCancel(context.Background())
cancel()
c, err := tb.Dial(ctx, list.Multiaddr(), peerA)
if err == nil {
c.Close()
t.Fatal("dial should have failed")
}
}

45
transport/test/utils.go Normal file
View File

@ -0,0 +1,45 @@
package test
import (
"reflect"
"runtime"
"testing"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
)
var Subtests = []func(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID){
SubtestProtocols,
SubtestBasic,
SubtestCancel,
SubtestPingPong,
// Stolen from the stream muxer test suite.
SubtestStress1Conn1Stream1Msg,
SubtestStress1Conn1Stream100Msg,
SubtestStress1Conn100Stream100Msg,
SubtestStress50Conn10Stream50Msg,
SubtestStress1Conn1000Stream10Msg,
SubtestStress1Conn100Stream100Msg10MB,
SubtestStreamOpenStress,
SubtestStreamReset,
}
func getFunctionName(i interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
}
func SubtestTransport(t *testing.T, ta, tb transport.Transport, addr string, peerA peer.ID) {
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
t.Fatal(err)
}
for _, f := range Subtests {
t.Run(getFunctionName(f), func(t *testing.T) {
f(t, ta, tb, maddr, peerA)
})
}
}