Merge pull request #1 from libp2p/implementation

Network Resource Manager Implementation
This commit is contained in:
vyzo 2022-01-17 12:51:26 +02:00 committed by GitHub
commit 617d17d0f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 5742 additions and 2 deletions

356
README.md
View File

@ -1,2 +1,354 @@
# go-libp2p-rcmgr
An implmentation of the libp2p network resource manager interface
# The libp2p Network Resource Manager
This package contains the canonical implementation of the libp2p
Network Resource Manager interface.
The implementation is based on the concept of Resource Management
Scopes, whereby resource usage is constrained by a DAG of scopes,
accounting for multiple levels of resource constraints.
## Design Considerations
- The Resource Manager must account for basic resource usage at all
levels of the stack, from the internals to application components
that use the network facilities of libp2p.
- Basic resources include memory, streams, connections, and file
descriptors. These account for both space and time used by
the stack, as each resource has a direct effect on the system
availability and performance.
- The design must support seamless integration for user applications,
which should reap the benefits of resource management without any
changes. That is, existing applications should be oblivious of the
resource manager and transparently obtain limits which protect it
from resource exhaustion and OOM conditions.
- At the same time, the design must support opt-in resource usage
accounting for applications who want to explicitly utilize the
facilities of the system to inform about and constrain their own
resource usage.
- The design must allow the user to set its own limits, which can be
static (fixed) or dynamic.
## Basic Resources
### Memory
Perhaps the most fundamental resource is memory, and in particular
buffers used for network operations. The system must provide an
interface for components to reserve memory that accounts for buffers
(and possibly other live objects), which is scoped within the component.
Before a new buffer is allocated, the component should try a memory
reservation, which can fail if the resource limit is exceeded. It is
then up to the component to react to the error condition, depending on
the situation. For example, a muxer failing to grow a buffer in
response to a window change should simply retain the old buffer and
operate at perhaps degraded performance.
### File Descriptors
File descriptors are an important resource that uses memory (and
computational time) at the system level. They are also a scarce
resource, as typically (unless the user explicitly intervenes) they
are constrained by the system. Exhaustion of file descriptors may
render the application incapable of operating (e.g. because it is
unable to open a file), most importantly for libp2p because most
operating systems represent sockets as file descriptors.
### Connections
Connections are a higher level concept endemic to libp2p; in order to
communicate with another peer, a connection must first be
established. Connections are an important resource in libp2p, as they
consume memory, goroutines, and possibly file descriptors.
We distinguish between inbound and outbound connections, as the former
are initiated by remote peers and consume resources in response to
network events and thus need to be tightly controlled in order to
protect the application from overload or attack. Outbound
connections are typically initiated by the application's volition and
don't need to be controlled as tightly. However, outbound connections
still consume resources and may be initiated in response to network
events because of (potentially faulty) application logic, so they
still need to be constrained.
### Streams
Streams are the fundamental object of interaction in libp2p; all
protocol interactions happen through a stream that goes over some
connection. Streams are a fundamental resource in libp2p, as they
consume memory and goroutines at all levels of the stack.
Streams always belong to a peer, specify a protocol and they may
belong to some service in the system. Hence, this suggests that apart
from global limits, we can constrain stream usage at finer
granularity, at the protocol and service level.
Once again, we disinguish between inbound and outbound streams.
Inbound streams are initiated by remote peers and consume resources in
response to network events; controlling inbound stream usage is again
paramount for protecting the system from overload or attack.
Outbound streams are normally initiated by the application or some
service in the system in order to effect some protocol
interaction. However, they can also be initiated in response to
network events because of application or service logic, so we still
need to constrain them.
## Resource Scopes
The Resource Manager is based on the concept of resource
scopes. Resource Scopes account for resource usage that is temporally
delimited for the span of the scope. Resource Scopes conceptually
form a DAG, providing us with a mechanism to enforce multiresolution
resource accounting. Downstream resource usage is aggregated at scopes
higher up the graph.
The following diagram depicts the canonical scope graph:
```
System
+------------> Transient.............+................+
| . .
+------------> Service------------- . ----------+ .
| . | .
+-------------> Protocol----------- . ----------+ .
| . | .
+-------------->* Peer \/ | .
+------------> Connection | .
| \/ \/
+---------------------------> Stream
```
### The System Scope
The system scope is the top level scope that accounts for global
resource usage at all levels of the system. This scope nests and
constrains all other scopes and institutes global hard limits.
### The Transient Scope
The transient scope accounts for resources that are in the process of
full establishment. For instance, a new connection prior to the
handshake does not belong to any peer, but it still needs to be
constrained as this opens an avenue for attacks in transient resource
usage. Similarly, a stream that has not negotiated a protocol yet is
constrained by the transient scope.
The transient scope effectively represents a DMZ (DeMilitarized Zone),
where resource usage can be accounted for connections and streams that
are not fully established.
### Service Scopes
The system is typically organized across services, which may be
ambient and provide basic functionality to the system (e.g. identify,
autonat, relay, etc). Alternatively, services may be explicitly
instantiated by the application, and provide core components of its
functionality (e.g. pubsub, the DHT, etc).
Services are logical groupings of streams that implement protocol flow
and may additionally consume resources such as memory. Services
typically have at least one stream handler, so they are subject to
inbound stream creation and resource usage in response to network
events. As such, the system explicitly models them allowing for
isolated resource usage that can be tuned by the user.
### Protocol Scopes
Protocol Scopes account for resources at the protocol level. They are
an intermediate resource scope which can constrain streams which may
not have a service associated or for resource control within a
service. It also provides an opportunity for system operators to
explicitly restrict specific protocols.
For instance, a service that is not aware of the resource manager and
has not been ported to mark its streams, may still gain limits
transparently without any programmer intervention. Furthermore, the
protocol scope can constrain resource usage for services that
implement multiple protocols for the sake of backwards
compatibility. A tighter limit in some older protocol can protect the
application from resource consumption caused by legacy clients or
potential attacks.
For a concrete example, consider pubsub with the gossipsub router: the
service also understands the floodsub protocol for backwards
compatibility and support for unsophisticated clients that are lagging
in the implementation effort. By specifying a lower limit for the
floodsub protocol, we can can constrain the service level for legacy
clients using an inefficient protocol.
### Peer Scopes
The peer scope accounts for resource usage by an individual peer. This
constrains connections and streams and limits the blast radius of
resource consumption by a single remote peer.
This ensures that no single peer can use more resources than allowed
by the peer limits. Every peer has a default limit, but the programmer
may raise (or lower) limits for specific peers.
### Connection Scopes
The connection scope is delimited to the duration of a connection and
constrains resource usage by a single connection. The scope is a leaf
in the DAG, with a span that begins when a connection is established
and ends when the connection is closed. Its resources are aggregated
to the resource usage of a peer.
### Stream Scopes
The stream scope is delimited to the duration of a stream, and
constrains resource usage by a single stream. This scope is also a
leaf in the DAG, with span that begins when a stream is created and
ends when the stream is closed. Its resources are aggregated to the
resource usage of a peer, and constrained by a service and protocol
scope.
### User Transaction Scopes
User transaction scopes can be created as a child of any extant
resource scope, and provide the prgrammer with a delimited scope for
easy resource accounting. Transactions may form a tree that is rooted
to some canonical scope in the scope DAG.
For instance, a programmer may create a transaction scope within a
service that accounts for some control flow delimited resource
usage. Similarly, a programmer may create a transaction scope for some
interaction within a stream, e.g. a Request/Response interaction that
uses a buffer.
## Limits
Each resource scope has an associated limit object, which designates
limits for all basic resources. The limit is checked every time some
resource is reserved and provides the system with an opportunity to
constrain resource usage.
There are separate limits for each class of scope, allowing us for
multiresolution and aggregate resource accounting. As such, we have
limits for the system and transient scopes, default and specific
limits for services, protocols, and peers, and limits for connections
and streams.
## Examples
Here we consider some concrete examples that can ellucidate the abstract
design as described so far.
### Stream Lifetime
Let's consider a stream and the limits that apply to it.
When the stream scope is first opened, it is created by calling
`ResourceManager.OpenStream`.
Initially the stream is constrained by:
- the system scope, where global hard limits apply.
- the transient scope, where unnegotiated streams live.
- the peer scope, where the limits for the peer at the other end of the stream
apply.
Once the protocol has been negotiated, the protocol is set by calling
`StreamManagementScope.SetProtocol`. The constraint from the
transient scope is removed and the stream is now constrained by the
protocol instead.
More specifically, the following constraints apply:
- the system scope, where global hard limits apply.
- the peer scope, where the limits for the peer at the other end of the stream
apply.
- the protocol scope, where the limits of the specific protocol used apply.
The existence of the protocol limit allows us to implicitly constrain
streams for services that have not been ported to the resource manager
yet. Once the programmer attaches a stream to a service by calling
`StreamScope.SetService`, the stream resources are aggregated and constrained
by the service scope in addition to its protocol scope.
More specifically the following constraints apply:
- the system scope, where global hard limits apply.
- the peer scope, where the limits for the peer at the other end of the stream
apply.
- the service scope, where the limits of the specific service owning the stream apply.
- the protcol scope, where the limits of the specific protocol for the stream apply.
The resource transfer that happens in the `SetProtocol` and `SetService`
gives the opportunity to the resource manager to gate the streams. If
the transfer results in exceeding the scope limits, then a error
indicating "resource limit exceeded" is returned. The wrapped error
includes the name of the scope rejecting the resource acquisition to
aid understanding of applicable limits. Note that the (wrapped) error
implements `net.Error` and is marked as temporary, so that the
programmer can handle by backoff retry.
### Default Limits
The provided default static limiters apply the following limits, where
memoryCap is provided by the programmer, either as a fixed number (in
bytes) or a fraction of total system memory:
```
DefaultSystemBaseLimit:
StreamsInbound: 4096,
StreamsOutbound: 16384,
ConnsInbound: 256,
ConnsOutbound: 512,
FD: 512,
DefaultTransientBaseLimit:
StreamsInbound: 128,
StreamsOutbound: 512,
ConnsInbound: 32,
ConnsOutbound: 128,
FD: 128,
DefaultProtocolBaseLimit:
StreamsInbound: 1024,
StreamsOutbound: 4096,
DefaultServiceBaseLimit:
StreamsInbound: 2048,
StreamsOutbound: 8192,
system:
Memory: memoryCap
BaseLimit: DefaultSystemBaseLimit
transient:
Memory: memoryCap / 16
BaseLimit: DefaultTransientBaseLimit
svc =
Memory: memoryCap / 2
BaseLimit: DefaultServiceBaseLimit
proto:
Memory: memoryCap / 4
BaseLimit: DefaultProtocolBaseLimit
peer:
Memory: memoryCap / 16
BaseLimit: DefaultPeerBaseLimit
conn:
Memory: 16 << 20,
stream:
Memory: 16 << 20,
```
We also provide a dynamic limiter which uses the same base limits, but
the memory limit is dynamically computed at each memory reservation check
based on free memory.
## Implementation Notes
- The package only exports a constructor for the resource manager and
basic types for defining limits. Internals are not exposed.
- Internally, there is a resources object that is embedded in every scope and
implements resource accounting.
- There is a single implementation of a generic resource scope, that
provides all necessary interface methods.
- There are concrete types for all canonical scopes, embedding a
pointer to a generic resource scope.
- Peer and Protocol scopes, which may be created in response to
network events, are periodically garbage collected.

137
extapi.go Normal file
View File

@ -0,0 +1,137 @@
package rcmgr
import (
"bytes"
"sort"
"strings"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
// ResourceScopeLimiter is a trait interface that allows you to access scope limits.
type ResourceScopeLimiter interface {
Limit() Limit
SetLimit(Limit)
}
var _ ResourceScopeLimiter = (*resourceScope)(nil)
// ResourceManagerStat is a trait that allows you to access resource manager state.
type ResourceManagerState interface {
ListServices() []string
ListProtocols() []protocol.ID
ListPeers() []peer.ID
Stat() ResourceManagerStat
}
type ResourceManagerStat struct {
System network.ScopeStat
Transient network.ScopeStat
Services map[string]network.ScopeStat
Protocols map[protocol.ID]network.ScopeStat
Peers map[peer.ID]network.ScopeStat
}
var _ ResourceManagerState = (*resourceManager)(nil)
func (s *resourceScope) Limit() Limit {
s.Lock()
defer s.Unlock()
return s.rc.limit
}
func (s *resourceScope) SetLimit(limit Limit) {
s.Lock()
defer s.Unlock()
s.rc.limit = limit
}
func (r *resourceManager) ListServices() []string {
r.mx.Lock()
defer r.mx.Unlock()
result := make([]string, 0, len(r.svc))
for svc := range r.svc {
result = append(result, svc)
}
sort.Slice(result, func(i, j int) bool {
return strings.Compare(result[i], result[j]) < 0
})
return result
}
func (r *resourceManager) ListProtocols() []protocol.ID {
r.mx.Lock()
defer r.mx.Unlock()
result := make([]protocol.ID, 0, len(r.proto))
for p := range r.proto {
result = append(result, p)
}
sort.Slice(result, func(i, j int) bool {
return strings.Compare(string(result[i]), string(result[j])) < 0
})
return result
}
func (r *resourceManager) ListPeers() []peer.ID {
r.mx.Lock()
defer r.mx.Unlock()
result := make([]peer.ID, 0, len(r.peer))
for p := range r.peer {
result = append(result, p)
}
sort.Slice(result, func(i, j int) bool {
return bytes.Compare([]byte(result[i]), []byte(result[j])) < 0
})
return result
}
func (r *resourceManager) Stat() (result ResourceManagerStat) {
r.mx.Lock()
svcs := make([]*serviceScope, 0, len(r.svc))
for _, svc := range r.svc {
svcs = append(svcs, svc)
}
protos := make([]*protocolScope, 0, len(r.proto))
for _, proto := range r.proto {
protos = append(protos, proto)
}
peers := make([]*peerScope, 0, len(r.peer))
for _, peer := range r.peer {
peers = append(peers, peer)
}
r.mx.Unlock()
// Note: there is no global lock, so the system is updating while we are dumping its state...
// as such stats might not exactly add up to the system level; we take the system stat
// last nonetheless so that this is the most up-to-date snapshot
result.Peers = make(map[peer.ID]network.ScopeStat, len(peers))
for _, peer := range peers {
result.Peers[peer.peer] = peer.Stat()
}
result.Protocols = make(map[protocol.ID]network.ScopeStat, len(protos))
for _, proto := range protos {
result.Protocols[proto.proto] = proto.Stat()
}
result.Services = make(map[string]network.ScopeStat, len(svcs))
for _, svc := range svcs {
result.Services[svc.name] = svc.Stat()
}
result.Transient = r.transient.Stat()
result.System = r.system.Stat()
return result
}

10
go.mod Normal file
View File

@ -0,0 +1,10 @@
module github.com/libp2p/go-libp2p-resource-manager
go 1.16
require (
github.com/ipfs/go-log/v2 v2.5.0
github.com/libp2p/go-libp2p-core v0.14.0
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/stretchr/testify v1.7.0
)

141
go.sum Normal file
View File

@ -0,0 +1,141 @@
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/btcsuite/btcd v0.20.1-beta h1:Ik4hyJqN8Jfyv3S4AGBOmyouMsYE3EdYODkMbQjwPGw=
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY=
github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
github.com/ipfs/go-log/v2 v2.5.0 h1:+MhAooFd9XZNvR0i9FriKW6HB0ql7HNXUuflWtc0dd4=
github.com/ipfs/go-log/v2 v2.5.0/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-core v0.14.0 h1:0kYSgiK/D7Eo28GTuRXo5YHsWwAisVpFCqCVPUd/vJs=
github.com/libp2p/go-libp2p-core v0.14.0/go.mod h1:tLasfcVdTXnixsLB0QYaT1syJOhsbrhG7q6pGrHtBg8=
github.com/libp2p/go-msgio v0.0.6/go.mod h1:4ecVB6d9f4BDSL5fqvPiC4A3KivjWn+Venn/1ALLMWA=
github.com/libp2p/go-openssl v0.0.7 h1:eCAzdLejcNVBzP/iZM9vqHnQm+XyCEbSSIheIPRGNsw=
github.com/libp2p/go-openssl v0.0.7/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ89tUg4F4=
github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM=
github.com/multiformats/go-multiaddr v0.4.1 h1:Pq37uLx3hsyNlTDir7FZyU8+cFCTqd5y1KiM2IzOutI=
github.com/multiformats/go-multiaddr v0.4.1/go.mod h1:3afI9HfVW8csiF8UZqtpYRiDyew8pRX7qLIGHu9FLuM=
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-multihash v0.0.14 h1:QoBceQYQQtNUuf6s7wHxnE2c8bhbMqhfGzNI032se/I=
github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2W/KhfNY=
github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU=
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

187
limit.go Normal file
View File

@ -0,0 +1,187 @@
package rcmgr
import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
// Limit is an object that specifies basic resource limits.
type Limit interface {
// GetMemoryLimit returns the (current) memory limit.
GetMemoryLimit() int64
// GetStreamLimit returns the stream limit, for inbound or outbound streams.
GetStreamLimit(network.Direction) int
// GetStreamTotalLimit returns the total stream limit
GetStreamTotalLimit() int
// GetConnLimit returns the connection limit, for inbound or outbound connections.
GetConnLimit(network.Direction) int
// GetConnTotalLimit returns the total connection limit
GetConnTotalLimit() int
// GetFDLimit returns the file descriptor limit.
GetFDLimit() int
// WithMemoryLimit creates a copy of this limit object, with memory limit adjusted to
// the specified memFraction of its current value, bounded by minMemory and maxMemory.
WithMemoryLimit(memFraction float64, minMemory, maxMemory int64) Limit
// WithStreamLimit creates a copy of this limit object, with stream limits adjusted
// as specified.
WithStreamLimit(numStreamsIn, numStreamsOut, numStreams int) Limit
// WithConnLimit creates a copy of this limit object, with connetion limits adjusted
// as specified.
WithConnLimit(numConnsIn, numConnsOut, numConns int) Limit
// WithFDLimit creates a copy of this limit object, with file descriptor limits adjusted
// as specified
WithFDLimit(numFD int) Limit
}
// Limiter is the interface for providing limits to the resource manager.
type Limiter interface {
GetSystemLimits() Limit
GetTransientLimits() Limit
GetServiceLimits(svc string) Limit
GetServicePeerLimits(svc string) Limit
GetProtocolLimits(proto protocol.ID) Limit
GetProtocolPeerLimits(proto protocol.ID) Limit
GetPeerLimits(p peer.ID) Limit
GetStreamLimits(p peer.ID) Limit
GetConnLimits() Limit
}
// BasicLimiter is a limiter with fixed limits.
type BasicLimiter struct {
SystemLimits Limit
TransientLimits Limit
DefaultServiceLimits Limit
DefaultServicePeerLimits Limit
ServiceLimits map[string]Limit
ServicePeerLimits map[string]Limit
DefaultProtocolLimits Limit
DefaultProtocolPeerLimits Limit
ProtocolLimits map[protocol.ID]Limit
ProtocolPeerLimits map[protocol.ID]Limit
DefaultPeerLimits Limit
PeerLimits map[peer.ID]Limit
ConnLimits Limit
StreamLimits Limit
}
var _ Limiter = (*BasicLimiter)(nil)
// BaseLimit is a mixin type for basic resource limits.
type BaseLimit struct {
Streams int
StreamsInbound int
StreamsOutbound int
Conns int
ConnsInbound int
ConnsOutbound int
FD int
}
// MemoryLimit is a mixin type for memory limits
type MemoryLimit struct {
MemoryFraction float64
MinMemory int64
MaxMemory int64
}
func (l *BaseLimit) GetStreamLimit(dir network.Direction) int {
if dir == network.DirInbound {
return l.StreamsInbound
} else {
return l.StreamsOutbound
}
}
func (l *BaseLimit) GetStreamTotalLimit() int {
return l.Streams
}
func (l *BaseLimit) GetConnLimit(dir network.Direction) int {
if dir == network.DirInbound {
return l.ConnsInbound
} else {
return l.ConnsOutbound
}
}
func (l *BaseLimit) GetConnTotalLimit() int {
return l.Conns
}
func (l *BaseLimit) GetFDLimit() int {
return l.FD
}
func (l *BasicLimiter) GetSystemLimits() Limit {
return l.SystemLimits
}
func (l *BasicLimiter) GetTransientLimits() Limit {
return l.TransientLimits
}
func (l *BasicLimiter) GetServiceLimits(svc string) Limit {
sl, ok := l.ServiceLimits[svc]
if !ok {
return l.DefaultServiceLimits
}
return sl
}
func (l *BasicLimiter) GetServicePeerLimits(svc string) Limit {
pl, ok := l.ServicePeerLimits[svc]
if !ok {
return l.DefaultServicePeerLimits
}
return pl
}
func (l *BasicLimiter) GetProtocolLimits(proto protocol.ID) Limit {
pl, ok := l.ProtocolLimits[proto]
if !ok {
return l.DefaultProtocolLimits
}
return pl
}
func (l *BasicLimiter) GetProtocolPeerLimits(proto protocol.ID) Limit {
pl, ok := l.ProtocolPeerLimits[proto]
if !ok {
return l.DefaultProtocolPeerLimits
}
return pl
}
func (l *BasicLimiter) GetPeerLimits(p peer.ID) Limit {
pl, ok := l.PeerLimits[p]
if !ok {
return l.DefaultPeerLimits
}
return pl
}
func (l *BasicLimiter) GetStreamLimits(p peer.ID) Limit {
return l.StreamLimits
}
func (l *BasicLimiter) GetConnLimits() Limit {
return l.ConnLimits
}
func (l *MemoryLimit) GetMemory(memoryCap int64) int64 {
return memoryLimit(memoryCap, l.MemoryFraction, l.MinMemory, l.MaxMemory)
}
func memoryLimit(memoryCap int64, memFraction float64, minMemory, maxMemory int64) int64 {
memoryCap = int64(float64(memoryCap) * memFraction)
switch {
case memoryCap < minMemory:
return minMemory
case memoryCap > maxMemory:
return maxMemory
default:
return memoryCap
}
}

305
limit_config.go Normal file
View File

@ -0,0 +1,305 @@
package rcmgr
import (
"encoding/json"
"fmt"
"io"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/pbnjay/memory"
)
type limitConfig struct {
// if true, then a dynamic limit is used
Dynamic bool
// either Memory is set for fixed memory limit
Memory int64
// or the following 3 fields for computed memory limits
MinMemory int64
MaxMemory int64
MemoryFraction float64
StreamsInbound int
StreamsOutbound int
Streams int
ConnsInbound int
ConnsOutbound int
Conns int
FD int
}
func (cfg *limitConfig) toLimit(base BaseLimit, mem MemoryLimit) (Limit, error) {
if cfg == nil {
m := mem.GetMemory(int64(memory.TotalMemory()))
return &StaticLimit{
Memory: m,
BaseLimit: base,
}, nil
}
if cfg.Streams > 0 {
base.Streams = cfg.Streams
}
if cfg.StreamsInbound > 0 {
base.StreamsInbound = cfg.StreamsInbound
}
if cfg.StreamsOutbound > 0 {
base.StreamsOutbound = cfg.StreamsOutbound
}
if cfg.Conns > 0 {
base.Conns = cfg.Conns
}
if cfg.ConnsInbound > 0 {
base.ConnsInbound = cfg.ConnsInbound
}
if cfg.ConnsOutbound > 0 {
base.ConnsOutbound = cfg.ConnsOutbound
}
if cfg.FD > 0 {
base.FD = cfg.FD
}
switch {
case cfg.Memory > 0:
return &StaticLimit{
Memory: cfg.Memory,
BaseLimit: base,
}, nil
case cfg.Dynamic:
if cfg.MemoryFraction < 0 {
return nil, fmt.Errorf("negative memory fraction: %f", cfg.MemoryFraction)
}
if cfg.MemoryFraction > 0 {
mem.MemoryFraction = cfg.MemoryFraction
}
if cfg.MinMemory > 0 {
mem.MinMemory = cfg.MinMemory
}
if cfg.MaxMemory > 0 {
mem.MaxMemory = cfg.MaxMemory
}
return &DynamicLimit{
MemoryLimit: mem,
BaseLimit: base,
}, nil
default:
if cfg.MemoryFraction < 0 {
return nil, fmt.Errorf("negative memory fraction: %f", cfg.MemoryFraction)
}
if cfg.MemoryFraction > 0 {
mem.MemoryFraction = cfg.MemoryFraction
}
if cfg.MinMemory > 0 {
mem.MinMemory = cfg.MinMemory
}
if cfg.MaxMemory > 0 {
mem.MaxMemory = cfg.MaxMemory
}
m := mem.GetMemory(int64(memory.TotalMemory()))
return &StaticLimit{
Memory: m,
BaseLimit: base,
}, nil
}
}
func (cfg *limitConfig) toLimitFixed(base BaseLimit, mem int64) (Limit, error) {
if cfg == nil {
return &StaticLimit{
Memory: mem,
BaseLimit: base,
}, nil
}
if cfg.Streams > 0 {
base.Streams = cfg.Streams
}
if cfg.StreamsInbound > 0 {
base.StreamsInbound = cfg.StreamsInbound
}
if cfg.StreamsOutbound > 0 {
base.StreamsOutbound = cfg.StreamsOutbound
}
if cfg.Conns > 0 {
base.Conns = cfg.Conns
}
if cfg.ConnsInbound > 0 {
base.ConnsInbound = cfg.ConnsInbound
}
if cfg.ConnsOutbound > 0 {
base.ConnsOutbound = cfg.ConnsOutbound
}
if cfg.FD > 0 {
base.FD = cfg.FD
}
switch {
case cfg.Memory > 0:
return &StaticLimit{
Memory: cfg.Memory,
BaseLimit: base,
}, nil
case cfg.Dynamic:
return nil, fmt.Errorf("cannot specify dynamic limit for fixed memory limit")
default:
if cfg.MemoryFraction > 0 || cfg.MinMemory > 0 || cfg.MaxMemory > 0 {
return nil, fmt.Errorf("cannot specify dynamic range for fixed memory limit")
}
return &StaticLimit{
Memory: mem,
BaseLimit: base,
}, nil
}
}
type limiterConfig struct {
System *limitConfig
Transient *limitConfig
ServiceDefault *limitConfig
ServicePeerDefault *limitConfig
Service map[string]limitConfig
ServicePeer map[string]limitConfig
ProtocolDefault *limitConfig
ProtocolPeerDefault *limitConfig
Protocol map[string]limitConfig
ProtocolPeer map[string]limitConfig
PeerDefault *limitConfig
Peer map[string]limitConfig
Conn *limitConfig
Stream *limitConfig
}
// NewDefaultLimiterFromJSON creates a new limiter by parsing a json configuration,
// using the default limits for fallback.
func NewDefaultLimiterFromJSON(in io.Reader) (*BasicLimiter, error) {
return NewLimiterFromJSON(in, DefaultLimits)
}
// NewLimiterFromJSON creates a new limiter by parsing a json configuration.
func NewLimiterFromJSON(in io.Reader, defaults DefaultLimitConfig) (*BasicLimiter, error) {
jin := json.NewDecoder(in)
var cfg limiterConfig
if err := jin.Decode(&cfg); err != nil {
return nil, err
}
limiter := new(BasicLimiter)
var err error
limiter.SystemLimits, err = cfg.System.toLimit(defaults.SystemBaseLimit, defaults.SystemMemory)
if err != nil {
return nil, fmt.Errorf("invalid system limit: %w", err)
}
limiter.TransientLimits, err = cfg.Transient.toLimit(defaults.TransientBaseLimit, defaults.TransientMemory)
if err != nil {
return nil, fmt.Errorf("invalid transient limit: %w", err)
}
limiter.DefaultServiceLimits, err = cfg.ServiceDefault.toLimit(defaults.ServiceBaseLimit, defaults.ServiceMemory)
if err != nil {
return nil, fmt.Errorf("invlaid default service limit: %w", err)
}
limiter.DefaultServicePeerLimits, err = cfg.ServicePeerDefault.toLimit(defaults.ServicePeerBaseLimit, defaults.ServicePeerMemory)
if err != nil {
return nil, fmt.Errorf("invlaid default service peer limit: %w", err)
}
if len(cfg.Service) > 0 {
limiter.ServiceLimits = make(map[string]Limit, len(cfg.Service))
for svc, cfgLimit := range cfg.Service {
limiter.ServiceLimits[svc], err = cfgLimit.toLimit(defaults.ServiceBaseLimit, defaults.ServiceMemory)
if err != nil {
return nil, fmt.Errorf("invalid service limit for %s: %w", svc, err)
}
}
}
if len(cfg.ServicePeer) > 0 {
limiter.ServicePeerLimits = make(map[string]Limit, len(cfg.ServicePeer))
for svc, cfgLimit := range cfg.ServicePeer {
limiter.ServicePeerLimits[svc], err = cfgLimit.toLimit(defaults.ServicePeerBaseLimit, defaults.ServicePeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid service peer limit for %s: %w", svc, err)
}
}
}
limiter.DefaultProtocolLimits, err = cfg.ProtocolDefault.toLimit(defaults.ProtocolBaseLimit, defaults.ProtocolMemory)
if err != nil {
return nil, fmt.Errorf("invlaid default protocol limit: %w", err)
}
limiter.DefaultProtocolPeerLimits, err = cfg.ProtocolPeerDefault.toLimit(defaults.ProtocolPeerBaseLimit, defaults.ProtocolPeerMemory)
if err != nil {
return nil, fmt.Errorf("invlaid default protocol peer limit: %w", err)
}
if len(cfg.Protocol) > 0 {
limiter.ProtocolLimits = make(map[protocol.ID]Limit, len(cfg.Protocol))
for p, cfgLimit := range cfg.Protocol {
limiter.ProtocolLimits[protocol.ID(p)], err = cfgLimit.toLimit(defaults.ProtocolBaseLimit, defaults.ProtocolMemory)
if err != nil {
return nil, fmt.Errorf("invalid service limit for %s: %w", p, err)
}
}
}
if len(cfg.ProtocolPeer) > 0 {
limiter.ProtocolPeerLimits = make(map[protocol.ID]Limit, len(cfg.ProtocolPeer))
for p, cfgLimit := range cfg.ProtocolPeer {
limiter.ProtocolPeerLimits[protocol.ID(p)], err = cfgLimit.toLimit(defaults.ProtocolPeerBaseLimit, defaults.ProtocolPeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid service peer limit for %s: %w", p, err)
}
}
}
limiter.DefaultPeerLimits, err = cfg.PeerDefault.toLimit(defaults.PeerBaseLimit, defaults.PeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid peer limit: %w", err)
}
if len(cfg.Peer) > 0 {
limiter.PeerLimits = make(map[peer.ID]Limit, len(cfg.Peer))
for p, cfgLimit := range cfg.Peer {
pid, err := peer.IDFromString(p)
if err != nil {
return nil, fmt.Errorf("invalid peer ID %s: %w", p, err)
}
limiter.PeerLimits[pid], err = cfgLimit.toLimit(defaults.PeerBaseLimit, defaults.PeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid peer limit for %s: %w", p, err)
}
}
}
limiter.ConnLimits, err = cfg.Conn.toLimitFixed(defaults.ConnBaseLimit, defaults.ConnMemory)
if err != nil {
return nil, fmt.Errorf("invalid conn limit: %w", err)
}
limiter.StreamLimits, err = cfg.Stream.toLimitFixed(defaults.StreamBaseLimit, defaults.StreamMemory)
if err != nil {
return nil, fmt.Errorf("invalid stream limit: %w", err)
}
return limiter, nil
}

124
limit_config_test.go Normal file
View File

@ -0,0 +1,124 @@
package rcmgr
import (
"os"
"testing"
"github.com/stretchr/testify/require"
)
func TestLimitConfigParser(t *testing.T) {
in, err := os.Open("limit_config_test.json")
require.NoError(t, err)
defer in.Close()
limiter, err := NewDefaultLimiterFromJSON(in)
require.NoError(t, err)
require.Equal(t,
&DynamicLimit{
MemoryLimit: MemoryLimit{
MinMemory: 16384,
MaxMemory: 65536,
MemoryFraction: 0.125,
},
BaseLimit: BaseLimit{
Streams: 64,
StreamsInbound: 32,
StreamsOutbound: 48,
Conns: 16,
ConnsInbound: 8,
ConnsOutbound: 16,
FD: 16,
},
},
limiter.SystemLimits)
require.Equal(t,
&StaticLimit{
Memory: 4096,
BaseLimit: DefaultLimits.TransientBaseLimit,
},
limiter.TransientLimits)
require.Equal(t,
&StaticLimit{
Memory: 8192,
BaseLimit: DefaultLimits.ServiceBaseLimit,
},
limiter.DefaultServiceLimits)
require.Equal(t,
&StaticLimit{
Memory: 2048,
BaseLimit: DefaultLimits.ServicePeerBaseLimit,
},
limiter.DefaultServicePeerLimits)
require.Equal(t, 1, len(limiter.ServiceLimits))
require.Equal(t,
&StaticLimit{
Memory: 8192,
BaseLimit: DefaultLimits.ServiceBaseLimit,
},
limiter.ServiceLimits["A"])
require.Equal(t, 1, len(limiter.ServicePeerLimits))
require.Equal(t,
&StaticLimit{
Memory: 4096,
BaseLimit: DefaultLimits.ServicePeerBaseLimit,
},
limiter.ServicePeerLimits["A"])
require.Equal(t,
&StaticLimit{
Memory: 2048,
BaseLimit: DefaultLimits.ProtocolBaseLimit,
},
limiter.DefaultProtocolLimits)
require.Equal(t,
&StaticLimit{
Memory: 1024,
BaseLimit: DefaultLimits.ProtocolPeerBaseLimit,
},
limiter.DefaultProtocolPeerLimits)
require.Equal(t, 1, len(limiter.ProtocolLimits))
require.Equal(t,
&StaticLimit{
Memory: 8192,
BaseLimit: DefaultLimits.ProtocolBaseLimit,
},
limiter.ProtocolLimits["/A"])
require.Equal(t, 1, len(limiter.ProtocolPeerLimits))
require.Equal(t,
&StaticLimit{
Memory: 4096,
BaseLimit: DefaultLimits.ProtocolPeerBaseLimit,
},
limiter.ProtocolPeerLimits["/A"])
require.Equal(t,
&StaticLimit{
Memory: 4096,
BaseLimit: DefaultLimits.PeerBaseLimit,
},
limiter.DefaultPeerLimits)
require.Equal(t,
&StaticLimit{
Memory: 1 << 20,
BaseLimit: DefaultLimits.ConnBaseLimit,
},
limiter.ConnLimits)
require.Equal(t,
&StaticLimit{
Memory: 16 << 20,
BaseLimit: DefaultLimits.StreamBaseLimit,
},
limiter.StreamLimits)
}

55
limit_config_test.json Normal file
View File

@ -0,0 +1,55 @@
{
"System": {
"Dynamic": true,
"MinMemory": 16384,
"MaxMemory": 65536,
"MemoryFraction": 0.125,
"Streams": 64,
"StreamsInbound": 32,
"StreamsOutbound": 48,
"Conns": 16,
"ConnsInbound": 8,
"ConnsOutbound": 16,
"FD": 16
},
"Transient": {
"MinMemory": 1024,
"MaxMemory": 4096,
"MemoryFraction": 0.03125
},
"ServiceDefault": {
"Memory": 8192
},
"ServicePeerDefault": {
"Memory": 2048
},
"Service": {
"A": {
"Memory": 8192
}
},
"ServicePeer": {
"A": {
"Memory": 4096
}
},
"ProtocolDefault": {
"Memory": 2048
},
"ProtocolPeerDefault": {
"Memory": 1024
},
"Protocol": {
"/A": {
"Memory": 8192
}
},
"ProtocolPeer": {
"/A": {
"Memory": 4096
}
},
"PeerDefault": {
"Memory": 4096
}
}

162
limit_defaults.go Normal file
View File

@ -0,0 +1,162 @@
package rcmgr
// DefaultLimitConfig is a struct for configuring default limits.
type DefaultLimitConfig struct {
SystemBaseLimit BaseLimit
SystemMemory MemoryLimit
TransientBaseLimit BaseLimit
TransientMemory MemoryLimit
ServiceBaseLimit BaseLimit
ServiceMemory MemoryLimit
ServicePeerBaseLimit BaseLimit
ServicePeerMemory MemoryLimit
ProtocolBaseLimit BaseLimit
ProtocolMemory MemoryLimit
ProtocolPeerBaseLimit BaseLimit
ProtocolPeerMemory MemoryLimit
PeerBaseLimit BaseLimit
PeerMemory MemoryLimit
ConnBaseLimit BaseLimit
ConnMemory int64
StreamBaseLimit BaseLimit
StreamMemory int64
}
func (cfg *DefaultLimitConfig) WithSystemMemory(memFraction float64, minMemory, maxMemory int64) DefaultLimitConfig {
refactor := memFraction / cfg.SystemMemory.MemoryFraction
r := *cfg
r.SystemMemory.MemoryFraction = memFraction
r.SystemMemory.MinMemory = minMemory
r.SystemMemory.MaxMemory = maxMemory
r.TransientMemory.MemoryFraction *= refactor
r.ServiceMemory.MemoryFraction *= refactor
r.ServicePeerMemory.MemoryFraction *= refactor
r.ProtocolMemory.MemoryFraction *= refactor
r.ProtocolPeerMemory.MemoryFraction *= refactor
r.PeerMemory.MemoryFraction *= refactor
return r
}
// DefaultLimits are the limits used by the default limiter constructors.
var DefaultLimits = DefaultLimitConfig{
SystemBaseLimit: BaseLimit{
StreamsInbound: 4096,
StreamsOutbound: 16384,
Streams: 16384,
ConnsInbound: 256,
ConnsOutbound: 1024,
Conns: 1024,
FD: 512,
},
SystemMemory: MemoryLimit{
MemoryFraction: 0.125,
MinMemory: 128 << 20,
MaxMemory: 1 << 30,
},
TransientBaseLimit: BaseLimit{
StreamsInbound: 128,
StreamsOutbound: 512,
Streams: 512,
ConnsInbound: 32,
ConnsOutbound: 128,
Conns: 128,
FD: 128,
},
TransientMemory: MemoryLimit{
MemoryFraction: 1,
MinMemory: 64 << 20,
MaxMemory: 64 << 20,
},
ServiceBaseLimit: BaseLimit{
StreamsInbound: 2048,
StreamsOutbound: 8192,
Streams: 8192,
},
ServiceMemory: MemoryLimit{
MemoryFraction: 0.125 / 4,
MinMemory: 64 << 20,
MaxMemory: 256 << 20,
},
ServicePeerBaseLimit: BaseLimit{
StreamsInbound: 256,
StreamsOutbound: 512,
Streams: 512,
},
ServicePeerMemory: MemoryLimit{
MemoryFraction: 0.125 / 16,
MinMemory: 16 << 20,
MaxMemory: 64 << 20,
},
ProtocolBaseLimit: BaseLimit{
StreamsInbound: 1024,
StreamsOutbound: 4096,
Streams: 4096,
},
ProtocolMemory: MemoryLimit{
MemoryFraction: 0.125 / 8,
MinMemory: 64 << 20,
MaxMemory: 128 << 20,
},
ProtocolPeerBaseLimit: BaseLimit{
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 512,
},
ProtocolPeerMemory: MemoryLimit{
MemoryFraction: 0.125 / 16,
MinMemory: 16 << 20,
MaxMemory: 64 << 20,
},
PeerBaseLimit: BaseLimit{
StreamsInbound: 512,
StreamsOutbound: 1024,
Streams: 1024,
ConnsInbound: 8,
ConnsOutbound: 16,
Conns: 16,
FD: 8,
},
PeerMemory: MemoryLimit{
MemoryFraction: 0.125 / 16,
MinMemory: 64 << 20,
MaxMemory: 128 << 20,
},
ConnBaseLimit: BaseLimit{
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
},
ConnMemory: 1 << 20,
StreamBaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
},
StreamMemory: 16 << 20,
}

128
limit_dynamic.go Normal file
View File

@ -0,0 +1,128 @@
package rcmgr
import (
"runtime"
"github.com/pbnjay/memory"
)
// DynamicLimit is a limit with dynamic memory values, based on available (free) memory
type DynamicLimit struct {
BaseLimit
MemoryLimit
}
var _ Limit = (*DynamicLimit)(nil)
func (l *DynamicLimit) GetMemoryLimit() int64 {
freemem := memory.FreeMemory()
// account for memory retained by the runtime that is actually free
// HeapInuse - HeapAlloc is the memory available in allocator spans
// HeapIdle - HeapReleased is memory held by the runtime that could be returned to the OS
var memstat runtime.MemStats
runtime.ReadMemStats(&memstat)
freemem += (memstat.HeapInuse - memstat.HeapAlloc) + (memstat.HeapIdle - memstat.HeapReleased)
return l.MemoryLimit.GetMemory(int64(freemem))
}
func (l *DynamicLimit) WithMemoryLimit(memFraction float64, minMemory, maxMemory int64) Limit {
r := new(DynamicLimit)
*r = *l
r.MemoryLimit.MemoryFraction *= memFraction
r.MemoryLimit.MinMemory = minMemory
r.MemoryLimit.MaxMemory = maxMemory
return r
}
func (l *DynamicLimit) WithStreamLimit(numStreamsIn, numStreamsOut, numStreams int) Limit {
r := new(DynamicLimit)
*r = *l
r.BaseLimit.StreamsInbound = numStreamsIn
r.BaseLimit.StreamsOutbound = numStreamsOut
r.BaseLimit.Streams = numStreams
return r
}
func (l *DynamicLimit) WithConnLimit(numConnsIn, numConnsOut, numConns int) Limit {
r := new(DynamicLimit)
*r = *l
r.BaseLimit.ConnsInbound = numConnsIn
r.BaseLimit.ConnsOutbound = numConnsOut
r.BaseLimit.Conns = numConns
return r
}
func (l *DynamicLimit) WithFDLimit(numFD int) Limit {
r := new(DynamicLimit)
*r = *l
r.BaseLimit.FD = numFD
return r
}
// NewDefaultDynamicLimiter creates a limiter with default limits and a memory cap
// dynamically computed based on available memory.
func NewDefaultDynamicLimiter(memFraction float64, minMemory, maxMemory int64) *BasicLimiter {
return NewDynamicLimiter(DefaultLimits.WithSystemMemory(memFraction, minMemory, maxMemory))
}
// NewDynamicLimiter crates a dynamic limiter with the specified defaults
func NewDynamicLimiter(cfg DefaultLimitConfig) *BasicLimiter {
system := &DynamicLimit{
MemoryLimit: cfg.SystemMemory,
BaseLimit: cfg.SystemBaseLimit,
}
transient := &DynamicLimit{
MemoryLimit: cfg.TransientMemory,
BaseLimit: cfg.TransientBaseLimit,
}
svc := &DynamicLimit{
MemoryLimit: cfg.ServiceMemory,
BaseLimit: cfg.ServiceBaseLimit,
}
svcPeer := &DynamicLimit{
MemoryLimit: cfg.ServicePeerMemory,
BaseLimit: cfg.ServicePeerBaseLimit,
}
proto := &DynamicLimit{
MemoryLimit: cfg.ProtocolMemory,
BaseLimit: cfg.ProtocolBaseLimit,
}
protoPeer := &DynamicLimit{
MemoryLimit: cfg.ProtocolPeerMemory,
BaseLimit: cfg.ProtocolPeerBaseLimit,
}
peer := &DynamicLimit{
MemoryLimit: cfg.PeerMemory,
BaseLimit: cfg.PeerBaseLimit,
}
conn := &StaticLimit{
Memory: cfg.ConnMemory,
BaseLimit: cfg.ConnBaseLimit,
}
stream := &StaticLimit{
Memory: cfg.StreamMemory,
BaseLimit: cfg.StreamBaseLimit,
}
return &BasicLimiter{
SystemLimits: system,
TransientLimits: transient,
DefaultServiceLimits: svc,
DefaultServicePeerLimits: svcPeer,
DefaultProtocolLimits: proto,
DefaultProtocolPeerLimits: protoPeer,
DefaultPeerLimits: peer,
ConnLimits: conn,
StreamLimits: stream,
}
}

138
limit_static.go Normal file
View File

@ -0,0 +1,138 @@
package rcmgr
import (
"github.com/pbnjay/memory"
)
// StaticLimit is a limit with static values.
type StaticLimit struct {
BaseLimit
Memory int64
}
var _ Limit = (*StaticLimit)(nil)
func (l *StaticLimit) GetMemoryLimit() int64 {
return l.Memory
}
func (l *StaticLimit) WithMemoryLimit(memFraction float64, minMemory, maxMemory int64) Limit {
r := new(StaticLimit)
*r = *l
r.Memory = int64(memFraction * float64(r.Memory))
if r.Memory < minMemory {
r.Memory = minMemory
} else if r.Memory > maxMemory {
r.Memory = maxMemory
}
return r
}
func (l *StaticLimit) WithStreamLimit(numStreamsIn, numStreamsOut, numStreams int) Limit {
r := new(StaticLimit)
*r = *l
r.BaseLimit.StreamsInbound = numStreamsIn
r.BaseLimit.StreamsOutbound = numStreamsOut
r.BaseLimit.Streams = numStreams
return r
}
func (l *StaticLimit) WithConnLimit(numConnsIn, numConnsOut, numConns int) Limit {
r := new(StaticLimit)
*r = *l
r.BaseLimit.ConnsInbound = numConnsIn
r.BaseLimit.ConnsOutbound = numConnsOut
r.BaseLimit.Conns = numConns
return r
}
func (l *StaticLimit) WithFDLimit(numFD int) Limit {
r := new(StaticLimit)
*r = *l
r.BaseLimit.FD = numFD
return r
}
// NewDefaultStaticLimiter creates a static limiter with default base limits and a system memory cap
// specified as a fraction of total system memory. The assigned memory will not be less than
// minMemory or more than maxMemory.
func NewDefaultStaticLimiter(memFraction float64, minMemory, maxMemory int64) *BasicLimiter {
return NewStaticLimiter(DefaultLimits.WithSystemMemory(memFraction, minMemory, maxMemory))
}
// NewDefaultFixedLimiter creates a static limiter with default base limits and a specified system
// memory cap.
func NewDefaultFixedLimiter(memoryCap int64) *BasicLimiter {
return NewStaticLimiter(DefaultLimits.WithSystemMemory(1, memoryCap, memoryCap))
}
// NewDefaultLimiter creates a static limiter with the default limits
func NewDefaultLimiter() *BasicLimiter {
return NewStaticLimiter(DefaultLimits)
}
// NewStaticLimiter creates a static limiter using the specified system memory cap and default
// limit config.
func NewStaticLimiter(cfg DefaultLimitConfig) *BasicLimiter {
memoryCap := memoryLimit(
int64(memory.TotalMemory()),
cfg.SystemMemory.MemoryFraction,
cfg.SystemMemory.MinMemory,
cfg.SystemMemory.MaxMemory)
system := &StaticLimit{
Memory: memoryCap,
BaseLimit: cfg.SystemBaseLimit,
}
transient := &StaticLimit{
Memory: cfg.TransientMemory.GetMemory(memoryCap),
BaseLimit: cfg.TransientBaseLimit,
}
svc := &StaticLimit{
Memory: cfg.ServiceMemory.GetMemory(memoryCap),
BaseLimit: cfg.ServiceBaseLimit,
}
svcPeer := &StaticLimit{
Memory: cfg.ServicePeerMemory.GetMemory(memoryCap),
BaseLimit: cfg.ServicePeerBaseLimit,
}
proto := &StaticLimit{
Memory: cfg.ProtocolMemory.GetMemory(memoryCap),
BaseLimit: cfg.ProtocolBaseLimit,
}
protoPeer := &StaticLimit{
Memory: cfg.ProtocolPeerMemory.GetMemory(memoryCap),
BaseLimit: cfg.ProtocolPeerBaseLimit,
}
peer := &StaticLimit{
Memory: cfg.PeerMemory.GetMemory(memoryCap),
BaseLimit: cfg.PeerBaseLimit,
}
conn := &StaticLimit{
Memory: cfg.ConnMemory,
BaseLimit: cfg.ConnBaseLimit,
}
stream := &StaticLimit{
Memory: cfg.StreamMemory,
BaseLimit: cfg.StreamBaseLimit,
}
return &BasicLimiter{
SystemLimits: system,
TransientLimits: transient,
DefaultServiceLimits: svc,
DefaultServicePeerLimits: svcPeer,
DefaultProtocolLimits: proto,
DefaultProtocolPeerLimits: protoPeer,
DefaultPeerLimits: peer,
ConnLimits: conn,
StreamLimits: stream,
}
}

582
rcmgr.go Normal file
View File

@ -0,0 +1,582 @@
package rcmgr
import (
"context"
"fmt"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("rcmgr")
type resourceManager struct {
limits Limiter
trace *trace
system *systemScope
transient *transientScope
cancelCtx context.Context
cancel func()
wg sync.WaitGroup
mx sync.Mutex
svc map[string]*serviceScope
proto map[protocol.ID]*protocolScope
peer map[peer.ID]*peerScope
connId, streamId int64
}
var _ network.ResourceManager = (*resourceManager)(nil)
type systemScope struct {
*resourceScope
}
var _ network.ResourceScope = (*systemScope)(nil)
type transientScope struct {
*resourceScope
system *systemScope
}
var _ network.ResourceScope = (*transientScope)(nil)
type serviceScope struct {
*resourceScope
name string
rcmgr *resourceManager
peers map[peer.ID]*resourceScope
}
var _ network.ServiceScope = (*serviceScope)(nil)
type protocolScope struct {
*resourceScope
proto protocol.ID
rcmgr *resourceManager
peers map[peer.ID]*resourceScope
}
var _ network.ProtocolScope = (*protocolScope)(nil)
type peerScope struct {
*resourceScope
peer peer.ID
rcmgr *resourceManager
}
var _ network.PeerScope = (*peerScope)(nil)
type connectionScope struct {
*resourceScope
dir network.Direction
usefd bool
rcmgr *resourceManager
peer *peerScope
}
var _ network.ConnScope = (*connectionScope)(nil)
var _ network.ConnManagementScope = (*connectionScope)(nil)
type streamScope struct {
*resourceScope
dir network.Direction
rcmgr *resourceManager
peer *peerScope
svc *serviceScope
proto *protocolScope
peerProtoScope *resourceScope
peerSvcScope *resourceScope
}
var _ network.StreamScope = (*streamScope)(nil)
var _ network.StreamManagementScope = (*streamScope)(nil)
type Option func(*resourceManager) error
func NewResourceManager(limits Limiter, opts ...Option) (network.ResourceManager, error) {
r := &resourceManager{
limits: limits,
svc: make(map[string]*serviceScope),
proto: make(map[protocol.ID]*protocolScope),
peer: make(map[peer.ID]*peerScope),
}
for _, opt := range opts {
if err := opt(r); err != nil {
return nil, err
}
}
if err := r.trace.Start(limits); err != nil {
return nil, err
}
r.system = newSystemScope(limits.GetSystemLimits(), r)
r.system.IncRef()
r.transient = newTransientScope(limits.GetTransientLimits(), r)
r.transient.IncRef()
r.cancelCtx, r.cancel = context.WithCancel(context.Background())
r.wg.Add(1)
go r.background()
return r, nil
}
func (r *resourceManager) ViewSystem(f func(network.ResourceScope) error) error {
return f(r.system)
}
func (r *resourceManager) ViewTransient(f func(network.ResourceScope) error) error {
return f(r.transient)
}
func (r *resourceManager) ViewService(srv string, f func(network.ServiceScope) error) error {
s := r.getServiceScope(srv)
defer s.DecRef()
return f(s)
}
func (r *resourceManager) ViewProtocol(proto protocol.ID, f func(network.ProtocolScope) error) error {
s := r.getProtocolScope(proto)
defer s.DecRef()
return f(s)
}
func (r *resourceManager) ViewPeer(p peer.ID, f func(network.PeerScope) error) error {
s := r.getPeerScope(p)
defer s.DecRef()
return f(s)
}
func (r *resourceManager) getServiceScope(svc string) *serviceScope {
r.mx.Lock()
defer r.mx.Unlock()
s, ok := r.svc[svc]
if !ok {
s = newServiceScope(svc, r.limits.GetServiceLimits(svc), r)
r.svc[svc] = s
}
s.IncRef()
return s
}
func (r *resourceManager) getProtocolScope(proto protocol.ID) *protocolScope {
r.mx.Lock()
defer r.mx.Unlock()
s, ok := r.proto[proto]
if !ok {
s = newProtocolScope(proto, r.limits.GetProtocolLimits(proto), r)
r.proto[proto] = s
}
s.IncRef()
return s
}
func (r *resourceManager) getPeerScope(p peer.ID) *peerScope {
r.mx.Lock()
defer r.mx.Unlock()
s, ok := r.peer[p]
if !ok {
s = newPeerScope(p, r.limits.GetPeerLimits(p), r)
r.peer[p] = s
}
s.IncRef()
return s
}
func (r *resourceManager) nextConnId() int64 {
r.mx.Lock()
defer r.mx.Unlock()
r.connId++
return r.connId
}
func (r *resourceManager) nextStreamId() int64 {
r.mx.Lock()
defer r.mx.Unlock()
r.streamId++
return r.streamId
}
func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnManagementScope, error) {
conn := newConnectionScope(dir, usefd, r.limits.GetConnLimits(), r)
if err := conn.AddConn(dir, usefd); err != nil {
conn.Done()
return nil, err
}
return conn, nil
}
func (r *resourceManager) OpenStream(p peer.ID, dir network.Direction) (network.StreamManagementScope, error) {
peer := r.getPeerScope(p)
stream := newStreamScope(dir, r.limits.GetStreamLimits(p), peer, r)
peer.DecRef() // we have the reference in edges
err := stream.AddStream(dir)
if err != nil {
stream.Done()
return nil, err
}
return stream, nil
}
func (r *resourceManager) Close() error {
r.cancel()
r.wg.Wait()
r.trace.Close()
return nil
}
func (r *resourceManager) background() {
defer r.wg.Done()
// periodically garbage collects unused peer and protocol scopes
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.gc()
case <-r.cancelCtx.Done():
return
}
}
}
func (r *resourceManager) gc() {
r.mx.Lock()
defer r.mx.Unlock()
for proto, s := range r.proto {
if s.IsUnused() {
s.Done()
delete(r.proto, proto)
}
}
var deadPeers []peer.ID
for p, s := range r.peer {
if s.IsUnused() {
s.Done()
delete(r.peer, p)
deadPeers = append(deadPeers, p)
}
}
for _, s := range r.svc {
s.Lock()
for _, p := range deadPeers {
ps, ok := s.peers[p]
if ok {
ps.Done()
delete(s.peers, p)
}
}
s.Unlock()
}
for _, s := range r.proto {
s.Lock()
for _, p := range deadPeers {
ps, ok := s.peers[p]
if ok {
ps.Done()
delete(s.peers, p)
}
}
s.Unlock()
}
}
func newSystemScope(limit Limit, rcmgr *resourceManager) *systemScope {
return &systemScope{
resourceScope: newResourceScope(limit, nil, "system", rcmgr.trace),
}
}
func newTransientScope(limit Limit, rcmgr *resourceManager) *transientScope {
return &transientScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, "transient", rcmgr.trace),
system: rcmgr.system,
}
}
func newServiceScope(name string, limit Limit, rcmgr *resourceManager) *serviceScope {
return &serviceScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("service:%s", name), rcmgr.trace),
name: name,
rcmgr: rcmgr,
}
}
func newProtocolScope(proto protocol.ID, limit Limit, rcmgr *resourceManager) *protocolScope {
return &protocolScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("protocol:%s", proto), rcmgr.trace),
proto: proto,
rcmgr: rcmgr,
}
}
func newPeerScope(p peer.ID, limit Limit, rcmgr *resourceManager) *peerScope {
return &peerScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("peer:%s", p), rcmgr.trace),
peer: p,
rcmgr: rcmgr,
}
}
func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *resourceManager) *connectionScope {
return &connectionScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("conn-%d", rcmgr.nextConnId()), rcmgr.trace),
dir: dir,
usefd: usefd,
rcmgr: rcmgr,
}
}
func newStreamScope(dir network.Direction, limit Limit, peer *peerScope, rcmgr *resourceManager) *streamScope {
return &streamScope{
resourceScope: newResourceScope(limit, []*resourceScope{peer.resourceScope, rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("stream-%d", rcmgr.nextStreamId()), rcmgr.trace),
dir: dir,
rcmgr: peer.rcmgr,
peer: peer,
}
}
func (s *serviceScope) Name() string {
return s.name
}
func (s *serviceScope) getPeerScope(p peer.ID) *resourceScope {
s.Lock()
defer s.Unlock()
ps, ok := s.peers[p]
if ok {
ps.IncRef()
return ps
}
l := s.rcmgr.limits.GetServicePeerLimits(s.name)
if s.peers == nil {
s.peers = make(map[peer.ID]*resourceScope)
}
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace)
s.peers[p] = ps
ps.IncRef()
return ps
}
func (s *protocolScope) Protocol() protocol.ID {
return s.proto
}
func (s *protocolScope) getPeerScope(p peer.ID) *resourceScope {
s.Lock()
defer s.Unlock()
ps, ok := s.peers[p]
if ok {
ps.IncRef()
return ps
}
l := s.rcmgr.limits.GetProtocolPeerLimits(s.proto)
if s.peers == nil {
s.peers = make(map[peer.ID]*resourceScope)
}
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace)
s.peers[p] = ps
ps.IncRef()
return ps
}
func (s *peerScope) Peer() peer.ID {
return s.peer
}
func (s *connectionScope) PeerScope() network.PeerScope {
s.Lock()
defer s.Unlock()
return s.peer
}
func (s *connectionScope) SetPeer(p peer.ID) error {
s.Lock()
defer s.Unlock()
if s.peer != nil {
return fmt.Errorf("connection scope already attached to a peer")
}
s.peer = s.rcmgr.getPeerScope(p)
// juggle resources from transient scope to peer scope
stat := s.resourceScope.rc.stat()
if err := s.peer.ReserveForChild(stat); err != nil {
s.peer.DecRef()
s.peer = nil
return err
}
s.rcmgr.transient.ReleaseForChild(stat)
s.rcmgr.transient.DecRef() // removed from edges
// update edges
edges := []*resourceScope{
s.peer.resourceScope,
s.rcmgr.system.resourceScope,
}
s.resourceScope.edges = edges
return nil
}
func (s *streamScope) ProtocolScope() network.ProtocolScope {
s.Lock()
defer s.Unlock()
return s.proto
}
func (s *streamScope) SetProtocol(proto protocol.ID) error {
s.Lock()
defer s.Unlock()
if s.proto != nil {
return fmt.Errorf("stream scope already attached to a protocol")
}
s.proto = s.rcmgr.getProtocolScope(proto)
// juggle resources from transient scope to protocol scope
stat := s.resourceScope.rc.stat()
if err := s.proto.ReserveForChild(stat); err != nil {
s.proto.DecRef()
s.proto = nil
return err
}
s.peerProtoScope = s.proto.getPeerScope(s.peer.peer)
if err := s.peerProtoScope.ReserveForChild(stat); err != nil {
s.proto.ReleaseForChild(stat)
s.proto.DecRef()
s.proto = nil
s.peerProtoScope.DecRef()
s.peerProtoScope = nil
return err
}
s.rcmgr.transient.ReleaseForChild(stat)
s.rcmgr.transient.DecRef() // removed from edges
// update edges
edges := []*resourceScope{
s.peer.resourceScope,
s.peerProtoScope,
s.proto.resourceScope,
s.rcmgr.system.resourceScope,
}
s.resourceScope.edges = edges
return nil
}
func (s *streamScope) ServiceScope() network.ServiceScope {
s.Lock()
defer s.Unlock()
return s.svc
}
func (s *streamScope) SetService(svc string) error {
s.Lock()
defer s.Unlock()
if s.svc != nil {
return fmt.Errorf("stream scope already attached to a service")
}
if s.proto == nil {
return fmt.Errorf("stream scope not attached to a protocol")
}
s.svc = s.rcmgr.getServiceScope(svc)
// reserve resources in service
stat := s.resourceScope.rc.stat()
if err := s.svc.ReserveForChild(stat); err != nil {
s.svc.DecRef()
s.svc = nil
return err
}
// get the per peer service scope constraint, if any
s.peerSvcScope = s.svc.getPeerScope(s.peer.peer)
if err := s.peerSvcScope.ReserveForChild(stat); err != nil {
s.svc.ReleaseForChild(stat)
s.svc.DecRef()
s.svc = nil
s.peerSvcScope.DecRef()
s.peerSvcScope = nil
return err
}
// update edges
edges := []*resourceScope{
s.peer.resourceScope,
s.peerProtoScope,
s.peerSvcScope,
s.proto.resourceScope,
s.svc.resourceScope,
s.rcmgr.system.resourceScope,
}
s.resourceScope.edges = edges
return nil
}
func (s *streamScope) PeerScope() network.PeerScope {
s.Lock()
defer s.Unlock()
return s.peer
}

985
rcmgr_test.go Normal file
View File

@ -0,0 +1,985 @@
package rcmgr
import (
"testing"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
func TestResourceManager(t *testing.T) {
peerA := peer.ID("A")
peerB := peer.ID("B")
protoA := protocol.ID("/A")
protoB := protocol.ID("/B")
svcA := "A.svc"
svcB := "B.svc"
nmgr, err := NewResourceManager(
&BasicLimiter{
SystemLimits: &StaticLimit{
Memory: 16384,
BaseLimit: BaseLimit{
StreamsInbound: 3,
StreamsOutbound: 3,
Streams: 6,
ConnsInbound: 3,
ConnsOutbound: 3,
Conns: 6,
FD: 2,
},
},
TransientLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 2,
FD: 1,
},
},
DefaultServiceLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 2,
FD: 1,
},
},
DefaultServicePeerLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 5,
StreamsOutbound: 5,
Streams: 10,
},
},
ServiceLimits: map[string]Limit{
svcA: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 4,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 4,
FD: 1,
},
},
svcB: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 4,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 4,
FD: 1,
},
},
},
ServicePeerLimits: map[string]Limit{
svcB: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
},
},
},
DefaultProtocolLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
},
},
ProtocolLimits: map[protocol.ID]Limit{
protoA: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 2,
},
},
},
ProtocolPeerLimits: map[protocol.ID]Limit{
protoB: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
},
},
},
DefaultPeerLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 2,
FD: 1,
},
},
DefaultProtocolPeerLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 5,
StreamsOutbound: 5,
Streams: 10,
},
},
PeerLimits: map[peer.ID]Limit{
peerA: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 4,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 4,
FD: 1,
},
},
},
ConnLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
},
},
StreamLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
},
},
})
if err != nil {
t.Fatal(err)
}
mgr := nmgr.(*resourceManager)
defer mgr.Close()
checkRefCnt := func(s *resourceScope, count int) {
t.Helper()
if refCnt := s.refCnt; refCnt != count {
t.Fatalf("expected refCnt of %d, got %d", count, refCnt)
}
}
checkSystem := func(check func(s *resourceScope)) {
if err := mgr.ViewSystem(func(s network.ResourceScope) error {
check(s.(*systemScope).resourceScope)
return nil
}); err != nil {
t.Fatal(err)
}
}
checkTransient := func(check func(s *resourceScope)) {
if err := mgr.ViewTransient(func(s network.ResourceScope) error {
check(s.(*transientScope).resourceScope)
return nil
}); err != nil {
t.Fatal(err)
}
}
checkService := func(svc string, check func(s *resourceScope)) {
if err := mgr.ViewService(svc, func(s network.ServiceScope) error {
check(s.(*serviceScope).resourceScope)
return nil
}); err != nil {
t.Fatal(err)
}
}
checkProtocol := func(p protocol.ID, check func(s *resourceScope)) {
if err := mgr.ViewProtocol(p, func(s network.ProtocolScope) error {
check(s.(*protocolScope).resourceScope)
return nil
}); err != nil {
t.Fatal(err)
}
}
checkPeer := func(p peer.ID, check func(s *resourceScope)) {
if err := mgr.ViewPeer(p, func(s network.PeerScope) error {
check(s.(*peerScope).resourceScope)
return nil
}); err != nil {
t.Fatal(err)
}
}
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
// open an inbound connection, using an fd
conn, err := mgr.OpenConnection(network.DirInbound, true)
if err != nil {
t.Fatal(err)
}
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
})
// the connection is transient, we shouldn't be able to open a second one
if _, err := mgr.OpenConnection(network.DirInbound, true); err == nil {
t.Fatal("expected OpenConnection to fail")
}
if _, err := mgr.OpenConnection(network.DirInbound, false); err == nil {
t.Fatal("expected OpenConnection to fail")
}
// close it to check resources are reclaimed
conn.Done()
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
// open another inbound connection, using an fd
conn1, err := mgr.OpenConnection(network.DirInbound, true)
if err != nil {
t.Fatal(err)
}
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
})
// attach to a peer
if err := conn1.SetPeer(peerA); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 4)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
// we should be able to open a second transient connection now
conn2, err := mgr.OpenConnection(network.DirInbound, true)
if err != nil {
t.Fatal(err)
}
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 2})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
})
// but we shouldn't be able to attach it to the same peer due to the fd limit
if err := conn2.SetPeer(peerA); err == nil {
t.Fatal("expected SetPeer to fail")
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 2})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 1})
})
// close it and reopen without using an FD -- we should be able to attach now
conn2.Done()
conn2, err = mgr.OpenConnection(network.DirInbound, false)
if err != nil {
t.Fatal(err)
}
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 1, NumFD: 0})
})
if err := conn2.SetPeer(peerA); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
// open a stream
stream, err := mgr.OpenStream(peerA, network.DirInbound)
if err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 4)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 6)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
// the stream is transient we shouldn't be able to open a second one
if _, err := mgr.OpenStream(peerA, network.DirInbound); err == nil {
t.Fatal("expected OpenStream to fail")
}
// close the stream to check resource reclamation
stream.Done()
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
// open another stream, but this time attach it to a protocol
stream1, err := mgr.OpenStream(peerA, network.DirInbound)
if err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 4)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 6)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
if err := stream1.SetProtocol(protoA); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 4)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 7)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1, NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
// and now we should be able to open another stream and attach it to the protocol
stream2, err := mgr.OpenStream(peerA, network.DirInbound)
if err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 8)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
if err := stream2.SetProtocol(protoA); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 8)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
// open a 3rd stream, and try to attach it to the same protocol
stream3, err := mgr.OpenStream(peerB, network.DirInbound)
if err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
})
checkPeer(peerB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 10)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
if err := stream3.SetProtocol(protoA); err == nil {
t.Fatal("expected SetProtocol to fail")
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
})
checkPeer(peerB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 10)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
// but we should be able to set to another protocol
if err := stream3.SetProtocol(protoB); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
})
checkPeer(peerB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkProtocol(protoB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 11)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
// we should be able to attach stream1 and stream2 to svcA, but stream3 should fail due to limit
if err := stream1.SetService(svcA); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
})
checkPeer(peerB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkService(svcA, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkProtocol(protoB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 12)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
if err := stream2.SetService(svcA); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
})
checkPeer(peerB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkService(svcA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkProtocol(protoB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 12)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
if err := stream3.SetService(svcA); err == nil {
t.Fatal("expected SetService to fail")
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2, NumConnsInbound: 2, NumFD: 1})
})
checkPeer(peerB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkService(svcA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkProtocol(protoB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 12)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 3, NumConnsInbound: 2, NumFD: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
// and now let's reclaim our resources to make sure we can gc unused peer and proto scopes
// but first check internal refs
mgr.mx.Lock()
_, okProtoA := mgr.proto[protoA]
_, okProtoB := mgr.proto[protoB]
_, okPeerA := mgr.peer[peerA]
_, okPeerB := mgr.peer[peerB]
mgr.mx.Unlock()
if !okProtoA {
t.Fatal("protocol scope is not stored")
}
if !okProtoB {
t.Fatal("protocol scope is not stored")
}
if !okPeerA {
t.Fatal("peer scope is not stored")
}
if !okPeerB {
t.Fatal("peer scope is not stored")
}
// ok, reclaim
stream1.Done()
stream2.Done()
stream3.Done()
conn1.Done()
conn2.Done()
// check everything released
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
checkPeer(peerB, func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
checkService(svcA, func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
checkProtocol(protoB, func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 7)
checkResources(t, &s.rc, network.ScopeStat{})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
mgr.gc()
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
mgr.mx.Lock()
lenProto := len(mgr.proto)
lenPeer := len(mgr.peer)
mgr.mx.Unlock()
if lenProto != 0 {
t.Fatal("protocols were not gc'ed")
}
if lenPeer != 0 {
t.Fatal("perrs were not gc'ed")
}
// check that per protocol peer scopes work as intended
stream1, err = mgr.OpenStream(peerA, network.DirInbound)
if err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 5)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
if err := stream1.SetProtocol(protoB); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkProtocol(protoB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 6)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
stream2, err = mgr.OpenStream(peerA, network.DirInbound)
if err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 7)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
if err := stream2.SetProtocol(protoB); err == nil {
t.Fatal("expected SetProtocol to fail")
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 7)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkProtocol(protoB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
stream1.Done()
stream2.Done()
// check that per service peer scopes work as intended
stream1, err = mgr.OpenStream(peerA, network.DirInbound)
if err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 6)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
if err := stream1.SetProtocol(protoA); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 7)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
stream2, err = mgr.OpenStream(peerA, network.DirInbound)
if err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 8)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
if err := stream2.SetProtocol(protoA); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 8)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
if err := stream1.SetService(svcB); err != nil {
t.Fatal(err)
}
checkPeer(peerA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkService(svcB, func(s *resourceScope) {
checkRefCnt(s, 2)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 1})
})
checkProtocol(protoA, func(s *resourceScope) {
checkRefCnt(s, 3)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 9)
checkResources(t, &s.rc, network.ScopeStat{NumStreamsInbound: 2})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
// now we should fail to set the service for stream2 to svcB because of the service peer limit
if err := stream2.SetService(svcB); err == nil {
t.Fatal("expected SetService to fail")
}
// now release resources and check interior gc of per service peer scopes
stream1.Done()
stream2.Done()
mgr.gc()
checkSystem(func(s *resourceScope) {
checkRefCnt(s, 4)
checkResources(t, &s.rc, network.ScopeStat{})
})
checkTransient(func(s *resourceScope) {
checkRefCnt(s, 1)
checkResources(t, &s.rc, network.ScopeStat{})
})
mgr.mx.Lock()
lenProto = len(mgr.proto)
lenPeer = len(mgr.peer)
mgr.mx.Unlock()
svc := mgr.svc[svcB]
svc.Lock()
lenSvcPeer := len(svc.peers)
svc.Unlock()
if lenProto != 0 {
t.Fatal("protocols were not gc'ed")
}
if lenPeer != 0 {
t.Fatal("peers were not gc'ed")
}
if lenSvcPeer != 0 {
t.Fatal("service peers were not gc'ed")
}
}

690
scope.go Normal file
View File

@ -0,0 +1,690 @@
package rcmgr
import (
"fmt"
"sync"
"github.com/libp2p/go-libp2p-core/network"
)
// resources tracks the current state of resource consumption
type resources struct {
limit Limit
nconnsIn, nconnsOut int
nstreamsIn, nstreamsOut int
nfd int
memory int64
}
// A resourceScope can be a DAG, where a downstream node is not allowed to outlive an upstream node
// (ie cannot call Done in the upstream node before the downstream node) and account for resources
// using a linearized parent set.
// A resourceScope can be a span scope, where it has a specific owner; span scopes create a tree rooted
// at the owner (which can be a DAG scope) and can outlive their parents -- this is important because
// span scopes are the main *user* interface for memory management, and the user may call
// Done in a span scope after the system has closed the root of the span tree in some background
// goroutine.
// If we didn't make this distinction we would have a double release problem in that case.
type resourceScope struct {
sync.Mutex
done bool
refCnt int
rc resources
owner *resourceScope // set in span scopes, which define trees
edges []*resourceScope // set in DAG scopes, it's the linearized parent set
name string // for debugging purposes
trace *trace // debug tracing
}
var _ network.ResourceScope = (*resourceScope)(nil)
var _ network.ResourceScopeSpan = (*resourceScope)(nil)
func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *trace) *resourceScope {
for _, e := range edges {
e.IncRef()
}
r := &resourceScope{
rc: resources{limit: limit},
edges: edges,
name: name,
trace: trace,
}
r.trace.CreateScope(name, limit)
return r
}
func newResourceScopeSpan(owner *resourceScope) *resourceScope {
r := &resourceScope{
rc: resources{limit: owner.rc.limit},
owner: owner,
name: fmt.Sprintf("%s.span", owner.name),
trace: owner.trace,
}
r.trace.CreateScope(r.name, r.rc.limit)
return r
}
// Resources implementation
func (rc *resources) checkMemory(rsvp int64, prio uint8) error {
// overflow check; this also has the side effect that we cannot reserve negative memory.
newmem := rc.memory + rsvp
limit := rc.limit.GetMemoryLimit()
threshold := (1 + int64(prio)) * limit / 256
if newmem > threshold {
return network.ErrResourceLimitExceeded
}
return nil
}
func (rc *resources) reserveMemory(size int64, prio uint8) error {
if err := rc.checkMemory(size, prio); err != nil {
return err
}
rc.memory += size
return nil
}
func (rc *resources) releaseMemory(size int64) {
rc.memory -= size
// sanity check for bugs upstream
if rc.memory < 0 {
log.Error("BUG: too much memory released")
rc.memory = 0
}
}
func (rc *resources) addStream(dir network.Direction) error {
if dir == network.DirInbound {
return rc.addStreams(1, 0)
}
return rc.addStreams(0, 1)
}
func (rc *resources) addStreams(incount, outcount int) error {
if incount > 0 && rc.nstreamsIn+incount > rc.limit.GetStreamLimit(network.DirInbound) {
return fmt.Errorf("cannot reserve stream: %w", network.ErrResourceLimitExceeded)
}
if outcount > 0 && rc.nstreamsOut+outcount > rc.limit.GetStreamLimit(network.DirOutbound) {
return fmt.Errorf("cannot reserve stream: %w", network.ErrResourceLimitExceeded)
}
if rc.nstreamsIn+incount+rc.nstreamsOut+outcount > rc.limit.GetStreamTotalLimit() {
return fmt.Errorf("cannot reserve stream: %w", network.ErrResourceLimitExceeded)
}
rc.nstreamsIn += incount
rc.nstreamsOut += outcount
return nil
}
func (rc *resources) removeStream(dir network.Direction) {
if dir == network.DirInbound {
rc.removeStreams(1, 0)
} else {
rc.removeStreams(0, 1)
}
}
func (rc *resources) removeStreams(incount, outcount int) {
rc.nstreamsIn -= incount
rc.nstreamsOut -= outcount
if rc.nstreamsIn < 0 {
log.Error("BUG: too many inbound streams released")
rc.nstreamsIn = 0
}
if rc.nstreamsOut < 0 {
log.Error("BUG: too many outbound streams released")
rc.nstreamsOut = 0
}
}
func (rc *resources) addConn(dir network.Direction, usefd bool) error {
var fd int
if usefd {
fd = 1
}
if dir == network.DirInbound {
return rc.addConns(1, 0, fd)
}
return rc.addConns(0, 1, fd)
}
func (rc *resources) addConns(incount, outcount, fdcount int) error {
if incount > 0 && rc.nconnsIn+incount > rc.limit.GetConnLimit(network.DirInbound) {
return fmt.Errorf("cannot reserve connection: %w", network.ErrResourceLimitExceeded)
}
if outcount > 0 && rc.nconnsOut+outcount > rc.limit.GetConnLimit(network.DirOutbound) {
return fmt.Errorf("cannot reserve connection: %w", network.ErrResourceLimitExceeded)
}
if rc.nconnsIn+incount+rc.nconnsOut+outcount > rc.limit.GetConnTotalLimit() {
return fmt.Errorf("cannot reserve connection: %w", network.ErrResourceLimitExceeded)
}
if fdcount > 0 && rc.nfd+fdcount > rc.limit.GetFDLimit() {
return fmt.Errorf("cannot reserve file descriptor: %w", network.ErrResourceLimitExceeded)
}
rc.nconnsIn += incount
rc.nconnsOut += outcount
rc.nfd += fdcount
return nil
}
func (rc *resources) removeConn(dir network.Direction, usefd bool) {
var fd int
if usefd {
fd = 1
}
if dir == network.DirInbound {
rc.removeConns(1, 0, fd)
} else {
rc.removeConns(0, 1, fd)
}
}
func (rc *resources) removeConns(incount, outcount, fdcount int) {
rc.nconnsIn -= incount
rc.nconnsOut -= outcount
rc.nfd -= fdcount
if rc.nconnsIn < 0 {
log.Error("BUG: too many inbound connections released")
rc.nconnsIn = 0
}
if rc.nconnsOut < 0 {
log.Error("BUG: too many outbound connections released")
rc.nconnsOut = 0
}
if rc.nfd < 0 {
log.Error("BUG: too many file descriptors released")
rc.nfd = 0
}
}
func (rc *resources) stat() network.ScopeStat {
return network.ScopeStat{
Memory: rc.memory,
NumStreamsInbound: rc.nstreamsIn,
NumStreamsOutbound: rc.nstreamsOut,
NumConnsInbound: rc.nconnsIn,
NumConnsOutbound: rc.nconnsOut,
NumFD: rc.nfd,
}
}
// resourceScope implementation
func (s *resourceScope) wrapError(err error) error {
return fmt.Errorf("%s: %w", s.name, err)
}
func (s *resourceScope) ReserveMemory(size int, prio uint8) error {
s.Lock()
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.reserveMemory(int64(size), prio); err != nil {
log.Debugw("blocked memory reservation", "scope", s.name, "size", size, "priority", prio, "error", err)
s.trace.BlockReserveMemory(s.name, prio, int64(size), s.rc.memory)
return s.wrapError(err)
}
if err := s.reserveMemoryForEdges(size, prio); err != nil {
log.Debugw("blocked memory reservation from constraining edge", "scope", s.name, "size", size, "priority", prio, "error", err)
s.rc.releaseMemory(int64(size))
return s.wrapError(err)
}
s.trace.ReserveMemory(s.name, prio, int64(size), s.rc.memory)
return nil
}
func (s *resourceScope) reserveMemoryForEdges(size int, prio uint8) error {
if s.owner != nil {
return s.owner.ReserveMemory(size, prio)
}
var reserved int
var err error
for _, e := range s.edges {
if err = e.ReserveMemoryForChild(int64(size), prio); err != nil {
break
}
reserved++
}
if err != nil {
// we failed because of a constraint; undo memory reservations
for _, e := range s.edges[:reserved] {
e.ReleaseMemoryForChild(int64(size))
}
}
return err
}
func (s *resourceScope) releaseMemoryForEdges(size int) {
if s.owner != nil {
s.owner.ReleaseMemory(size)
return
}
for _, e := range s.edges {
e.ReleaseMemoryForChild(int64(size))
}
}
func (s *resourceScope) ReserveMemoryForChild(size int64, prio uint8) error {
s.Lock()
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.reserveMemory(size, prio); err != nil {
s.trace.BlockReserveMemory(s.name, prio, size, s.rc.memory)
return s.wrapError(err)
}
s.trace.ReserveMemory(s.name, prio, size, s.rc.memory)
return nil
}
func (s *resourceScope) ReleaseMemory(size int) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.releaseMemory(int64(size))
s.releaseMemoryForEdges(size)
s.trace.ReleaseMemory(s.name, int64(size), s.rc.memory)
}
func (s *resourceScope) ReleaseMemoryForChild(size int64) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.releaseMemory(size)
s.trace.ReleaseMemory(s.name, size, s.rc.memory)
}
func (s *resourceScope) AddStream(dir network.Direction) error {
s.Lock()
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.addStream(dir); err != nil {
log.Debugw("blocked stream", "scope", s.name, "direction", dir, "error", err)
s.trace.BlockAddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return s.wrapError(err)
}
if err := s.addStreamForEdges(dir); err != nil {
log.Debugw("blocked stream from constraining edge", "scope", s.name, "direction", dir, "error", err)
s.rc.removeStream(dir)
return s.wrapError(err)
}
s.trace.AddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return nil
}
func (s *resourceScope) addStreamForEdges(dir network.Direction) error {
if s.owner != nil {
return s.owner.AddStream(dir)
}
var err error
var reserved int
for _, e := range s.edges {
if err = e.AddStreamForChild(dir); err != nil {
break
}
reserved++
}
if err != nil {
for _, e := range s.edges[:reserved] {
e.RemoveStreamForChild(dir)
}
}
return err
}
func (s *resourceScope) AddStreamForChild(dir network.Direction) error {
s.Lock()
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.addStream(dir); err != nil {
s.trace.BlockAddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return s.wrapError(err)
}
s.trace.AddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return nil
}
func (s *resourceScope) RemoveStream(dir network.Direction) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.removeStream(dir)
s.removeStreamForEdges(dir)
s.trace.RemoveStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
}
func (s *resourceScope) removeStreamForEdges(dir network.Direction) {
if s.owner != nil {
s.owner.RemoveStream(dir)
return
}
for _, e := range s.edges {
e.RemoveStreamForChild(dir)
}
}
func (s *resourceScope) RemoveStreamForChild(dir network.Direction) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.removeStream(dir)
s.trace.RemoveStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
}
func (s *resourceScope) AddConn(dir network.Direction, usefd bool) error {
s.Lock()
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.addConn(dir, usefd); err != nil {
log.Debugw("blocked connection", "scope", s.name, "direction", dir, "usefd", usefd, "error", err)
s.trace.BlockAddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return s.wrapError(err)
}
if err := s.addConnForEdges(dir, usefd); err != nil {
log.Debugw("blocked connection from constraining edge", "scope", s.name, "direction", dir, "usefd", usefd, "error", err)
s.rc.removeConn(dir, usefd)
return s.wrapError(err)
}
s.trace.AddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return nil
}
func (s *resourceScope) addConnForEdges(dir network.Direction, usefd bool) error {
if s.owner != nil {
return s.owner.AddConn(dir, usefd)
}
var err error
var reserved int
for _, e := range s.edges {
if err = e.AddConnForChild(dir, usefd); err != nil {
break
}
reserved++
}
if err != nil {
for _, e := range s.edges[:reserved] {
e.RemoveConnForChild(dir, usefd)
}
}
return err
}
func (s *resourceScope) AddConnForChild(dir network.Direction, usefd bool) error {
s.Lock()
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.addConn(dir, usefd); err != nil {
s.trace.BlockAddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return s.wrapError(err)
}
s.trace.AddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return nil
}
func (s *resourceScope) RemoveConn(dir network.Direction, usefd bool) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.removeConn(dir, usefd)
s.removeConnForEdges(dir, usefd)
s.trace.RemoveConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
}
func (s *resourceScope) removeConnForEdges(dir network.Direction, usefd bool) {
if s.owner != nil {
s.owner.RemoveConn(dir, usefd)
}
for _, e := range s.edges {
e.RemoveConnForChild(dir, usefd)
}
}
func (s *resourceScope) RemoveConnForChild(dir network.Direction, usefd bool) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.removeConn(dir, usefd)
s.trace.RemoveConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
}
func (s *resourceScope) ReserveForChild(st network.ScopeStat) error {
s.Lock()
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
}
if err := s.rc.reserveMemory(st.Memory, network.ReservationPriorityAlways); err != nil {
s.trace.BlockReserveMemory(s.name, 255, st.Memory, s.rc.memory)
return s.wrapError(err)
}
if err := s.rc.addStreams(st.NumStreamsInbound, st.NumStreamsOutbound); err != nil {
s.trace.BlockAddStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.rc.releaseMemory(st.Memory)
return s.wrapError(err)
}
if err := s.rc.addConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD); err != nil {
s.trace.BlockAddConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
s.rc.releaseMemory(st.Memory)
s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
return s.wrapError(err)
}
s.trace.ReserveMemory(s.name, 255, st.Memory, s.rc.memory)
s.trace.AddStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.trace.AddConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return nil
}
func (s *resourceScope) ReleaseForChild(st network.ScopeStat) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.releaseMemory(st.Memory)
s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
s.rc.removeConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD)
s.trace.ReleaseMemory(s.name, st.Memory, s.rc.memory)
s.trace.RemoveStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.trace.RemoveConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
}
func (s *resourceScope) ReleaseResources(st network.ScopeStat) {
s.Lock()
defer s.Unlock()
if s.done {
return
}
s.rc.releaseMemory(st.Memory)
s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
s.rc.removeConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD)
if s.owner != nil {
s.owner.ReleaseResources(st)
} else {
for _, e := range s.edges {
e.ReleaseForChild(st)
}
}
s.trace.ReleaseMemory(s.name, st.Memory, s.rc.memory)
s.trace.RemoveStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.trace.RemoveConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
}
func (s *resourceScope) BeginSpan() (network.ResourceScopeSpan, error) {
s.Lock()
defer s.Unlock()
if s.done {
return nil, s.wrapError(network.ErrResourceScopeClosed)
}
s.refCnt++
return newResourceScopeSpan(s), nil
}
func (s *resourceScope) Done() {
s.Lock()
defer s.Unlock()
if s.done {
return
}
stat := s.rc.stat()
if s.owner != nil {
s.owner.ReleaseResources(stat)
s.owner.DecRef()
} else {
for _, e := range s.edges {
e.ReleaseForChild(stat)
e.DecRef()
}
}
s.rc.nstreamsIn = 0
s.rc.nstreamsOut = 0
s.rc.nconnsIn = 0
s.rc.nconnsOut = 0
s.rc.nfd = 0
s.rc.memory = 0
s.done = true
s.trace.DestroyScope(s.name)
}
func (s *resourceScope) Stat() network.ScopeStat {
s.Lock()
defer s.Unlock()
return s.rc.stat()
}
func (s *resourceScope) IncRef() {
s.Lock()
defer s.Unlock()
s.refCnt++
}
func (s *resourceScope) DecRef() {
s.Lock()
defer s.Unlock()
s.refCnt--
}
func (s *resourceScope) IsUnused() bool {
s.Lock()
defer s.Unlock()
if s.done {
return true
}
if s.refCnt > 0 {
return false
}
st := s.rc.stat()
return st.NumStreamsInbound == 0 &&
st.NumStreamsOutbound == 0 &&
st.NumConnsInbound == 0 &&
st.NumConnsOutbound == 0 &&
st.NumFD == 0
}

1234
scope_test.go Normal file

File diff suppressed because it is too large Load Diff

510
trace.go Normal file
View File

@ -0,0 +1,510 @@
package rcmgr
import (
"compress/gzip"
"context"
"encoding/json"
"io"
"os"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/network"
)
type trace struct {
path string
ctx context.Context
cancel func()
closed chan struct{}
mx sync.Mutex
done bool
pend []interface{}
}
func WithTrace(path string) Option {
return func(r *resourceManager) error {
r.trace = &trace{path: path}
return nil
}
}
const (
traceStartEvt = iota
traceCreateScopeEvt
traceDestroyScopeEvt
traceReserveMemoryEvt
traceBlockReserveMemoryEvt
traceReleaseMemoryEvt
traceAddStreamEvt
traceBlockAddStreamEvt
traceRemoveStreamEvt
traceAddConnEvt
traceBlockAddConnEvt
traceRemoveConnEvt
)
type traceEvt struct {
Type int
Scope string `json:",omitempty"`
Limit interface{} `json:",omitempty"`
Priority uint8 `json:",omitempty"`
Delta int64 `json:",omitempty"`
DeltaIn int `json:",omitempty"`
DeltaOut int `json:",omitempty"`
Memory int64 `json:",omitempty"`
StreamsIn int `json:",omitempty"`
StreamsOut int `json:",omitempty"`
ConnsIn int `json:",omitempty"`
ConnsOut int `json:",omitempty"`
FD int `json:",omitempty"`
}
func (t *trace) push(evt interface{}) {
t.mx.Lock()
defer t.mx.Unlock()
if t.done {
return
}
t.pend = append(t.pend, evt)
}
func (t *trace) background(out io.WriteCloser) {
defer close(t.closed)
defer out.Close()
gzOut := gzip.NewWriter(out)
defer gzOut.Close()
jsonOut := json.NewEncoder(gzOut)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var pend []interface{}
getEvents := func() {
t.mx.Lock()
tmp := t.pend
t.pend = pend[:0]
pend = tmp
t.mx.Unlock()
}
for {
select {
case <-ticker.C:
getEvents()
if len(pend) == 0 {
continue
}
if err := t.writeEvents(pend, jsonOut); err != nil {
log.Warnf("error writing rcmgr trace: %s", err)
t.mx.Lock()
t.done = true
t.mx.Unlock()
return
}
if err := gzOut.Flush(); err != nil {
log.Warnf("error flushing rcmgr trace: %s", err)
t.mx.Lock()
t.done = true
t.mx.Unlock()
return
}
case <-t.ctx.Done():
getEvents()
if len(pend) == 0 {
return
}
if err := t.writeEvents(pend, jsonOut); err != nil {
log.Warnf("error writing rcmgr trace: %s", err)
return
}
if err := gzOut.Flush(); err != nil {
log.Warnf("error flushing rcmgr trace: %s", err)
}
return
}
}
}
func (t *trace) writeEvents(pend []interface{}, jout *json.Encoder) error {
for _, e := range pend {
if err := jout.Encode(e); err != nil {
return err
}
}
return nil
}
func (t *trace) Start(limits Limiter) error {
if t == nil {
return nil
}
t.ctx, t.cancel = context.WithCancel(context.Background())
t.closed = make(chan struct{})
out, err := os.OpenFile(t.path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return nil
}
go t.background(out)
t.push(traceEvt{
Type: traceStartEvt,
Limit: limits,
})
return nil
}
func (t *trace) Close() error {
if t == nil {
return nil
}
t.mx.Lock()
if t.done {
t.mx.Unlock()
return nil
}
t.cancel()
t.done = true
t.mx.Unlock()
<-t.closed
return nil
}
func (t *trace) CreateScope(scope string, limit Limit) {
if t == nil {
return
}
t.push(traceEvt{
Type: traceCreateScopeEvt,
Scope: scope,
Limit: limit,
})
}
func (t *trace) DestroyScope(scope string) {
if t == nil {
return
}
t.push(traceEvt{
Type: traceDestroyScopeEvt,
Scope: scope,
})
}
func (t *trace) ReserveMemory(scope string, prio uint8, size, mem int64) {
if t == nil {
return
}
t.push(traceEvt{
Type: traceReserveMemoryEvt,
Scope: scope,
Priority: prio,
Delta: size,
Memory: mem,
})
}
func (t *trace) BlockReserveMemory(scope string, prio uint8, size, mem int64) {
if t == nil {
return
}
t.push(traceEvt{
Type: traceBlockReserveMemoryEvt,
Scope: scope,
Priority: prio,
Delta: size,
Memory: mem,
})
}
func (t *trace) ReleaseMemory(scope string, size, mem int64) {
if t == nil {
return
}
t.push(traceEvt{
Type: traceReleaseMemoryEvt,
Scope: scope,
Delta: -size,
Memory: mem,
})
}
func (t *trace) AddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
var deltaIn, deltaOut int
if dir == network.DirInbound {
deltaIn = 1
} else {
deltaOut = 1
}
t.push(traceEvt{
Type: traceAddStreamEvt,
Scope: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
StreamsIn: nstreamsIn,
StreamsOut: nstreamsOut,
})
}
func (t *trace) BlockAddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
var deltaIn, deltaOut int
if dir == network.DirInbound {
deltaIn = 1
} else {
deltaOut = 1
}
t.push(traceEvt{
Type: traceBlockAddStreamEvt,
Scope: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
StreamsIn: nstreamsIn,
StreamsOut: nstreamsOut,
})
}
func (t *trace) RemoveStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
var deltaIn, deltaOut int
if dir == network.DirInbound {
deltaIn = -1
} else {
deltaOut = -1
}
t.push(traceEvt{
Type: traceRemoveStreamEvt,
Scope: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
StreamsIn: nstreamsIn,
StreamsOut: nstreamsOut,
})
}
func (t *trace) AddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
t.push(traceEvt{
Type: traceAddStreamEvt,
Scope: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
StreamsIn: nstreamsIn,
StreamsOut: nstreamsOut,
})
}
func (t *trace) BlockAddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
t.push(traceEvt{
Type: traceBlockAddStreamEvt,
Scope: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
StreamsIn: nstreamsIn,
StreamsOut: nstreamsOut,
})
}
func (t *trace) RemoveStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
t.push(traceEvt{
Type: traceRemoveStreamEvt,
Scope: scope,
DeltaIn: -deltaIn,
DeltaOut: -deltaOut,
StreamsIn: nstreamsIn,
StreamsOut: nstreamsOut,
})
}
func (t *trace) AddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
var deltaIn, deltaOut, deltafd int
if dir == network.DirInbound {
deltaIn = 1
} else {
deltaOut = 1
}
if usefd {
deltafd = 1
}
t.push(traceEvt{
Type: traceAddConnEvt,
Scope: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
Delta: int64(deltafd),
ConnsIn: nconnsIn,
ConnsOut: nconnsOut,
FD: nfd,
})
}
func (t *trace) BlockAddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
var deltaIn, deltaOut, deltafd int
if dir == network.DirInbound {
deltaIn = 1
} else {
deltaOut = 1
}
if usefd {
deltafd = 1
}
t.push(traceEvt{
Type: traceBlockAddConnEvt,
Scope: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
Delta: int64(deltafd),
ConnsIn: nconnsIn,
ConnsOut: nconnsOut,
FD: nfd,
})
}
func (t *trace) RemoveConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
var deltaIn, deltaOut, deltafd int
if dir == network.DirInbound {
deltaIn = -1
} else {
deltaOut = -1
}
if usefd {
deltafd = -1
}
t.push(traceEvt{
Type: traceRemoveConnEvt,
Scope: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
Delta: int64(deltafd),
ConnsIn: nconnsIn,
ConnsOut: nconnsOut,
FD: nfd,
})
}
func (t *trace) AddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
t.push(traceEvt{
Type: traceAddConnEvt,
Scope: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
Delta: int64(deltafd),
ConnsIn: nconnsIn,
ConnsOut: nconnsOut,
FD: nfd,
})
}
func (t *trace) BlockAddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
t.push(traceEvt{
Type: traceBlockAddConnEvt,
Scope: scope,
DeltaIn: deltaIn,
DeltaOut: deltaOut,
Delta: int64(deltafd),
ConnsIn: nconnsIn,
ConnsOut: nconnsOut,
FD: nfd,
})
}
func (t *trace) RemoveConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
t.push(traceEvt{
Type: traceRemoveConnEvt,
Scope: scope,
DeltaIn: -deltaIn,
DeltaOut: -deltaOut,
Delta: -int64(deltafd),
ConnsIn: nconnsIn,
ConnsOut: nconnsOut,
FD: nfd,
})
}