implement buffers

This commit is contained in:
vyzo 2021-12-27 15:51:15 +02:00
parent 79d15fa18c
commit 4f7d23aa6f

111
scope.go
View File

@ -17,7 +17,8 @@ type Resources struct {
nfd int nfd int
memory int64 memory int64
buffers map[interface{}][]byte buffers map[int][]byte
nextBuf int
} }
// DAG ResourceScopes. // DAG ResourceScopes.
@ -35,6 +36,14 @@ type ResourceScope struct {
var _ network.ResourceScope = (*ResourceScope)(nil) var _ network.ResourceScope = (*ResourceScope)(nil)
var _ network.TransactionalScope = (*ResourceScope)(nil) var _ network.TransactionalScope = (*ResourceScope)(nil)
type Buffer struct {
s *ResourceScope
data []byte
key int
}
var _ network.Buffer = (*Buffer)(nil)
func NewResources(limit Limit) *Resources { func NewResources(limit Limit) *Resources {
return &Resources{ return &Resources{
limit: limit, limit: limit,
@ -68,10 +77,10 @@ func (rc *Resources) checkMemory(rsvp int64) error {
} }
func (rc *Resources) releaseBuffers() { func (rc *Resources) releaseBuffers() {
for key, buf := range rc.buffers { for _, buf := range rc.buffers {
pool.Put(buf) pool.Put(buf)
delete(rc.buffers, key)
} }
rc.buffers = nil
} }
func (rc *Resources) reserveMemory(size int64) error { func (rc *Resources) reserveMemory(size int64) error {
@ -92,45 +101,46 @@ func (rc *Resources) releaseMemory(size int64) {
} }
} }
func (rc *Resources) getBuffer(size int) ([]byte, error) { func (rc *Resources) getBuffer(size int) ([]byte, int, error) {
if err := rc.checkMemory(int64(size)); err != nil { if err := rc.checkMemory(int64(size)); err != nil {
return nil, err return nil, -1, err
} }
buf := pool.Get(size) buf := pool.Get(size)
key := rc.nextBuf
rc.memory += int64(size) rc.memory += int64(size)
if rc.buffers == nil { if rc.buffers == nil {
rc.buffers = make(map[interface{}][]byte) rc.buffers = make(map[int][]byte)
} }
rc.buffers[buf] = buf rc.buffers[key] = buf
rc.nextBuf++
return buf, nil return buf, key, nil
} }
func (rc *Resources) growBuffer(oldbuf []byte, newsize int) ([]byte, error) { func (rc *Resources) growBuffer(key int, newsize int) ([]byte, error) {
oldbuf, ok := rc.buffers[key]
if !ok {
return nil, fmt.Errorf("invalid buffer; cannot grow buffer not allocated through this scope")
}
grow := newsize - len(oldbuf) grow := newsize - len(oldbuf)
if err := rc.checkMemory(int64(grow)); err != nil { if err := rc.checkMemory(int64(grow)); err != nil {
return nil, err return nil, err
} }
_, ok := rc.buffers[oldbuf]
if !ok {
return nil, fmt.Errorf("invalid buffer; cannot grow buffer not allocated through this scope")
}
newbuf := pool.Get(newsize) newbuf := pool.Get(newsize)
copy(newbuf, oldbuf) copy(newbuf, oldbuf)
rc.memory += int64(grow) rc.memory += int64(grow)
rc.buffers[newbuf] = newbuf rc.buffers[key] = newbuf
delete(rc.buffers, oldbuf)
return newbuf, nil return newbuf, nil
} }
func (rc *Resources) releaseBuffer(buf []byte) { func (rc *Resources) releaseBuffer(key int) {
_, ok := rc.buffers[buf] buf, ok := rc.buffers[key]
if !ok { if !ok {
panic("BUG: release unknown buffer") panic("BUG: release unknown buffer")
} }
@ -142,7 +152,7 @@ func (rc *Resources) releaseBuffer(buf []byte) {
panic("BUG: too much memory released") panic("BUG: too much memory released")
} }
delete(rc.buffers, buf) delete(rc.buffers, key)
pool.Put(buf) pool.Put(buf)
} }
@ -259,6 +269,12 @@ func (s *ResourceScope) reserveMemoryForConstraints(size int) error {
return err return err
} }
func (s *ResourceScope) releaseMemoryForConstraints(size int) {
for _, cst := range s.constraints {
cst.ReleaseMemoryForChild(int64(size))
}
}
func (s *ResourceScope) ReserveMemoryForChild(size int64) error { func (s *ResourceScope) ReserveMemoryForChild(size int64) error {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -295,7 +311,7 @@ func (s *ResourceScope) ReleaseMemoryForChild(size int64) {
s.rc.releaseMemory(size) s.rc.releaseMemory(size)
} }
func (s *ResourceScope) GetBuffer(size int) ([]byte, error) { func (s *ResourceScope) GetBuffer(size int) (network.Buffer, error) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -303,52 +319,61 @@ func (s *ResourceScope) GetBuffer(size int) ([]byte, error) {
return nil, ErrResourceScopeClosed return nil, ErrResourceScopeClosed
} }
buf, err := s.rc.getBuffer(size) buf, key, err := s.rc.getBuffer(size)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := s.reserveMemoryForConstraints(size); err != nil { if err := s.reserveMemoryForConstraints(size); err != nil {
s.rc.releaseBuffer(buf) s.rc.releaseBuffer(key)
return nil, err return nil, err
} }
return buf, err return &Buffer{s: s, data: buf, key: key}, nil
} }
func (s *ResourceScope) GrowBuffer(oldbuf []byte, newsize int) ([]byte, error) { func (b *Buffer) Data() []byte { return b.data }
s.Lock()
defer s.Unlock()
if s.done { func (b *Buffer) Grow(newsize int) error {
return nil, ErrResourceScopeClosed b.s.Lock()
defer b.s.Unlock()
if b.s.done {
return ErrResourceScopeClosed
} }
buf, err := s.rc.growBuffer(oldbuf, newsize) grow := newsize - len(b.data)
if err := b.s.reserveMemoryForConstraints(grow); err != nil {
return err
}
newbuf, err := b.s.rc.growBuffer(b.key, newsize)
if err != nil { if err != nil {
return nil, err b.s.releaseMemoryForConstraints(grow)
return err
} }
if err := s.reserveMemoryForConstraints(newsize - len(oldbuf)); err != nil { b.data = newbuf
s.rc.releaseBuffer(buf) return nil
return nil, err
}
return buf, err
} }
func (s *ResourceScope) ReleaseBuffer(buf []byte) { func (b *Buffer) Release() {
s.Lock() b.s.Lock()
defer s.Unlock() defer b.s.Unlock()
if s.done { if b.data == nil {
return return
} }
s.rc.releaseBuffer(buf) if b.s.done {
for _, cst := range s.constraints { return
cst.ReleaseMemoryForChild(int64(len(buf)))
} }
for _, cst := range b.s.constraints {
cst.ReleaseMemoryForChild(int64(len(b.data)))
}
b.s.rc.releaseBuffer(b.key)
b.data = nil
} }
func (s *ResourceScope) AddStream(dir network.Direction) error { func (s *ResourceScope) AddStream(dir network.Direction) error {