diff --git a/limit.go b/limit.go index 3d805de..f7335c3 100644 --- a/limit.go +++ b/limit.go @@ -49,7 +49,7 @@ type BasicLimiter struct { TransientLimits Limit DefaultServiceLimits Limit ServiceLimits map[string]Limit - SercicePeerLimits map[string]Limit + ServicePeerLimits map[string]Limit DefaultProtocolLimits Limit ProtocolLimits map[protocol.ID]Limit DefaultPeerLimits Limit @@ -106,7 +106,7 @@ func (l *BasicLimiter) GetServiceLimits(svc string) Limit { } func (l *BasicLimiter) GetServicePeerLimits(svc string) Limit { - return l.SercicePeerLimits[svc] + return l.ServicePeerLimits[svc] } func (l *BasicLimiter) GetProtocolLimits(proto protocol.ID) Limit { diff --git a/rcmgr.go b/rcmgr.go index f155438..f1da328 100644 --- a/rcmgr.go +++ b/rcmgr.go @@ -261,7 +261,11 @@ func (r *resourceManager) gc() { for _, s := range r.svc { s.Lock() for _, p := range deadPeers { - delete(s.peers, p) + ps, ok := s.peers[p] + if ok { + ps.Done() + delete(s.peers, p) + } } s.Unlock() } diff --git a/rcmgr_test.go b/rcmgr_test.go index ccb2098..7962e21 100644 --- a/rcmgr_test.go +++ b/rcmgr_test.go @@ -14,6 +14,7 @@ func TestResourceManager(t *testing.T) { protoA := protocol.ID("/A") protoB := protocol.ID("/B") svcA := "A.svc" + svcB := "B.svc" mgr := NewResourceManager( &BasicLimiter{ SystemLimits: &StaticLimit{ @@ -57,6 +58,25 @@ func TestResourceManager(t *testing.T) { FD: 1, }, }, + svcB: &StaticLimit{ + Memory: 8192, + BaseLimit: BaseLimit{ + StreamsInbound: 2, + StreamsOutbound: 2, + ConnsInbound: 2, + ConnsOutbound: 2, + FD: 1, + }, + }, + }, + ServicePeerLimits: map[string]Limit{ + svcB: &StaticLimit{ + Memory: 8192, + BaseLimit: BaseLimit{ + StreamsInbound: 1, + StreamsOutbound: 1, + }, + }, }, DefaultProtocolLimits: &StaticLimit{ Memory: 4096, @@ -686,4 +706,148 @@ func TestResourceManager(t *testing.T) { t.Fatal("perrs were not gc'ed") } + // check that per service peer scopes work as intended + stream1, err = mgr.OpenStream(peerA, network.DirInbound) + if err != nil { + t.Fatal(err) + } + + checkPeer(peerA, func(s *resourceScope) { + checkRefCnt(s, 2) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) + }) + checkSystem(func(s *resourceScope) { + checkRefCnt(s, 5) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) + }) + checkTransient(func(s *resourceScope) { + checkRefCnt(s, 2) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) + }) + + if err := stream1.SetProtocol(protoA); err != nil { + t.Fatal(err) + } + + checkPeer(peerA, func(s *resourceScope) { + checkRefCnt(s, 2) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) + }) + checkProtocol(protoA, func(s *resourceScope) { + checkRefCnt(s, 2) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) + }) + checkSystem(func(s *resourceScope) { + checkRefCnt(s, 6) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) + }) + checkTransient(func(s *resourceScope) { + checkRefCnt(s, 1) + checkResources(t, &s.rc, network.ScopeStat{}) + }) + + stream2, err = mgr.OpenStream(peerA, network.DirInbound) + if err != nil { + t.Fatal(err) + } + + checkPeer(peerA, func(s *resourceScope) { + checkRefCnt(s, 3) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) + }) + checkSystem(func(s *resourceScope) { + checkRefCnt(s, 7) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) + }) + checkTransient(func(s *resourceScope) { + checkRefCnt(s, 2) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) + }) + + if err := stream2.SetProtocol(protoA); err != nil { + t.Fatal(err) + } + + checkPeer(peerA, func(s *resourceScope) { + checkRefCnt(s, 3) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) + }) + checkProtocol(protoA, func(s *resourceScope) { + checkRefCnt(s, 3) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) + }) + checkSystem(func(s *resourceScope) { + checkRefCnt(s, 7) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) + }) + checkTransient(func(s *resourceScope) { + checkRefCnt(s, 1) + checkResources(t, &s.rc, network.ScopeStat{}) + }) + + if err := stream1.SetService(svcB); err != nil { + t.Fatal(err) + } + + checkPeer(peerA, func(s *resourceScope) { + checkRefCnt(s, 3) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) + }) + checkService(svcB, func(s *resourceScope) { + checkRefCnt(s, 2) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) + }) + checkProtocol(protoA, func(s *resourceScope) { + checkRefCnt(s, 2) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1}) + }) + checkSystem(func(s *resourceScope) { + checkRefCnt(s, 8) + checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2}) + }) + checkTransient(func(s *resourceScope) { + checkRefCnt(s, 1) + checkResources(t, &s.rc, network.ScopeStat{}) + }) + + // now we should fail to set the service for stream2 to svcB because of the service peer limit + if err := stream2.SetService(svcB); err == nil { + t.Fatal("expected SetService to fail") + } + + // now release resources and check interior gc of per service peer scopes + stream1.Done() + stream2.Done() + + mgr.gc() + + checkSystem(func(s *resourceScope) { + checkRefCnt(s, 4) + checkResources(t, &s.rc, network.ScopeStat{}) + }) + checkTransient(func(s *resourceScope) { + checkRefCnt(s, 1) + checkResources(t, &s.rc, network.ScopeStat{}) + }) + + mgr.mx.Lock() + lenProto = len(mgr.proto) + lenPeer = len(mgr.peer) + mgr.mx.Unlock() + + svc := mgr.svc[svcB] + svc.Lock() + lenSvcPeer := len(svc.peers) + svc.Unlock() + + if lenProto != 0 { + t.Fatal("protocols were not gc'ed") + } + if lenPeer != 0 { + t.Fatal("peers were not gc'ed") + } + if lenSvcPeer != 0 { + t.Fatal("service peers were not gc'ed") + } + }