go-libp2p-resource-manager/rcmgr_test.go
2022-02-03 08:51:08 +02:00

1001 lines
27 KiB
Go

package rcmgr
import (
"testing"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
// TestResourceManager drives a resource manager built from a BasicLimiter with
// small static limits through a scripted sequence of operations: opening
// connections and streams, attaching them to peers, protocols and services,
// and releasing them again. After every step it inspects the reference count
// and the resource usage of each affected scope, and at the end of each phase
// it runs gc and verifies that unused protocol, peer and per-service peer
// scopes have been removed from the manager's internal maps.
//
// The expected reference counts are observed from inside the View* callbacks,
// so the assertions must stay inside those callbacks.
func TestResourceManager(t *testing.T) {
	peerA := peer.ID("A")
	peerB := peer.ID("B")
	protoA := protocol.ID("/A")
	protoB := protocol.ID("/B")
	svcA := "A.svc"
	svcB := "B.svc"

	// The limits are deliberately tiny so that every limit can be hit with a
	// handful of connections and streams.
	nmgr, err := NewResourceManager(
		&BasicLimiter{
			SystemLimits: &StaticLimit{
				Memory: 16384,
				BaseLimit: BaseLimit{
					StreamsInbound:  3,
					StreamsOutbound: 3,
					Streams:         6,
					ConnsInbound:    3,
					ConnsOutbound:   3,
					Conns:           6,
					FD:              2,
				},
			},
			TransientLimits: &StaticLimit{
				Memory: 4096,
				BaseLimit: BaseLimit{
					StreamsInbound:  1,
					StreamsOutbound: 1,
					Streams:         2,
					ConnsInbound:    1,
					ConnsOutbound:   1,
					Conns:           2,
					FD:              1,
				},
			},
			DefaultServiceLimits: &StaticLimit{
				Memory: 4096,
				BaseLimit: BaseLimit{
					StreamsInbound:  1,
					StreamsOutbound: 1,
					Streams:         2,
					ConnsInbound:    1,
					ConnsOutbound:   1,
					Conns:           2,
					FD:              1,
				},
			},
			DefaultServicePeerLimits: &StaticLimit{
				Memory: 4096,
				BaseLimit: BaseLimit{
					StreamsInbound:  5,
					StreamsOutbound: 5,
					Streams:         10,
				},
			},
			ServiceLimits: map[string]Limit{
				svcA: &StaticLimit{
					Memory: 8192,
					BaseLimit: BaseLimit{
						StreamsInbound:  2,
						StreamsOutbound: 2,
						Streams:         4,
						ConnsInbound:    2,
						ConnsOutbound:   2,
						Conns:           4,
						FD:              1,
					},
				},
				svcB: &StaticLimit{
					Memory: 8192,
					BaseLimit: BaseLimit{
						StreamsInbound:  2,
						StreamsOutbound: 2,
						Streams:         4,
						ConnsInbound:    2,
						ConnsOutbound:   2,
						Conns:           4,
						FD:              1,
					},
				},
			},
			ServicePeerLimits: map[string]Limit{
				svcB: &StaticLimit{
					Memory: 8192,
					BaseLimit: BaseLimit{
						StreamsInbound:  1,
						StreamsOutbound: 1,
						Streams:         2,
					},
				},
			},
			DefaultProtocolLimits: &StaticLimit{
				Memory: 4096,
				BaseLimit: BaseLimit{
					StreamsInbound:  1,
					StreamsOutbound: 1,
					Streams:         2,
				},
			},
			ProtocolLimits: map[protocol.ID]Limit{
				protoA: &StaticLimit{
					Memory: 8192,
					BaseLimit: BaseLimit{
						StreamsInbound:  2,
						StreamsOutbound: 2,
						Streams:         2,
					},
				},
			},
			ProtocolPeerLimits: map[protocol.ID]Limit{
				protoB: &StaticLimit{
					Memory: 8192,
					BaseLimit: BaseLimit{
						StreamsInbound:  1,
						StreamsOutbound: 1,
						Streams:         2,
					},
				},
			},
			DefaultPeerLimits: &StaticLimit{
				Memory: 4096,
				BaseLimit: BaseLimit{
					StreamsInbound:  1,
					StreamsOutbound: 1,
					Streams:         2,
					ConnsInbound:    1,
					ConnsOutbound:   1,
					Conns:           2,
					FD:              1,
				},
			},
			DefaultProtocolPeerLimits: &StaticLimit{
				Memory: 4096,
				BaseLimit: BaseLimit{
					StreamsInbound:  5,
					StreamsOutbound: 5,
					Streams:         10,
				},
			},
			PeerLimits: map[peer.ID]Limit{
				peerA: &StaticLimit{
					Memory: 8192,
					BaseLimit: BaseLimit{
						StreamsInbound:  2,
						StreamsOutbound: 2,
						Streams:         4,
						ConnsInbound:    2,
						ConnsOutbound:   2,
						Conns:           4,
						FD:              1,
					},
				},
			},
			ConnLimits: &StaticLimit{
				Memory: 4096,
				BaseLimit: BaseLimit{
					ConnsInbound:  1,
					ConnsOutbound: 1,
					Conns:         1,
					FD:            1,
				},
			},
			StreamLimits: &StaticLimit{
				Memory: 4096,
				BaseLimit: BaseLimit{
					StreamsInbound:  1,
					StreamsOutbound: 1,
					Streams:         1,
				},
			},
		})
	if err != nil {
		t.Fatal(err)
	}

	// The test needs access to the manager's internals (scope maps, gc).
	mgr := nmgr.(*resourceManager)
	defer mgr.Close()

	// checkRefCnt fails the test unless the scope's reference count equals count.
	checkRefCnt := func(s *resourceScope, count int) {
		t.Helper()
		if refCnt := s.refCnt; refCnt != count {
			t.Fatalf("expected refCnt of %d, got %d", count, refCnt)
		}
	}

	// The check* helpers below run the given check against the underlying
	// resourceScope of the system, transient, service, protocol or peer scope
	// while it is being viewed through the manager. They are marked as test
	// helpers so that a View* error is attributed to the calling step.
	checkSystem := func(check func(s *resourceScope)) {
		t.Helper()
		if err := mgr.ViewSystem(func(s network.ResourceScope) error {
			check(s.(*systemScope).resourceScope)
			return nil
		}); err != nil {
			t.Fatal(err)
		}
	}
	checkTransient := func(check func(s *resourceScope)) {
		t.Helper()
		if err := mgr.ViewTransient(func(s network.ResourceScope) error {
			check(s.(*transientScope).resourceScope)
			return nil
		}); err != nil {
			t.Fatal(err)
		}
	}
	checkService := func(svc string, check func(s *resourceScope)) {
		t.Helper()
		if err := mgr.ViewService(svc, func(s network.ServiceScope) error {
			check(s.(*serviceScope).resourceScope)
			return nil
		}); err != nil {
			t.Fatal(err)
		}
	}
	checkProtocol := func(p protocol.ID, check func(s *resourceScope)) {
		t.Helper()
		if err := mgr.ViewProtocol(p, func(s network.ProtocolScope) error {
			check(s.(*protocolScope).resourceScope)
			return nil
		}); err != nil {
			t.Fatal(err)
		}
	}
	checkPeer := func(p peer.ID, check func(s *resourceScope)) {
		t.Helper()
		if err := mgr.ViewPeer(p, func(s network.PeerScope) error {
			check(s.(*peerScope).resourceScope)
			return nil
		}); err != nil {
			t.Fatal(err)
		}
	}

	// baseline: nothing is in use yet
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	// open an inbound connection, using an fd
	conn, err := mgr.OpenConnection(network.DirInbound, true)
	if err != nil {
		t.Fatal(err)
	}

	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
	})

	// the connection is transient, we shouldn't be able to open a second one,
	// with or without an fd
	if _, err := mgr.OpenConnection(network.DirInbound, true); err == nil {
		t.Fatal("expected OpenConnection to fail")
	}
	if _, err := mgr.OpenConnection(network.DirInbound, false); err == nil {
		t.Fatal("expected OpenConnection to fail")
	}

	// close it to check resources are reclaimed
	conn.Done()

	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	// open another inbound connection, using an fd
	conn1, err := mgr.OpenConnection(network.DirInbound, true)
	if err != nil {
		t.Fatal(err)
	}

	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
	})

	// a connection that has not been attached to a peer has no peer scope
	if conn1.PeerScope() != nil {
		t.Fatal("peer scope should be nil")
	}

	// attach to a peer; the resources move from the transient scope to the peer scope
	if err := conn1.SetPeer(peerA); err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 4)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	// we should be able to open a second transient connection now
	conn2, err := mgr.OpenConnection(network.DirInbound, true)
	if err != nil {
		t.Fatal(err)
	}

	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 2})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
	})

	// but we shouldn't be able to attach it to the same peer due to the fd limit;
	// the failed attempt must leave every scope unchanged
	if err := conn2.SetPeer(peerA); err == nil {
		t.Fatal("expected SetPeer to fail")
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 2})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
	})

	// close it and reopen without using an FD -- we should be able to attach now
	conn2.Done()

	conn2, err = mgr.OpenConnection(network.DirInbound, false)
	if err != nil {
		t.Fatal(err)
	}

	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 0})
	})

	if err := conn2.SetPeer(peerA); err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	// open a stream
	stream, err := mgr.OpenStream(peerA, network.DirInbound)
	if err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 4)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 6)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})

	// the stream is transient we shouldn't be able to open a second one
	if _, err := mgr.OpenStream(peerA, network.DirInbound); err == nil {
		t.Fatal("expected OpenStream to fail")
	}

	// close the stream to check resource reclamation
	stream.Done()

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	// open another stream, but this time attach it to a protocol
	stream1, err := mgr.OpenStream(peerA, network.DirInbound)
	if err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 4)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 6)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})

	// a stream that has not been attached to a protocol has no protocol scope
	if stream1.ProtocolScope() != nil {
		t.Fatal("protocol scope should be nil")
	}

	if err := stream1.SetProtocol(protoA); err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 4)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1})
	})
	checkProtocol(protoA, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 7)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	// and now we should be able to open another stream and attach it to the protocol
	stream2, err := mgr.OpenStream(peerA, network.DirInbound)
	if err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 8)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})

	if err := stream2.SetProtocol(protoA); err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
	})
	checkProtocol(protoA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 8)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	// open a 3rd stream (from another peer), and try to attach it to the same protocol
	stream3, err := mgr.OpenStream(peerB, network.DirInbound)
	if err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
	})
	checkPeer(peerB, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 10)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})

	// protoA already holds two streams, so attaching must fail and change nothing
	if err := stream3.SetProtocol(protoA); err == nil {
		t.Fatal("expected SetProtocol to fail")
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
	})
	checkPeer(peerB, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkProtocol(protoA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 10)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})

	// but we should be able to set to another protocol
	if err := stream3.SetProtocol(protoB); err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
	})
	checkPeer(peerB, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkProtocol(protoA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkProtocol(protoB, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 11)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	// a stream that has not been attached to a service has no service scope
	if stream1.ServiceScope() != nil {
		t.Fatal("service scope should be nil")
	}

	// we should be able to attach stream1 and stream2 to svcA, but stream3 should fail due to limit
	if err := stream1.SetService(svcA); err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
	})
	checkPeer(peerB, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkService(svcA, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkProtocol(protoA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkProtocol(protoB, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 12)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	if err := stream2.SetService(svcA); err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
	})
	checkPeer(peerB, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkService(svcA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkProtocol(protoA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkProtocol(protoB, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 12)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	// the failed attach of stream3 must leave every scope unchanged
	if err := stream3.SetService(svcA); err == nil {
		t.Fatal("expected SetService to fail")
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
	})
	checkPeer(peerB, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkService(svcA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkProtocol(protoA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkProtocol(protoB, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 12)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	// and now let's reclaim our resources to make sure we can gc unused peer and proto scopes
	// but first check internal refs
	mgr.mx.Lock()
	_, okProtoA := mgr.proto[protoA]
	_, okProtoB := mgr.proto[protoB]
	_, okPeerA := mgr.peer[peerA]
	_, okPeerB := mgr.peer[peerB]
	mgr.mx.Unlock()

	if !okProtoA {
		t.Fatal("protocol scope is not stored")
	}
	if !okProtoB {
		t.Fatal("protocol scope is not stored")
	}
	if !okPeerA {
		t.Fatal("peer scope is not stored")
	}
	if !okPeerB {
		t.Fatal("peer scope is not stored")
	}

	// ok, reclaim
	stream1.Done()
	stream2.Done()
	stream3.Done()
	conn1.Done()
	conn2.Done()

	// check everything released
	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})
	checkPeer(peerB, func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})
	checkService(svcA, func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})
	checkProtocol(protoA, func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})
	checkProtocol(protoB, func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 7)
		checkResources(t, &s.rc, network.ScopeStat{})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	// collect the now-unused scopes and verify they are gone
	mgr.gc()

	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	mgr.mx.Lock()
	lenProto := len(mgr.proto)
	lenPeer := len(mgr.peer)
	mgr.mx.Unlock()

	if lenProto != 0 {
		t.Fatal("protocols were not gc'ed")
	}
	if lenPeer != 0 {
		t.Fatal("peers were not gc'ed")
	}

	// check that per protocol peer scopes work as intended
	stream1, err = mgr.OpenStream(peerA, network.DirInbound)
	if err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 5)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})

	if err := stream1.SetProtocol(protoB); err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkProtocol(protoB, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 6)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	stream2, err = mgr.OpenStream(peerA, network.DirInbound)
	if err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 7)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})

	// a second stream from the same peer on protoB must be rejected and stay transient
	if err := stream2.SetProtocol(protoB); err == nil {
		t.Fatal("expected SetProtocol to fail")
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 7)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkProtocol(protoB, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})

	stream1.Done()
	stream2.Done()

	// check that per service peer scopes work as intended
	stream1, err = mgr.OpenStream(peerA, network.DirInbound)
	if err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 6)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})

	if err := stream1.SetProtocol(protoA); err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkProtocol(protoA, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 7)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	stream2, err = mgr.OpenStream(peerA, network.DirInbound)
	if err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 8)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})

	if err := stream2.SetProtocol(protoA); err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkProtocol(protoA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 8)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	if err := stream1.SetService(svcB); err != nil {
		t.Fatal(err)
	}

	checkPeer(peerA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkService(svcB, func(s *resourceScope) {
		checkRefCnt(s, 2)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
	})
	checkProtocol(protoA, func(s *resourceScope) {
		checkRefCnt(s, 3)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 9)
		checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	// now we should fail to set the service for stream2 to svcB because of the service peer limit
	if err := stream2.SetService(svcB); err == nil {
		t.Fatal("expected SetService to fail")
	}

	// now release resources and check interior gc of per service peer scopes
	stream1.Done()
	stream2.Done()

	mgr.gc()

	checkSystem(func(s *resourceScope) {
		checkRefCnt(s, 4)
		checkResources(t, &s.rc, network.ScopeStat{})
	})
	checkTransient(func(s *resourceScope) {
		checkRefCnt(s, 1)
		checkResources(t, &s.rc, network.ScopeStat{})
	})

	// Read the service scope under the same lock as the other scope maps and
	// fail with a clear message, rather than a nil dereference, if it is missing.
	mgr.mx.Lock()
	lenProto = len(mgr.proto)
	lenPeer = len(mgr.peer)
	svc, okSvc := mgr.svc[svcB]
	mgr.mx.Unlock()

	if !okSvc {
		t.Fatal("service scope is not stored")
	}

	svc.Lock()
	lenSvcPeer := len(svc.peers)
	svc.Unlock()

	if lenProto != 0 {
		t.Fatal("protocols were not gc'ed")
	}
	if lenPeer != 0 {
		t.Fatal("peers were not gc'ed")
	}
	if lenSvcPeer != 0 {
		t.Fatal("service peers were not gc'ed")
	}
}