package rcmgr import ( "testing" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" ) func TestResourceManager(t *testing.T) { peerA := peer.ID("A") peerB := peer.ID("B") protoA := protocol.ID("/A") protoB := protocol.ID("/B") svcA := "A.svc" svcB := "B.svc" nmgr, err := NewResourceManager( &BasicLimiter{ SystemLimits: &StaticLimit{ Memory: 16384, BaseLimit: BaseLimit{ StreamsInbound: 3, StreamsOutbound: 3, Streams: 6, ConnsInbound: 3, ConnsOutbound: 3, Conns: 6, FD: 2, }, }, TransientLimits: &StaticLimit{ Memory: 4096, BaseLimit: BaseLimit{ StreamsInbound: 1, StreamsOutbound: 1, Streams: 2, ConnsInbound: 1, ConnsOutbound: 1, Conns: 2, FD: 1, }, }, DefaultServiceLimits: &StaticLimit{ Memory: 4096, BaseLimit: BaseLimit{ StreamsInbound: 1, StreamsOutbound: 1, Streams: 2, ConnsInbound: 1, ConnsOutbound: 1, Conns: 2, FD: 1, }, }, DefaultServicePeerLimits: &StaticLimit{ Memory: 4096, BaseLimit: BaseLimit{ StreamsInbound: 5, StreamsOutbound: 5, Streams: 10, }, }, ServiceLimits: map[string]Limit{ svcA: &StaticLimit{ Memory: 8192, BaseLimit: BaseLimit{ StreamsInbound: 2, StreamsOutbound: 2, Streams: 4, ConnsInbound: 2, ConnsOutbound: 2, Conns: 4, FD: 1, }, }, svcB: &StaticLimit{ Memory: 8192, BaseLimit: BaseLimit{ StreamsInbound: 2, StreamsOutbound: 2, Streams: 4, ConnsInbound: 2, ConnsOutbound: 2, Conns: 4, FD: 1, }, }, }, ServicePeerLimits: map[string]Limit{ svcB: &StaticLimit{ Memory: 8192, BaseLimit: BaseLimit{ StreamsInbound: 1, StreamsOutbound: 1, Streams: 2, }, }, }, DefaultProtocolLimits: &StaticLimit{ Memory: 4096, BaseLimit: BaseLimit{ StreamsInbound: 1, StreamsOutbound: 1, Streams: 2, }, }, ProtocolLimits: map[protocol.ID]Limit{ protoA: &StaticLimit{ Memory: 8192, BaseLimit: BaseLimit{ StreamsInbound: 2, StreamsOutbound: 2, Streams: 2, }, }, }, ProtocolPeerLimits: map[protocol.ID]Limit{ protoB: &StaticLimit{ Memory: 8192, BaseLimit: BaseLimit{ StreamsInbound: 1, StreamsOutbound: 1, Streams: 2, }, }, }, DefaultPeerLimits: &StaticLimit{ Memory: 4096, BaseLimit: BaseLimit{ StreamsInbound: 1, StreamsOutbound: 1, Streams: 2, ConnsInbound: 1, ConnsOutbound: 1, Conns: 2, FD: 1, }, }, DefaultProtocolPeerLimits: &StaticLimit{ Memory: 4096, BaseLimit: BaseLimit{ StreamsInbound: 5, StreamsOutbound: 5, Streams: 10, }, }, PeerLimits: map[peer.ID]Limit{ peerA: &StaticLimit{ Memory: 8192, BaseLimit: BaseLimit{ StreamsInbound: 2, StreamsOutbound: 2, Streams: 4, ConnsInbound: 2, ConnsOutbound: 2, Conns: 4, FD: 1, }, }, }, ConnLimits: &StaticLimit{ Memory: 4096, BaseLimit: BaseLimit{ ConnsInbound: 1, ConnsOutbound: 1, Conns: 1, FD: 1, }, }, StreamLimits: &StaticLimit{ Memory: 4096, BaseLimit: BaseLimit{ StreamsInbound: 1, StreamsOutbound: 1, Streams: 1, }, }, }) if err != nil { t.Fatal(err) } mgr := nmgr.(*resourceManager) defer mgr.Close() checkRefCnt := func(s *resourceScope, count int) { t.Helper() if refCnt := s.refCnt; refCnt != count { t.Fatalf("expected refCnt of %d, got %d", count, refCnt) } } checkSystem := func(check func(s *resourceScope)) { if err := mgr.ViewSystem(func(s network.ResourceScope) error { check(s.(*systemScope).resourceScope) return nil }); err != nil { t.Fatal(err) } } checkTransient := func(check func(s *resourceScope)) { if err := mgr.ViewTransient(func(s network.ResourceScope) error { check(s.(*transientScope).resourceScope) return nil }); err != nil { t.Fatal(err) } } checkService := func(svc string, check func(s *resourceScope)) { if err := mgr.ViewService(svc, func(s network.ServiceScope) error { check(s.(*serviceScope).resourceScope) return nil }); err != nil { t.Fatal(err) } } checkProtocol := func(p protocol.ID, check func(s *resourceScope)) { if err := mgr.ViewProtocol(p, func(s network.ProtocolScope) error { check(s.(*protocolScope).resourceScope) return nil }); err != nil { t.Fatal(err) } } checkPeer := func(p peer.ID, check func(s *resourceScope)) { if err := mgr.ViewPeer(p, func(s network.PeerScope) error { check(s.(*peerScope).resourceScope) return nil }); err != nil { t.Fatal(err) } } checkSystem(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) // open an inbound connection, using an fd conn, err := mgr.OpenConnection(network.DirInbound, true) if err != nil { t.Fatal(err) } checkSystem(func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1}) }) // the connection is transient, we shouldn't be able to open a second one if _, err := mgr.OpenConnection(network.DirInbound, true); err == nil { t.Fatal("expected OpenConnection to fail") } if _, err := mgr.OpenConnection(network.DirInbound, false); err == nil { t.Fatal("expected OpenConnection to fail") } // close it to check resources are reclaimed conn.Done() checkSystem(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) // open another inbound connection, using an fd conn1, err := mgr.OpenConnection(network.DirInbound, true) if err != nil { t.Fatal(err) } checkSystem(func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1}) }) // check nility of current peer scope if conn1.PeerScope() != nil { t.Fatal("peer scope should be nil") } // attach to a peer if err := conn1.SetPeer(peerA); err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 4) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) // we should be able to open a second transient connection now conn2, err := mgr.OpenConnection(network.DirInbound, true) if err != nil { t.Fatal(err) } checkSystem(func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 2}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1}) }) // but we shouldn't be able to attach it to the same peer due to the fd limit if err := conn2.SetPeer(peerA); err == nil { t.Fatal("expected SetPeer to fail") } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 2}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1}) }) // close it and reopen without using an FD -- we should be able to attach now conn2.Done() conn2, err = mgr.OpenConnection(network.DirInbound, false) if err != nil { t.Fatal(err) } checkSystem(func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 0}) }) if err := conn2.SetPeer(peerA); err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) // open a stream stream, err := mgr.OpenStream(peerA, network.DirInbound) if err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 4) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 6) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) // the stream is transient we shouldn't be able to open a second one if _, err := mgr.OpenStream(peerA, network.DirInbound); err == nil { t.Fatal("expected OpenStream to fail") } // close the stream to check resource reclamation stream.Done() checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) // open another stream, but this time attach it to a protocol stream1, err := mgr.OpenStream(peerA, network.DirInbound) if err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 4) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 6) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) // check nility of protocol scope if stream1.ProtocolScope() != nil { t.Fatal("protocol scope should be nil") } if err := stream1.SetProtocol(protoA); err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 4) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1}) }) checkProtocol(protoA, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 7) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) // and now we should be able to open another stream and attach it to the protocol stream2, err := mgr.OpenStream(peerA, network.DirInbound) if err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 8) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) if err := stream2.SetProtocol(protoA); err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1}) }) checkProtocol(protoA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 8) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) // open a 3rd stream, and try to attach it to the same protocol stream3, err := mgr.OpenStream(peerB, network.DirInbound) if err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1}) }) checkPeer(peerB, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 10) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) if err := stream3.SetProtocol(protoA); err == nil { t.Fatal("expected SetProtocol to fail") } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1}) }) checkPeer(peerB, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkProtocol(protoA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 10) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) // but we should be able to set to another protocol if err := stream3.SetProtocol(protoB); err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1}) }) checkPeer(peerB, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkProtocol(protoA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkProtocol(protoB, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 11) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) // check nility of current service scope if stream1.ServiceScope() != nil { t.Fatal("service scope should be nil") } // we should be able to attach stream1 and stream2 to svcA, but stream3 should fail due to limit if err := stream1.SetService(svcA); err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1}) }) checkPeer(peerB, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkService(svcA, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkProtocol(protoA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkProtocol(protoB, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 12) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) if err := stream2.SetService(svcA); err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1}) }) checkPeer(peerB, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkService(svcA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkProtocol(protoA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkProtocol(protoB, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 12) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) if err := stream3.SetService(svcA); err == nil { t.Fatal("expected SetService to fail") } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1}) }) checkPeer(peerB, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkService(svcA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkProtocol(protoA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkProtocol(protoB, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 12) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) // and now let's reclaim our resources to make sure we can gc unused peer and proto scopes // but first check internal refs mgr.mx.Lock() _, okProtoA := mgr.proto[protoA] _, okProtoB := mgr.proto[protoB] _, okPeerA := mgr.peer[peerA] _, okPeerB := mgr.peer[peerB] mgr.mx.Unlock() if !okProtoA { t.Fatal("protocol scope is not stored") } if !okProtoB { t.Fatal("protocol scope is not stored") } if !okPeerA { t.Fatal("peer scope is not stored") } if !okPeerB { t.Fatal("peer scope is not stored") } // ok, reclaim stream1.Done() stream2.Done() stream3.Done() conn1.Done() conn2.Done() // check everything released checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) checkPeer(peerB, func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) checkService(svcA, func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) checkProtocol(protoA, func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) checkProtocol(protoB, func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 7) checkResources(t, &s.rc, network.ScopeStat{}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) mgr.gc() checkSystem(func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) mgr.mx.Lock() lenProto := len(mgr.proto) lenPeer := len(mgr.peer) mgr.mx.Unlock() if lenProto != 0 { t.Fatal("protocols were not gc'ed") } if lenPeer != 0 { t.Fatal("perrs were not gc'ed") } // check that per protocol peer scopes work as intended stream1, err = mgr.OpenStream(peerA, network.DirInbound) if err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 5) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) if err := stream1.SetProtocol(protoB); err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkProtocol(protoB, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 6) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) stream2, err = mgr.OpenStream(peerA, network.DirInbound) if err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 7) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) if err := stream2.SetProtocol(protoB); err == nil { t.Fatal("expected SetProtocol to fail") } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 7) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkProtocol(protoB, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) stream1.Done() stream2.Done() // check that per service peer scopes work as intended stream1, err = mgr.OpenStream(peerA, network.DirInbound) if err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 6) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) if err := stream1.SetProtocol(protoA); err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkProtocol(protoA, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 7) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) stream2, err = mgr.OpenStream(peerA, network.DirInbound) if err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 8) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) if err := stream2.SetProtocol(protoA); err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkProtocol(protoA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 8) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) if err := stream1.SetService(svcB); err != nil { t.Fatal(err) } checkPeer(peerA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkService(svcB, func(s *resourceScope) { checkRefCnt(s, 2) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) }) checkProtocol(protoA, func(s *resourceScope) { checkRefCnt(s, 3) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkSystem(func(s *resourceScope) { checkRefCnt(s, 9) checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) // now we should fail to set the service for stream2 to svcB because of the service peer limit if err := stream2.SetService(svcB); err == nil { t.Fatal("expected SetService to fail") } // now release resources and check interior gc of per service peer scopes stream1.Done() stream2.Done() mgr.gc() checkSystem(func(s *resourceScope) { checkRefCnt(s, 4) checkResources(t, &s.rc, network.ScopeStat{}) }) checkTransient(func(s *resourceScope) { checkRefCnt(s, 1) checkResources(t, &s.rc, network.ScopeStat{}) }) mgr.mx.Lock() lenProto = len(mgr.proto) lenPeer = len(mgr.peer) mgr.mx.Unlock() svc := mgr.svc[svcB] svc.Lock() lenSvcPeer := len(svc.peers) svc.Unlock() if lenProto != 0 { t.Fatal("protocols were not gc'ed") } if lenPeer != 0 { t.Fatal("peers were not gc'ed") } if lenSvcPeer != 0 { t.Fatal("service peers were not gc'ed") } }