add test for per service peer limits

This commit is contained in:
vyzo 2022-01-08 22:21:33 +02:00
parent 5487b6bdac
commit 488f08cb1d
3 changed files with 171 additions and 3 deletions

View File

@ -49,7 +49,7 @@ type BasicLimiter struct {
TransientLimits Limit
DefaultServiceLimits Limit
ServiceLimits map[string]Limit
SercicePeerLimits map[string]Limit
ServicePeerLimits map[string]Limit
DefaultProtocolLimits Limit
ProtocolLimits map[protocol.ID]Limit
DefaultPeerLimits Limit
@ -106,7 +106,7 @@ func (l *BasicLimiter) GetServiceLimits(svc string) Limit {
}
func (l *BasicLimiter) GetServicePeerLimits(svc string) Limit {
return l.SercicePeerLimits[svc]
return l.ServicePeerLimits[svc]
}
func (l *BasicLimiter) GetProtocolLimits(proto protocol.ID) Limit {

View File

@ -261,7 +261,11 @@ func (r *resourceManager) gc() {
for _, s := range r.svc {
s.Lock()
for _, p := range deadPeers {
delete(s.peers, p)
ps, ok := s.peers[p]
if ok {
ps.Done()
delete(s.peers, p)
}
}
s.Unlock()
}

View File

@ -14,6 +14,7 @@ func TestResourceManager(t *testing.T) {
protoA := protocol.ID("/A")
protoB := protocol.ID("/B")
svcA := "A.svc"
svcB := "B.svc"
mgr := NewResourceManager(
&BasicLimiter{
SystemLimits: &StaticLimit{
@ -57,6 +58,25 @@ func TestResourceManager(t *testing.T) {
FD: 1,
},
},
svcB: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
ConnsInbound: 2,
ConnsOutbound: 2,
FD: 1,
},
},
},
ServicePeerLimits: map[string]Limit{
svcB: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
},
},
},
DefaultProtocolLimits: &StaticLimit{
Memory: 4096,
@ -686,4 +706,148 @@ func TestResourceManager(t *testing.T) {
t.Fatal("perrs were not gc'ed")
}
// check that per service peer scopes work as intended
stream1, err = mgr.OpenStream(peerA, network.DirInbound)
if err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
if err := stream1.SetProtocol(protoA); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 6)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
stream2, err = mgr.OpenStream(peerA, network.DirInbound)
if err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 7)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
if err := stream2.SetProtocol(protoA); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 7)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
if err := stream1.SetService(svcB); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkService(svcB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 8)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
// now we should fail to set the service for stream2 to svcB because of the service peer limit
if err := stream2.SetService(svcB); err == nil {
t.Fatal("expected SetService to fail")
}
// now release resources and check interior gc of per service peer scopes
stream1.Done()
stream2.Done()
mgr.gc()
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 4)
checkResources(t, &s.rc, network.ScopeStat{})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
mgr.mx.Lock()
lenProto = len(mgr.proto)
lenPeer = len(mgr.peer)
mgr.mx.Unlock()
svc := mgr.svc[svcB]
svc.Lock()
lenSvcPeer := len(svc.peers)
svc.Unlock()
if lenProto != 0 {
t.Fatal("protocols were not gc'ed")
}
if lenPeer != 0 {
t.Fatal("peers were not gc'ed")
}
if lenSvcPeer != 0 {
t.Fatal("service peers were not gc'ed")
}
}