diff --git a/scope.go b/scope.go index 969dcfb..e52783b 100644 --- a/scope.go +++ b/scope.go @@ -17,7 +17,8 @@ type Resources struct { nfd int memory int64 - buffers map[interface{}][]byte + buffers map[int][]byte + nextBuf int } // DAG ResourceScopes. @@ -35,6 +36,14 @@ type ResourceScope struct { var _ network.ResourceScope = (*ResourceScope)(nil) var _ network.TransactionalScope = (*ResourceScope)(nil) +type Buffer struct { + s *ResourceScope + data []byte + key int +} + +var _ network.Buffer = (*Buffer)(nil) + func NewResources(limit Limit) *Resources { return &Resources{ limit: limit, @@ -68,10 +77,10 @@ func (rc *Resources) checkMemory(rsvp int64) error { } func (rc *Resources) releaseBuffers() { - for key, buf := range rc.buffers { + for _, buf := range rc.buffers { pool.Put(buf) - delete(rc.buffers, key) } + rc.buffers = nil } func (rc *Resources) reserveMemory(size int64) error { @@ -92,45 +101,46 @@ func (rc *Resources) releaseMemory(size int64) { } } -func (rc *Resources) getBuffer(size int) ([]byte, error) { +func (rc *Resources) getBuffer(size int) ([]byte, int, error) { if err := rc.checkMemory(int64(size)); err != nil { - return nil, err + return nil, -1, err } buf := pool.Get(size) + key := rc.nextBuf rc.memory += int64(size) if rc.buffers == nil { - rc.buffers = make(map[interface{}][]byte) + rc.buffers = make(map[int][]byte) } - rc.buffers[buf] = buf + rc.buffers[key] = buf + rc.nextBuf++ - return buf, nil + return buf, key, nil } -func (rc *Resources) growBuffer(oldbuf []byte, newsize int) ([]byte, error) { +func (rc *Resources) growBuffer(key int, newsize int) ([]byte, error) { + oldbuf, ok := rc.buffers[key] + if !ok { + return nil, fmt.Errorf("invalid buffer; cannot grow buffer not allocated through this scope") + } + grow := newsize - len(oldbuf) if err := rc.checkMemory(int64(grow)); err != nil { return nil, err } - _, ok := rc.buffers[oldbuf] - if !ok { - return nil, fmt.Errorf("invalid buffer; cannot grow buffer not allocated through this scope") - } - newbuf := pool.Get(newsize) copy(newbuf, oldbuf) rc.memory += int64(grow) - rc.buffers[newbuf] = newbuf - delete(rc.buffers, oldbuf) + rc.buffers[key] = newbuf return newbuf, nil } -func (rc *Resources) releaseBuffer(buf []byte) { - _, ok := rc.buffers[buf] +func (rc *Resources) releaseBuffer(key int) { + buf, ok := rc.buffers[key] if !ok { panic("BUG: release unknown buffer") } @@ -142,7 +152,7 @@ func (rc *Resources) releaseBuffer(buf []byte) { panic("BUG: too much memory released") } - delete(rc.buffers, buf) + delete(rc.buffers, key) pool.Put(buf) } @@ -259,6 +269,12 @@ func (s *ResourceScope) reserveMemoryForConstraints(size int) error { return err } +func (s *ResourceScope) releaseMemoryForConstraints(size int) { + for _, cst := range s.constraints { + cst.ReleaseMemoryForChild(int64(size)) + } +} + func (s *ResourceScope) ReserveMemoryForChild(size int64) error { s.Lock() defer s.Unlock() @@ -295,7 +311,7 @@ func (s *ResourceScope) ReleaseMemoryForChild(size int64) { s.rc.releaseMemory(size) } -func (s *ResourceScope) GetBuffer(size int) ([]byte, error) { +func (s *ResourceScope) GetBuffer(size int) (network.Buffer, error) { s.Lock() defer s.Unlock() @@ -303,52 +319,61 @@ func (s *ResourceScope) GetBuffer(size int) ([]byte, error) { return nil, ErrResourceScopeClosed } - buf, err := s.rc.getBuffer(size) + buf, key, err := s.rc.getBuffer(size) if err != nil { return nil, err } if err := s.reserveMemoryForConstraints(size); err != nil { - s.rc.releaseBuffer(buf) + s.rc.releaseBuffer(key) return nil, err } - return buf, err + return &Buffer{s: s, data: buf, key: key}, nil } -func (s *ResourceScope) GrowBuffer(oldbuf []byte, newsize int) ([]byte, error) { - s.Lock() - defer s.Unlock() +func (b *Buffer) Data() []byte { return b.data } - if s.done { - return nil, ErrResourceScopeClosed +func (b *Buffer) Grow(newsize int) error { + b.s.Lock() + defer b.s.Unlock() + + if b.s.done { + return ErrResourceScopeClosed } - buf, err := s.rc.growBuffer(oldbuf, newsize) + grow := newsize - len(b.data) + if err := b.s.reserveMemoryForConstraints(grow); err != nil { + return err + } + + newbuf, err := b.s.rc.growBuffer(b.key, newsize) if err != nil { - return nil, err + b.s.releaseMemoryForConstraints(grow) + return err } - if err := s.reserveMemoryForConstraints(newsize - len(oldbuf)); err != nil { - s.rc.releaseBuffer(buf) - return nil, err - } - - return buf, err + b.data = newbuf + return nil } -func (s *ResourceScope) ReleaseBuffer(buf []byte) { - s.Lock() - defer s.Unlock() +func (b *Buffer) Release() { + b.s.Lock() + defer b.s.Unlock() - if s.done { + if b.data == nil { return } - s.rc.releaseBuffer(buf) - for _, cst := range s.constraints { - cst.ReleaseMemoryForChild(int64(len(buf))) + if b.s.done { + return } + + for _, cst := range b.s.constraints { + cst.ReleaseMemoryForChild(int64(len(b.data))) + } + b.s.rc.releaseBuffer(b.key) + b.data = nil } func (s *ResourceScope) AddStream(dir network.Direction) error {