mirror of
https://github.com/libp2p/go-libp2p-core.git
synced 2025-04-11 15:10:21 +08:00
refactor test utilities, harnesses and suites.
This commit is contained in:
parent
06118baed6
commit
ed42958fbb
@ -1,13 +1,21 @@
|
||||
package test
|
||||
package tcrypto
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
ci "github.com/libp2p/go-libp2p-core/crypto"
|
||||
)
|
||||
|
||||
var generatedPairs int64 = 0
|
||||
|
||||
func RandTestKeyPair(typ, bits int) (ci.PrivKey, ci.PubKey, error) {
|
||||
seed := time.Now().UnixNano()
|
||||
|
||||
// workaround for low time resolution
|
||||
seed += atomic.AddInt64(&generatedPairs, 1) << 32
|
||||
|
||||
return SeededTestKeyPair(typ, bits, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
|
3
go.mod
3
go.mod
@ -6,13 +6,10 @@ require (
|
||||
github.com/gogo/protobuf v1.2.1
|
||||
github.com/ipfs/go-cid v0.0.1
|
||||
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8
|
||||
github.com/kr/pretty v0.1.0 // indirect
|
||||
github.com/libp2p/go-flow-metrics v0.0.1
|
||||
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16
|
||||
github.com/mr-tron/base58 v1.1.1
|
||||
github.com/multiformats/go-multiaddr v0.0.2
|
||||
github.com/multiformats/go-multihash v0.0.1
|
||||
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b
|
||||
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 // indirect
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
|
||||
)
|
||||
|
9
go.sum
9
go.sum
@ -30,11 +30,6 @@ github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlT
|
||||
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/libp2p/go-flow-metrics v0.0.1 h1:0gxuFd2GuK7IIP5pKljLwps6TvcuYgvG7Atqi3INF5s=
|
||||
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
|
||||
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
|
||||
@ -64,14 +59,10 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190219092855-153ac476189d h1:Z0Ahzd7HltpJtjAHHxX8QFP3j1yYgiuvjbjRzDj/KH0=
|
||||
golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
|
||||
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
||||
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
|
||||
|
@ -1,7 +0,0 @@
|
||||
package helpers
|
||||
|
||||
// WithRace returns whether the binary was compiled
|
||||
// with the race flag on.
|
||||
func WithRace() bool {
|
||||
return withRace
|
||||
}
|
@ -1,9 +0,0 @@
|
||||
package helpers
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestWithRace(t *testing.T) {
|
||||
t.Logf("WithRace() is %v\n", WithRace())
|
||||
}
|
@ -1,5 +0,0 @@
|
||||
// +build race
|
||||
|
||||
package helpers
|
||||
|
||||
const withRace = true
|
@ -1,5 +0,0 @@
|
||||
// +build !race
|
||||
|
||||
package helpers
|
||||
|
||||
const withRace = false
|
@ -51,6 +51,5 @@ type MuxedConn interface {
|
||||
type Multiplexer interface {
|
||||
|
||||
// NewConn constructs a new connection
|
||||
// TODO rename to Wrap / Multiplex
|
||||
NewConn(c net.Conn, isServer bool) (MuxedConn, error)
|
||||
}
|
||||
|
@ -1,541 +0,0 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
crand "crypto/rand"
|
||||
mrand "math/rand"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/mux"
|
||||
)
|
||||
|
||||
var randomness []byte
|
||||
|
||||
func init() {
|
||||
// read 1MB of randomness
|
||||
randomness = make([]byte, 1<<20)
|
||||
if _, err := crand.Read(randomness); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
tr mux.Multiplexer
|
||||
connNum int
|
||||
streamNum int
|
||||
msgNum int
|
||||
msgMin int
|
||||
msgMax int
|
||||
}
|
||||
|
||||
func randBuf(size int) []byte {
|
||||
n := len(randomness) - size
|
||||
if size < 1 {
|
||||
panic(fmt.Errorf("requested too large buffer (%d). max is %d", size, len(randomness)))
|
||||
}
|
||||
|
||||
start := mrand.Intn(n)
|
||||
return randomness[start : start+size]
|
||||
}
|
||||
|
||||
func checkErr(t *testing.T, err error) {
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func log(s string, v ...interface{}) {
|
||||
if testing.Verbose() {
|
||||
fmt.Fprintf(os.Stderr, "> "+s+"\n", v...)
|
||||
}
|
||||
}
|
||||
|
||||
func echoStream(s mux.MuxedStream) {
|
||||
defer s.Close()
|
||||
log("accepted stream")
|
||||
io.Copy(&LogWriter{s}, s) // echo everything
|
||||
log("closing stream")
|
||||
}
|
||||
|
||||
type LogWriter struct {
|
||||
W io.Writer
|
||||
}
|
||||
|
||||
func (lw *LogWriter) Write(buf []byte) (int, error) {
|
||||
if testing.Verbose() {
|
||||
log("logwriter: writing %d bytes", len(buf))
|
||||
}
|
||||
return lw.W.Write(buf)
|
||||
}
|
||||
|
||||
func GoServe(t *testing.T, tr mux.Multiplexer, l net.Listener) (done func()) {
|
||||
closed := make(chan struct{}, 1)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
c1, err := l.Accept()
|
||||
if err != nil {
|
||||
select {
|
||||
case <-closed:
|
||||
return // closed naturally.
|
||||
default:
|
||||
checkErr(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
log("accepted connection")
|
||||
sc1, err := tr.NewConn(c1, true)
|
||||
checkErr(t, err)
|
||||
go func() {
|
||||
for {
|
||||
str, err := sc1.AcceptStream()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
go echoStream(str)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}()
|
||||
|
||||
return func() {
|
||||
closed <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func SubtestSimpleWrite(t *testing.T, tr mux.Multiplexer) {
|
||||
l, err := net.Listen("tcp", "localhost:0")
|
||||
checkErr(t, err)
|
||||
log("listening at %s", l.Addr().String())
|
||||
done := GoServe(t, tr, l)
|
||||
defer done()
|
||||
|
||||
log("dialing to %s", l.Addr().String())
|
||||
nc1, err := net.Dial("tcp", l.Addr().String())
|
||||
checkErr(t, err)
|
||||
defer nc1.Close()
|
||||
|
||||
log("wrapping conn")
|
||||
c1, err := tr.NewConn(nc1, false)
|
||||
checkErr(t, err)
|
||||
defer c1.Close()
|
||||
|
||||
// serve the outgoing conn, because some muxers assume
|
||||
// that we _always_ call serve. (this is an error?)
|
||||
go c1.AcceptStream()
|
||||
|
||||
log("creating stream")
|
||||
s1, err := c1.OpenStream()
|
||||
checkErr(t, err)
|
||||
defer s1.Close()
|
||||
|
||||
buf1 := randBuf(4096)
|
||||
log("writing %d bytes to stream", len(buf1))
|
||||
_, err = s1.Write(buf1)
|
||||
checkErr(t, err)
|
||||
|
||||
buf2 := make([]byte, len(buf1))
|
||||
log("reading %d bytes from stream (echoed)", len(buf2))
|
||||
_, err = s1.Read(buf2)
|
||||
checkErr(t, err)
|
||||
|
||||
if string(buf2) != string(buf1) {
|
||||
t.Errorf("buf1 and buf2 not equal: %s != %s", string(buf1), string(buf2))
|
||||
}
|
||||
log("done")
|
||||
}
|
||||
|
||||
func SubtestStress(t *testing.T, opt Options) {
|
||||
msgsize := 1 << 11
|
||||
errs := make(chan error, 0) // dont block anything.
|
||||
|
||||
rateLimitN := 5000 // max of 5k funcs, because -race has 8k max.
|
||||
rateLimitChan := make(chan struct{}, rateLimitN)
|
||||
for i := 0; i < rateLimitN; i++ {
|
||||
rateLimitChan <- struct{}{}
|
||||
}
|
||||
|
||||
rateLimit := func(f func()) {
|
||||
<-rateLimitChan
|
||||
f()
|
||||
rateLimitChan <- struct{}{}
|
||||
}
|
||||
|
||||
writeStream := func(s mux.MuxedStream, bufs chan<- []byte) {
|
||||
log("writeStream %p, %d msgNum", s, opt.msgNum)
|
||||
|
||||
for i := 0; i < opt.msgNum; i++ {
|
||||
buf := randBuf(msgsize)
|
||||
bufs <- buf
|
||||
log("%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.msgNum, buf[:3])
|
||||
if _, err := s.Write(buf); err != nil {
|
||||
errs <- fmt.Errorf("s.Write(buf): %s", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
readStream := func(s mux.MuxedStream, bufs <-chan []byte) {
|
||||
log("readStream %p, %d msgNum", s, opt.msgNum)
|
||||
|
||||
buf2 := make([]byte, msgsize)
|
||||
i := 0
|
||||
for buf1 := range bufs {
|
||||
i++
|
||||
log("%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3])
|
||||
|
||||
if _, err := io.ReadFull(s, buf2); err != nil {
|
||||
errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err)
|
||||
log("%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3])
|
||||
continue
|
||||
}
|
||||
if !bytes.Equal(buf1, buf2) {
|
||||
errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
openStreamAndRW := func(c mux.MuxedConn) {
|
||||
log("openStreamAndRW %p, %d opt.msgNum", c, opt.msgNum)
|
||||
|
||||
s, err := c.OpenStream()
|
||||
if err != nil {
|
||||
errs <- fmt.Errorf("Failed to create NewStream: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
bufs := make(chan []byte, opt.msgNum)
|
||||
go func() {
|
||||
writeStream(s, bufs)
|
||||
close(bufs)
|
||||
}()
|
||||
|
||||
readStream(s, bufs)
|
||||
s.Close()
|
||||
}
|
||||
|
||||
openConnAndRW := func() {
|
||||
log("openConnAndRW")
|
||||
|
||||
l, err := net.Listen("tcp", "localhost:0")
|
||||
checkErr(t, err)
|
||||
done := GoServe(t, opt.tr, l)
|
||||
defer done()
|
||||
|
||||
nla := l.Addr()
|
||||
nc, err := net.Dial(nla.Network(), nla.String())
|
||||
checkErr(t, err)
|
||||
if err != nil {
|
||||
t.Fatal(fmt.Errorf("net.Dial(%s, %s): %s", nla.Network(), nla.String(), err))
|
||||
return
|
||||
}
|
||||
|
||||
c, err := opt.tr.NewConn(nc, false)
|
||||
if err != nil {
|
||||
t.Fatal(fmt.Errorf("a.AddConn(%s <--> %s): %s", nc.LocalAddr(), nc.RemoteAddr(), err))
|
||||
return
|
||||
}
|
||||
|
||||
// serve the outgoing conn, because some muxers assume
|
||||
// that we _always_ call serve. (this is an error?)
|
||||
go func() {
|
||||
log("serving connection")
|
||||
for {
|
||||
str, err := c.AcceptStream()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
go echoStream(str)
|
||||
}
|
||||
}()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < opt.streamNum; i++ {
|
||||
wg.Add(1)
|
||||
go rateLimit(func() {
|
||||
defer wg.Done()
|
||||
openStreamAndRW(c)
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
c.Close()
|
||||
}
|
||||
|
||||
openConnsAndRW := func() {
|
||||
log("openConnsAndRW, %d conns", opt.connNum)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < opt.connNum; i++ {
|
||||
wg.Add(1)
|
||||
go rateLimit(func() {
|
||||
defer wg.Done()
|
||||
openConnAndRW()
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
go func() {
|
||||
openConnsAndRW()
|
||||
close(errs) // done
|
||||
}()
|
||||
|
||||
for err := range errs {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func tcpPipe(t *testing.T) (net.Conn, net.Conn) {
|
||||
list, err := net.Listen("tcp", "0.0.0.0:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
con1, err := net.Dial("tcp", list.Addr().String())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
con2, err := list.Accept()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return con1, con2
|
||||
}
|
||||
|
||||
func SubtestStreamOpenStress(t *testing.T, tr mux.Multiplexer) {
|
||||
a, b := tcpPipe(t)
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
count := 10000
|
||||
go func() {
|
||||
muxa, err := tr.NewConn(a, true)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
stress := func() {
|
||||
for i := 0; i < count; i++ {
|
||||
s, err := muxa.OpenStream()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
s.Close()
|
||||
}
|
||||
}
|
||||
|
||||
go stress()
|
||||
go stress()
|
||||
go stress()
|
||||
go stress()
|
||||
go stress()
|
||||
}()
|
||||
|
||||
muxb, err := tr.NewConn(b, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
|
||||
recv := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
str, err := muxb.AcceptStream()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
go func() {
|
||||
recv <- struct{}{}
|
||||
str.Close()
|
||||
}()
|
||||
}
|
||||
}()
|
||||
|
||||
limit := time.After(time.Second * 10)
|
||||
for i := 0; i < count*5; i++ {
|
||||
select {
|
||||
case <-recv:
|
||||
case <-limit:
|
||||
t.Fatal("timed out receiving streams")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func SubtestStreamReset(t *testing.T, tr mux.Multiplexer) {
|
||||
a, b := tcpPipe(t)
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
done := make(chan struct{}, 2)
|
||||
go func() {
|
||||
muxa, err := tr.NewConn(a, true)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
s, err := muxa.OpenStream()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
|
||||
_, err = s.Write([]byte("foo"))
|
||||
if err == nil {
|
||||
t.Error("should have failed to write")
|
||||
}
|
||||
|
||||
s.Close()
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
muxb, err := tr.NewConn(b, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
str, err := muxb.AcceptStream()
|
||||
checkErr(t, err)
|
||||
str.Reset()
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
<-done
|
||||
<-done
|
||||
}
|
||||
|
||||
// check that Close also closes the underlying net.Conn
|
||||
func SubtestWriteAfterClose(t *testing.T, tr mux.Multiplexer) {
|
||||
a, b := tcpPipe(t)
|
||||
|
||||
muxa, err := tr.NewConn(a, true)
|
||||
checkErr(t, err)
|
||||
|
||||
muxb, err := tr.NewConn(b, false)
|
||||
checkErr(t, err)
|
||||
|
||||
err = muxa.Close()
|
||||
checkErr(t, err)
|
||||
err = muxb.Close()
|
||||
checkErr(t, err)
|
||||
|
||||
// make sure the underlying net.Conn was closed
|
||||
if _, err := a.Write([]byte("foobar")); err == nil || !strings.Contains(err.Error(), "use of closed network connection") {
|
||||
t.Fatal("write should have failed")
|
||||
}
|
||||
if _, err := b.Write([]byte("foobar")); err == nil || !strings.Contains(err.Error(), "use of closed network connection") {
|
||||
t.Fatal("write should have failed")
|
||||
}
|
||||
}
|
||||
|
||||
func SubtestStress1Conn1Stream1Msg(t *testing.T, tr mux.Multiplexer) {
|
||||
SubtestStress(t, Options{
|
||||
tr: tr,
|
||||
connNum: 1,
|
||||
streamNum: 1,
|
||||
msgNum: 1,
|
||||
msgMax: 100,
|
||||
msgMin: 100,
|
||||
})
|
||||
}
|
||||
|
||||
func SubtestStress1Conn1Stream100Msg(t *testing.T, tr mux.Multiplexer) {
|
||||
SubtestStress(t, Options{
|
||||
tr: tr,
|
||||
connNum: 1,
|
||||
streamNum: 1,
|
||||
msgNum: 100,
|
||||
msgMax: 100,
|
||||
msgMin: 100,
|
||||
})
|
||||
}
|
||||
|
||||
func SubtestStress1Conn100Stream100Msg(t *testing.T, tr mux.Multiplexer) {
|
||||
SubtestStress(t, Options{
|
||||
tr: tr,
|
||||
connNum: 1,
|
||||
streamNum: 100,
|
||||
msgNum: 100,
|
||||
msgMax: 100,
|
||||
msgMin: 100,
|
||||
})
|
||||
}
|
||||
|
||||
func SubtestStress50Conn10Stream50Msg(t *testing.T, tr mux.Multiplexer) {
|
||||
SubtestStress(t, Options{
|
||||
tr: tr,
|
||||
connNum: 50,
|
||||
streamNum: 10,
|
||||
msgNum: 50,
|
||||
msgMax: 100,
|
||||
msgMin: 100,
|
||||
})
|
||||
}
|
||||
|
||||
func SubtestStress1Conn1000Stream10Msg(t *testing.T, tr mux.Multiplexer) {
|
||||
SubtestStress(t, Options{
|
||||
tr: tr,
|
||||
connNum: 1,
|
||||
streamNum: 1000,
|
||||
msgNum: 10,
|
||||
msgMax: 100,
|
||||
msgMin: 100,
|
||||
})
|
||||
}
|
||||
|
||||
func SubtestStress1Conn100Stream100Msg10MB(t *testing.T, tr mux.Multiplexer) {
|
||||
SubtestStress(t, Options{
|
||||
tr: tr,
|
||||
connNum: 1,
|
||||
streamNum: 100,
|
||||
msgNum: 100,
|
||||
msgMax: 10000,
|
||||
msgMin: 1000,
|
||||
})
|
||||
}
|
||||
|
||||
// Subtests are all the subtests run by SubtestAll
|
||||
var Subtests = []TransportTest{
|
||||
SubtestSimpleWrite,
|
||||
SubtestWriteAfterClose,
|
||||
SubtestStress1Conn1Stream1Msg,
|
||||
SubtestStress1Conn1Stream100Msg,
|
||||
SubtestStress1Conn100Stream100Msg,
|
||||
SubtestStress50Conn10Stream50Msg,
|
||||
SubtestStress1Conn1000Stream10Msg,
|
||||
SubtestStress1Conn100Stream100Msg10MB,
|
||||
SubtestStreamOpenStress,
|
||||
SubtestStreamReset,
|
||||
}
|
||||
|
||||
func getFunctionName(i interface{}) string {
|
||||
return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
|
||||
}
|
||||
|
||||
// SubtestAll runs all the stream multiplexer tests against the target
|
||||
// transport.
|
||||
func SubtestAll(t *testing.T, tr mux.Multiplexer) {
|
||||
for _, f := range Subtests {
|
||||
t.Run(getFunctionName(f), func(t *testing.T) {
|
||||
f(t, tr)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TransportTest is a stream multiplex transport test case
|
||||
type TransportTest func(t *testing.T, tr mux.Multiplexer)
|
@ -7,13 +7,14 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
ic "github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
tpeer "github.com/libp2p/go-libp2p-core/peer/test"
|
||||
|
||||
tu "github.com/libp2p/go-libp2p-core/peer/test"
|
||||
mh "github.com/multiformats/go-multihash"
|
||||
ic "github.com/libp2p/go-libp2p-core/crypto"
|
||||
tcrypto "github.com/libp2p/go-libp2p-core/crypto/test"
|
||||
|
||||
b58 "github.com/mr-tron/base58/base58"
|
||||
mh "github.com/multiformats/go-multihash"
|
||||
)
|
||||
|
||||
var gen1 keyset // generated
|
||||
@ -48,7 +49,7 @@ type keyset struct {
|
||||
|
||||
func (ks *keyset) generate() error {
|
||||
var err error
|
||||
ks.sk, ks.pk, err = tu.RandTestKeyPair(512)
|
||||
ks.sk, ks.pk, err = tcrypto.RandTestKeyPair(ic.RSA, 512)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -216,7 +217,7 @@ func TestValidate(t *testing.T) {
|
||||
}
|
||||
|
||||
// Non-empty peer ID validates
|
||||
p, err := tu.RandPeerID()
|
||||
p, err := tpeer.RandPeerID()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -1,19 +1,16 @@
|
||||
package testutil
|
||||
package tpeer
|
||||
|
||||
import (
|
||||
"io"
|
||||
"math/rand"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
|
||||
ci "github.com/libp2p/go-libp2p-core/crypto"
|
||||
mh "github.com/multiformats/go-multihash"
|
||||
)
|
||||
|
||||
var generatedPairs int64 = 0
|
||||
|
||||
func RandPeerID() (peer.ID, error) {
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
buf := make([]byte, 16)
|
||||
@ -24,17 +21,10 @@ func RandPeerID() (peer.ID, error) {
|
||||
return peer.ID(h), nil
|
||||
}
|
||||
|
||||
func RandTestKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) {
|
||||
seed := time.Now().UnixNano()
|
||||
|
||||
// workaround for low time resolution
|
||||
seed += atomic.AddInt64(&generatedPairs, 1) << 32
|
||||
|
||||
r := rand.New(rand.NewSource(seed))
|
||||
return ci.GenerateKeyPairWithReader(ci.RSA, bits, r)
|
||||
}
|
||||
|
||||
func SeededTestKeyPair(seed int64) (ci.PrivKey, ci.PubKey, error) {
|
||||
r := rand.New(rand.NewSource(seed))
|
||||
return ci.GenerateKeyPairWithReader(ci.RSA, 512, r)
|
||||
func RandPeerIDFatal(t testing.TB) peer.ID {
|
||||
p, err := RandPeerID()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
@ -39,7 +39,6 @@ var (
|
||||
// Permanent TTLs (distinct so we can distinguish between them, constant as they
|
||||
// are, in fact, permanent)
|
||||
const (
|
||||
|
||||
// PermanentAddrTTL is the ttl for a "permanent address" (e.g. bootstrap nodes).
|
||||
PermanentAddrTTL = math.MaxInt64 - iota
|
||||
|
||||
|
@ -1,373 +0,0 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"math/rand"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/sec"
|
||||
)
|
||||
|
||||
var Subtests = map[string]func(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID){
|
||||
"RW": SubtestRW,
|
||||
"Keys": SubtestKeys,
|
||||
"WrongPeer": SubtestWrongPeer,
|
||||
"Stream": SubtestStream,
|
||||
"CancelHandshakeInbound": SubtestCancelHandshakeInbound,
|
||||
"CancelHandshakeOutbound": SubtestCancelHandshakeOutbound,
|
||||
}
|
||||
|
||||
var TestMessage = []byte("hello world!")
|
||||
var TestStreamLen int64 = 1024 * 1024
|
||||
var TestSeed int64 = 1812
|
||||
|
||||
func SubtestAll(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
|
||||
for n, f := range Subtests {
|
||||
t.Run(n, func(t *testing.T) {
|
||||
f(t, at, bt, ap, bp)
|
||||
f(t, bt, at, bp, ap)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func randStream() io.Reader {
|
||||
return &io.LimitedReader{
|
||||
R: rand.New(rand.NewSource(TestSeed)),
|
||||
N: TestStreamLen,
|
||||
}
|
||||
}
|
||||
|
||||
func testWriteSustain(t *testing.T, c sec.SecureConn) {
|
||||
source := randStream()
|
||||
n := int64(0)
|
||||
for {
|
||||
coppied, err := io.CopyN(c, source, int64(rand.Intn(8000)))
|
||||
n += coppied
|
||||
|
||||
switch err {
|
||||
case io.EOF:
|
||||
if n != TestStreamLen {
|
||||
t.Fatal("incorrect random stream length")
|
||||
}
|
||||
return
|
||||
case nil:
|
||||
default:
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testReadSustain(t *testing.T, c sec.SecureConn) {
|
||||
expected := randStream()
|
||||
total := 0
|
||||
ebuf := make([]byte, 1024)
|
||||
abuf := make([]byte, 1024)
|
||||
for {
|
||||
n, err := c.Read(abuf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
total += n
|
||||
_, err = io.ReadFull(expected, ebuf[:n])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(abuf[:n], ebuf[:n]) {
|
||||
t.Fatal("bytes not equal")
|
||||
}
|
||||
if total == int(TestStreamLen) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
func testWrite(t *testing.T, c sec.SecureConn) {
|
||||
n, err := c.Write(TestMessage)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if n != len(TestMessage) {
|
||||
t.Errorf("wrote %d bytes, expected to write %d bytes", n, len(TestMessage))
|
||||
}
|
||||
}
|
||||
|
||||
func testRead(t *testing.T, c sec.SecureConn) {
|
||||
buf := make([]byte, 100)
|
||||
n, err := c.Read(buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if n != len(TestMessage) {
|
||||
t.Errorf("wrote %d bytes, expected to write %d bytes", n, len(TestMessage))
|
||||
}
|
||||
if !bytes.Equal(buf[:n], TestMessage) {
|
||||
t.Errorf("received bad test message: %s", string(buf[:n]))
|
||||
}
|
||||
}
|
||||
|
||||
func testWriteFail(t *testing.T, c sec.SecureConn) {
|
||||
n, err := c.Write(TestMessage)
|
||||
if n != 0 || err == nil {
|
||||
t.Error("shouldn't have been able to write to a closed conn")
|
||||
}
|
||||
}
|
||||
|
||||
func testReadFail(t *testing.T, c sec.SecureConn) {
|
||||
buf := make([]byte, len(TestMessage))
|
||||
n, err := c.Read(buf)
|
||||
if n != 0 || err == nil {
|
||||
t.Error("shouldn't have been able to write to a closed conn")
|
||||
}
|
||||
}
|
||||
|
||||
func testEOF(t *testing.T, c sec.SecureConn) {
|
||||
buf := make([]byte, 100)
|
||||
n, err := c.Read(buf)
|
||||
if n != 0 {
|
||||
t.Errorf("didn't expect to read any bytes, read: %d", n)
|
||||
}
|
||||
if err != io.EOF {
|
||||
t.Errorf("expected read to fail with EOF, got: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func SubtestRW(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
a, b := net.Pipe()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
c, err := at.SecureInbound(ctx, a)
|
||||
if err != nil {
|
||||
a.Close()
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if c.LocalPeer() != ap {
|
||||
t.Errorf("expected local peer %s, got %s", ap, c.LocalPeer())
|
||||
}
|
||||
testWrite(t, c)
|
||||
testRead(t, c)
|
||||
c.Close()
|
||||
testWriteFail(t, c)
|
||||
testReadFail(t, c)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
c, err := bt.SecureOutbound(ctx, b, ap)
|
||||
if err != nil {
|
||||
b.Close()
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if c.RemotePeer() != ap {
|
||||
t.Errorf("expected remote peer %s, got %s", ap, c.RemotePeer())
|
||||
}
|
||||
if c.LocalPeer() != bp {
|
||||
t.Errorf("expected local peer %s, got %s", bp, c.LocalPeer())
|
||||
}
|
||||
testRead(t, c)
|
||||
testWrite(t, c)
|
||||
testEOF(t, c)
|
||||
testWriteFail(t, c)
|
||||
c.Close()
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func SubtestKeys(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
a, b := net.Pipe()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
c, err := at.SecureInbound(ctx, a)
|
||||
if err != nil {
|
||||
a.Close()
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
if c.RemotePeer() != bp {
|
||||
t.Errorf("expected remote peer %s, got remote peer %s", bp, c.RemotePeer())
|
||||
}
|
||||
if c.LocalPeer() != ap {
|
||||
t.Errorf("expected local peer %s, got local peer %s", ap, c.LocalPeer())
|
||||
}
|
||||
if !c.LocalPeer().MatchesPrivateKey(c.LocalPrivateKey()) {
|
||||
t.Error("local private key mismatch")
|
||||
}
|
||||
if !c.RemotePeer().MatchesPublicKey(c.RemotePublicKey()) {
|
||||
t.Error("local private key mismatch")
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
c, err := bt.SecureOutbound(ctx, b, ap)
|
||||
if err != nil {
|
||||
b.Close()
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
if c.RemotePeer() != ap {
|
||||
t.Errorf("expected remote peer %s, got remote peer %s", ap, c.RemotePeer())
|
||||
}
|
||||
if c.LocalPeer() != bp {
|
||||
t.Errorf("expected local peer %s, got local peer %s", bp, c.LocalPeer())
|
||||
}
|
||||
if !c.LocalPeer().MatchesPrivateKey(c.LocalPrivateKey()) {
|
||||
t.Error("local private key mismatch")
|
||||
}
|
||||
if !c.RemotePeer().MatchesPublicKey(c.RemotePublicKey()) {
|
||||
t.Error("local private key mismatch")
|
||||
}
|
||||
c.Close()
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func SubtestWrongPeer(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
a, b := net.Pipe()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer a.Close()
|
||||
_, err := at.SecureInbound(ctx, a)
|
||||
if err == nil {
|
||||
t.Fatal("conection should have failed")
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
defer b.Close()
|
||||
_, err := bt.SecureOutbound(ctx, b, bp)
|
||||
if err == nil {
|
||||
t.Fatal("connection should have failed")
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func SubtestCancelHandshakeOutbound(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
a, b := net.Pipe()
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, err := at.SecureOutbound(ctx, a, ap)
|
||||
if err == nil {
|
||||
t.Fatal("connection should have failed")
|
||||
}
|
||||
}()
|
||||
time.Sleep(time.Millisecond)
|
||||
cancel()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, err := bt.SecureInbound(ctx, b)
|
||||
if err == nil {
|
||||
t.Fatal("connection should have failed")
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
}
|
||||
|
||||
func SubtestCancelHandshakeInbound(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
a, b := net.Pipe()
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, err := at.SecureInbound(ctx, a)
|
||||
if err == nil {
|
||||
t.Fatal("connection should have failed")
|
||||
}
|
||||
}()
|
||||
time.Sleep(time.Millisecond)
|
||||
cancel()
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_, err := bt.SecureOutbound(ctx, b, bp)
|
||||
if err == nil {
|
||||
t.Fatal("connection should have failed")
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
}
|
||||
|
||||
func SubtestStream(t *testing.T, at, bt sec.SecureTransport, ap, bp peer.ID) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
a, b := net.Pipe()
|
||||
|
||||
defer a.Close()
|
||||
defer b.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
c, err := at.SecureInbound(ctx, a)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var swg sync.WaitGroup
|
||||
swg.Add(2)
|
||||
go func() {
|
||||
defer swg.Done()
|
||||
testWriteSustain(t, c)
|
||||
}()
|
||||
go func() {
|
||||
defer swg.Done()
|
||||
testReadSustain(t, c)
|
||||
}()
|
||||
swg.Wait()
|
||||
c.Close()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
c, err := bt.SecureOutbound(ctx, b, ap)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
io.Copy(c, c)
|
||||
c.Close()
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
@ -1,479 +0,0 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"runtime/debug"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
crand "crypto/rand"
|
||||
mrand "math/rand"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/helpers"
|
||||
"github.com/libp2p/go-libp2p-core/mux"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/transport"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
// VerboseDebugging can be set to true to enable verbose debug logging in the
|
||||
// stream stress tests.
|
||||
var VerboseDebugging = false
|
||||
|
||||
var randomness []byte
|
||||
|
||||
var StressTestTimeout = 1 * time.Minute
|
||||
|
||||
func init() {
|
||||
// read 1MB of randomness
|
||||
randomness = make([]byte, 1<<20)
|
||||
if _, err := crand.Read(randomness); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if timeout := os.Getenv("TEST_STRESS_TIMEOUT_MS"); timeout != "" {
|
||||
if v, err := strconv.ParseInt(timeout, 10, 32); err == nil {
|
||||
StressTestTimeout = time.Duration(v) * time.Millisecond
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
connNum int
|
||||
streamNum int
|
||||
msgNum int
|
||||
msgMin int
|
||||
msgMax int
|
||||
}
|
||||
|
||||
func fullClose(t *testing.T, s mux.MuxedStream) {
|
||||
if err := s.Close(); err != nil {
|
||||
t.Error(err)
|
||||
s.Reset()
|
||||
return
|
||||
}
|
||||
b, err := ioutil.ReadAll(s)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if len(b) != 0 {
|
||||
t.Error("expected to be done reading")
|
||||
}
|
||||
}
|
||||
|
||||
func randBuf(size int) []byte {
|
||||
n := len(randomness) - size
|
||||
if size < 1 {
|
||||
panic(fmt.Errorf("requested too large buffer (%d). max is %d", size, len(randomness)))
|
||||
}
|
||||
|
||||
start := mrand.Intn(n)
|
||||
return randomness[start : start+size]
|
||||
}
|
||||
|
||||
func checkErr(t *testing.T, err error) {
|
||||
t.Helper()
|
||||
if err != nil {
|
||||
debug.PrintStack()
|
||||
// TODO: not safe to call in parallel
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func debugLog(t *testing.T, s string, args ...interface{}) {
|
||||
if VerboseDebugging {
|
||||
t.Logf(s, args...)
|
||||
}
|
||||
}
|
||||
|
||||
func echoStream(t *testing.T, s mux.MuxedStream) {
|
||||
defer s.Close()
|
||||
// echo everything
|
||||
var err error
|
||||
if VerboseDebugging {
|
||||
t.Logf("accepted stream")
|
||||
_, err = io.Copy(&logWriter{t, s}, s)
|
||||
t.Log("closing stream")
|
||||
} else {
|
||||
_, err = io.Copy(s, s) // echo everything
|
||||
}
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
type logWriter struct {
|
||||
t *testing.T
|
||||
W io.Writer
|
||||
}
|
||||
|
||||
func (lw *logWriter) Write(buf []byte) (int, error) {
|
||||
lw.t.Logf("logwriter: writing %d bytes", len(buf))
|
||||
return lw.W.Write(buf)
|
||||
}
|
||||
|
||||
func goServe(t *testing.T, l transport.Listener) (done func()) {
|
||||
closed := make(chan struct{}, 1)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
select {
|
||||
case <-closed:
|
||||
return // closed naturally.
|
||||
default:
|
||||
checkErr(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
debugLog(t, "accepted connection")
|
||||
go func() {
|
||||
for {
|
||||
str, err := c.AcceptStream()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
go echoStream(t, str)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}()
|
||||
|
||||
return func() {
|
||||
closed <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID, opt Options) {
|
||||
msgsize := 1 << 11
|
||||
errs := make(chan error, 0) // dont block anything.
|
||||
|
||||
rateLimitN := 5000 // max of 5k funcs, because -race has 8k max.
|
||||
rateLimitChan := make(chan struct{}, rateLimitN)
|
||||
for i := 0; i < rateLimitN; i++ {
|
||||
rateLimitChan <- struct{}{}
|
||||
}
|
||||
|
||||
rateLimit := func(f func()) {
|
||||
<-rateLimitChan
|
||||
f()
|
||||
rateLimitChan <- struct{}{}
|
||||
}
|
||||
|
||||
writeStream := func(s mux.MuxedStream, bufs chan<- []byte) {
|
||||
debugLog(t, "writeStream %p, %d msgNum", s, opt.msgNum)
|
||||
|
||||
for i := 0; i < opt.msgNum; i++ {
|
||||
buf := randBuf(msgsize)
|
||||
bufs <- buf
|
||||
debugLog(t, "%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.msgNum, buf[:3])
|
||||
if _, err := s.Write(buf); err != nil {
|
||||
errs <- fmt.Errorf("s.Write(buf): %s", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
readStream := func(s mux.MuxedStream, bufs <-chan []byte) {
|
||||
debugLog(t, "readStream %p, %d msgNum", s, opt.msgNum)
|
||||
|
||||
buf2 := make([]byte, msgsize)
|
||||
i := 0
|
||||
for buf1 := range bufs {
|
||||
i++
|
||||
debugLog(t, "%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3])
|
||||
|
||||
if _, err := io.ReadFull(s, buf2); err != nil {
|
||||
errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err)
|
||||
debugLog(t, "%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3])
|
||||
continue
|
||||
}
|
||||
if !bytes.Equal(buf1, buf2) {
|
||||
errs <- fmt.Errorf("buffers not equal (%x != %x)", buf1[:3], buf2[:3])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
openStreamAndRW := func(c mux.MuxedConn) {
|
||||
debugLog(t, "openStreamAndRW %p, %d opt.msgNum", c, opt.msgNum)
|
||||
|
||||
s, err := c.OpenStream()
|
||||
if err != nil {
|
||||
errs <- fmt.Errorf("Failed to create NewStream: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
bufs := make(chan []byte, opt.msgNum)
|
||||
go func() {
|
||||
writeStream(s, bufs)
|
||||
close(bufs)
|
||||
}()
|
||||
|
||||
readStream(s, bufs)
|
||||
fullClose(t, s)
|
||||
}
|
||||
|
||||
openConnAndRW := func() {
|
||||
debugLog(t, "openConnAndRW")
|
||||
|
||||
l, err := ta.Listen(maddr)
|
||||
checkErr(t, err)
|
||||
|
||||
done := goServe(t, l)
|
||||
defer done()
|
||||
|
||||
c, err := tb.Dial(context.Background(), l.Multiaddr(), peerA)
|
||||
checkErr(t, err)
|
||||
|
||||
// serve the outgoing conn, because some muxers assume
|
||||
// that we _always_ call serve. (this is an error?)
|
||||
go func() {
|
||||
debugLog(t, "serving connection")
|
||||
for {
|
||||
str, err := c.AcceptStream()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
go echoStream(t, str)
|
||||
}
|
||||
}()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < opt.streamNum; i++ {
|
||||
wg.Add(1)
|
||||
go rateLimit(func() {
|
||||
defer wg.Done()
|
||||
openStreamAndRW(c)
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
c.Close()
|
||||
}
|
||||
|
||||
openConnsAndRW := func() {
|
||||
debugLog(t, "openConnsAndRW, %d conns", opt.connNum)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < opt.connNum; i++ {
|
||||
wg.Add(1)
|
||||
go rateLimit(func() {
|
||||
defer wg.Done()
|
||||
openConnAndRW()
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
go func() {
|
||||
openConnsAndRW()
|
||||
close(errs) // done
|
||||
}()
|
||||
|
||||
for err := range errs {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func SubtestStreamOpenStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
|
||||
l, err := ta.Listen(maddr)
|
||||
checkErr(t, err)
|
||||
defer l.Close()
|
||||
|
||||
count := 10000
|
||||
workers := 5
|
||||
|
||||
if helpers.WithRace() {
|
||||
// the race detector can only deal with 8128 simultaneous goroutines, so let's make sure we don't go overboard.
|
||||
count = 1000
|
||||
}
|
||||
|
||||
var (
|
||||
connA, connB transport.UpgradedConn
|
||||
)
|
||||
|
||||
accepted := make(chan error, 1)
|
||||
go func() {
|
||||
var err error
|
||||
connA, err = l.Accept()
|
||||
accepted <- err
|
||||
}()
|
||||
connB, err = tb.Dial(context.Background(), l.Multiaddr(), peerA)
|
||||
checkErr(t, err)
|
||||
checkErr(t, <-accepted)
|
||||
|
||||
defer func() {
|
||||
if connA != nil {
|
||||
connA.Close()
|
||||
}
|
||||
if connB != nil {
|
||||
connB.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < workers; j++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < count; i++ {
|
||||
s, err := connA.OpenStream()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
fullClose(t, s)
|
||||
}()
|
||||
}
|
||||
}()
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < count*workers; i++ {
|
||||
str, err := connB.AcceptStream()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
fullClose(t, str)
|
||||
}()
|
||||
}
|
||||
}()
|
||||
|
||||
timeout := time.After(StressTestTimeout)
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-timeout:
|
||||
t.Fatal("timed out receiving streams")
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
|
||||
func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
|
||||
l, err := ta.Listen(maddr)
|
||||
checkErr(t, err)
|
||||
|
||||
done := make(chan struct{}, 2)
|
||||
go func() {
|
||||
muxa, err := l.Accept()
|
||||
checkErr(t, err)
|
||||
|
||||
s, err := muxa.OpenStream()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Some transports won't open the stream until we write. That's
|
||||
// fine.
|
||||
s.Write([]byte("foo"))
|
||||
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
|
||||
_, err = s.Write([]byte("bar"))
|
||||
if err == nil {
|
||||
t.Error("should have failed to write")
|
||||
}
|
||||
|
||||
s.Close()
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
muxb, err := tb.Dial(context.Background(), l.Multiaddr(), peerA)
|
||||
checkErr(t, err)
|
||||
|
||||
go func() {
|
||||
str, err := muxb.AcceptStream()
|
||||
checkErr(t, err)
|
||||
str.Reset()
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
<-done
|
||||
<-done
|
||||
}
|
||||
|
||||
func SubtestStress1Conn1Stream1Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
|
||||
SubtestStress(t, ta, tb, maddr, peerA, Options{
|
||||
connNum: 1,
|
||||
streamNum: 1,
|
||||
msgNum: 1,
|
||||
msgMax: 100,
|
||||
msgMin: 100,
|
||||
})
|
||||
}
|
||||
|
||||
func SubtestStress1Conn1Stream100Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
|
||||
SubtestStress(t, ta, tb, maddr, peerA, Options{
|
||||
connNum: 1,
|
||||
streamNum: 1,
|
||||
msgNum: 100,
|
||||
msgMax: 100,
|
||||
msgMin: 100,
|
||||
})
|
||||
}
|
||||
|
||||
func SubtestStress1Conn100Stream100Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
|
||||
SubtestStress(t, ta, tb, maddr, peerA, Options{
|
||||
connNum: 1,
|
||||
streamNum: 100,
|
||||
msgNum: 100,
|
||||
msgMax: 100,
|
||||
msgMin: 100,
|
||||
})
|
||||
}
|
||||
|
||||
func SubtestStress50Conn10Stream50Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
|
||||
SubtestStress(t, ta, tb, maddr, peerA, Options{
|
||||
connNum: 50,
|
||||
streamNum: 10,
|
||||
msgNum: 50,
|
||||
msgMax: 100,
|
||||
msgMin: 100,
|
||||
})
|
||||
}
|
||||
|
||||
func SubtestStress1Conn1000Stream10Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
|
||||
SubtestStress(t, ta, tb, maddr, peerA, Options{
|
||||
connNum: 1,
|
||||
streamNum: 1000,
|
||||
msgNum: 10,
|
||||
msgMax: 100,
|
||||
msgMin: 100,
|
||||
})
|
||||
}
|
||||
|
||||
func SubtestStress1Conn100Stream100Msg10MB(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
|
||||
SubtestStress(t, ta, tb, maddr, peerA, Options{
|
||||
connNum: 1,
|
||||
streamNum: 100,
|
||||
msgNum: 100,
|
||||
msgMax: 10000,
|
||||
msgMin: 1000,
|
||||
})
|
||||
}
|
@ -1,287 +0,0 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/mux"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/transport"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
var testData = []byte("this is some test data")
|
||||
|
||||
type streamAndConn struct {
|
||||
stream mux.MuxedStream
|
||||
conn transport.UpgradedConn
|
||||
}
|
||||
|
||||
func SubtestProtocols(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
|
||||
rawIPAddr, _ := ma.NewMultiaddr("/ip4/1.2.3.4")
|
||||
if ta.CanDial(rawIPAddr) || tb.CanDial(rawIPAddr) {
|
||||
t.Error("nothing should be able to dial raw IP")
|
||||
}
|
||||
|
||||
tprotos := make(map[int]bool)
|
||||
for _, p := range ta.Protocols() {
|
||||
tprotos[p] = true
|
||||
}
|
||||
|
||||
if !ta.Proxy() {
|
||||
protos := maddr.Protocols()
|
||||
proto := protos[len(protos)-1]
|
||||
if !tprotos[proto.Code] {
|
||||
t.Errorf("transport should have reported that it supports protocol '%s' (%d)", proto.Name, proto.Code)
|
||||
}
|
||||
} else {
|
||||
found := false
|
||||
for _, proto := range maddr.Protocols() {
|
||||
if tprotos[proto.Code] {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("didn't find any matching proxy protocols in maddr: %s", maddr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func SubtestBasic(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
list, err := ta.Listen(maddr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer list.Close()
|
||||
|
||||
var (
|
||||
connA, connB transport.UpgradedConn
|
||||
done = make(chan struct{})
|
||||
)
|
||||
defer func() {
|
||||
<-done
|
||||
if connA != nil {
|
||||
connA.Close()
|
||||
}
|
||||
if connB != nil {
|
||||
connB.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer close(done)
|
||||
var err error
|
||||
connB, err = list.Accept()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
s, err := connB.AcceptStream()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(s)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
if !bytes.Equal(testData, buf) {
|
||||
t.Errorf("expected %s, got %s", testData, buf)
|
||||
}
|
||||
|
||||
n, err := s.Write(testData)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if n != len(testData) {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
err = s.Close()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}()
|
||||
|
||||
if !tb.CanDial(list.Multiaddr()) {
|
||||
t.Error("CanDial should have returned true")
|
||||
}
|
||||
|
||||
connA, err = tb.Dial(ctx, list.Multiaddr(), peerA)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s, err := connA.OpenStream()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
n, err := s.Write(testData)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
|
||||
if n != len(testData) {
|
||||
t.Fatalf("failed to write enough data (a->b)")
|
||||
return
|
||||
}
|
||||
err = s.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(s)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
if !bytes.Equal(testData, buf) {
|
||||
t.Errorf("expected %s, got %s", testData, buf)
|
||||
}
|
||||
}
|
||||
|
||||
func SubtestPingPong(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
|
||||
streams := 100
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
list, err := ta.Listen(maddr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer list.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
c, err := list.Accept()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
var sWg sync.WaitGroup
|
||||
for i := 0; i < streams; i++ {
|
||||
s, err := c.AcceptStream()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
sWg.Add(1)
|
||||
go func() {
|
||||
defer sWg.Done()
|
||||
|
||||
data, err := ioutil.ReadAll(s)
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if !bytes.HasPrefix(data, testData) {
|
||||
t.Errorf("expected %q to have prefix %q", string(data), string(testData))
|
||||
}
|
||||
|
||||
n, err := s.Write(data)
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
if n != len(data) {
|
||||
s.Reset()
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
s.Close()
|
||||
}()
|
||||
}
|
||||
sWg.Wait()
|
||||
}()
|
||||
|
||||
if !tb.CanDial(list.Multiaddr()) {
|
||||
t.Error("CanDial should have returned true")
|
||||
}
|
||||
|
||||
c, err := tb.Dial(ctx, list.Multiaddr(), peerA)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
for i := 0; i < streams; i++ {
|
||||
s, err := c.OpenStream()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
data := []byte(fmt.Sprintf("%s - %d", testData, i))
|
||||
n, err := s.Write(data)
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
if n != len(data) {
|
||||
s.Reset()
|
||||
t.Error("failed to write enough data (a->b)")
|
||||
return
|
||||
}
|
||||
s.Close()
|
||||
|
||||
ret, err := ioutil.ReadAll(s)
|
||||
if err != nil {
|
||||
s.Reset()
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
if !bytes.Equal(data, ret) {
|
||||
t.Errorf("expected %q, got %q", string(data), string(ret))
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func SubtestCancel(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
|
||||
list, err := ta.Listen(maddr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer list.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
c, err := tb.Dial(ctx, list.Multiaddr(), peerA)
|
||||
if err == nil {
|
||||
c.Close()
|
||||
t.Fatal("dial should have failed")
|
||||
}
|
||||
}
|
@ -1,45 +0,0 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/transport"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
var Subtests = []func(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID){
|
||||
SubtestProtocols,
|
||||
SubtestBasic,
|
||||
SubtestCancel,
|
||||
SubtestPingPong,
|
||||
|
||||
// Stolen from the stream muxer test suite.
|
||||
SubtestStress1Conn1Stream1Msg,
|
||||
SubtestStress1Conn1Stream100Msg,
|
||||
SubtestStress1Conn100Stream100Msg,
|
||||
SubtestStress50Conn10Stream50Msg,
|
||||
SubtestStress1Conn1000Stream10Msg,
|
||||
SubtestStress1Conn100Stream100Msg10MB,
|
||||
SubtestStreamOpenStress,
|
||||
SubtestStreamReset,
|
||||
}
|
||||
|
||||
func getFunctionName(i interface{}) string {
|
||||
return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
|
||||
}
|
||||
|
||||
func SubtestTransport(t *testing.T, ta, tb transport.Transport, addr string, peerA peer.ID) {
|
||||
maddr, err := ma.NewMultiaddr(addr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, f := range Subtests {
|
||||
t.Run(getFunctionName(f), func(t *testing.T) {
|
||||
f(t, ta, tb, maddr, peerA)
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user