diff --git a/concurrent/Cond.go b/concurrent/Cond.go new file mode 100644 index 0000000..b4be267 --- /dev/null +++ b/concurrent/Cond.go @@ -0,0 +1,60 @@ +package concurrent + +import ( + "sync" + "time" +) + +type ( + Cond interface { + Signal() + Broadcast() + // Wait wait within unlocked environment + Wait() + // LockedWait wait within locked environment + LockedWait() + WaitOrTimeout(timeout time.Duration) bool + } + + CondImpl sync.Cond +) + +func NewCond(l sync.Locker) Cond { + return (*CondImpl)(sync.NewCond(l)) +} + +func (c *CondImpl) Signal() { + (*sync.Cond)(c).Signal() +} + +func (c *CondImpl) Broadcast() { + (*sync.Cond)(c).Broadcast() +} + +func (c *CondImpl) Wait() { + c.L.Lock() + defer c.L.Unlock() + (*sync.Cond)(c).Wait() +} + +func (c *CondImpl) LockedWait() { + (*sync.Cond)(c).Wait() +} + +func (c *CondImpl) WaitOrTimeout(timeout time.Duration) bool { + done := make(chan struct{}) + go func() { + c.L.Lock() + defer c.L.Unlock() + + c.Wait() + close(done) + }() + + select { + case <-time.After(timeout): + return false + case <-done: + return true + } +} diff --git a/concurrent/Once.go b/concurrent/Once.go new file mode 100644 index 0000000..3348dae --- /dev/null +++ b/concurrent/Once.go @@ -0,0 +1,19 @@ +package concurrent + +import ( + "sync" + "unsafe" +) + +type ( + Once sync.Once +) + +func (o *Once) Do(f func()) { + (*sync.Once)(o).Do(f) +} + +func (o *Once) IsDone() bool { + i := *(*uint32)(unsafe.Pointer(o)) + return i != 0 +} diff --git a/concurrent/collections/MessageQueue.go b/concurrent/collections/MessageQueue.go index 840f40c..f053b13 100644 --- a/concurrent/collections/MessageQueue.go +++ b/concurrent/collections/MessageQueue.go @@ -3,76 +3,55 @@ package collections import ( "sync" - "github.com/tursom/GoCollections/concurrent" - "github.com/tursom/GoCollections/lang/atomic" + "github.com/tursom/GoCollections/exceptions" + "github.com/tursom/GoCollections/lang" +) + +var ( + // MessageQueueCapacity message capacity of MQ + // -1 to unlimited + // this variable can let you discover problems before OOM crash + MessageQueueCapacity = 128 + MessageQueueWarnLimit = MessageQueueCapacity / 2 ) type ( - MessageQueue[T any] struct { - lock sync.Mutex - cond *sync.Cond - end *messageQueueNode[T] + MessageQueue[T any] interface { + // Subscribe subscribe this message queue + Subscribe() lang.ReceiveChannel[T] + Send(msg T) } - - messageQueueNode[T any] struct { - value T - next *messageQueueNode[T] + MessageQueueImpl[T lang.Object] struct { + chLock sync.Mutex + ch lang.Channel[T] } ) -func (q *MessageQueue[T]) getEnd() *messageQueueNode[T] { - if q.end == nil { - q.lock.Lock() - defer q.lock.Unlock() - if q.end == nil { - q.end = &messageQueueNode[T]{} - } +func (m *MessageQueueImpl[T]) checkCh() { + if m.ch != nil { + return } - return q.end + + m.chLock.Lock() + defer m.chLock.Unlock() + + if m.ch != nil { + return + } + + m.ch = lang.NewChannel[T](MessageQueueCapacity) } -func (q *MessageQueue[T]) getCond() *sync.Cond { - if q.cond == nil { - q.lock.Lock() - defer q.lock.Unlock() - q.cond = sync.NewCond(&q.lock) - } - return q.cond +func (m *MessageQueueImpl[T]) Subscribe() lang.ReceiveChannel[T] { + m.checkCh() + // package ch, remove closer to empty body + // closer is nil will just close this channel + return lang.WithReceiveChannel[T](m.ch, func() {}) } -func (q *MessageQueue[T]) Subscribe() (in <-chan T, canceler func()) { - end := q.getEnd() - ch := make(chan T) - canceled := false - go func() { - cond := q.getCond() - node := end - for !canceled { - for node.next != nil { - if canceled { - return - } - node = node.next - ch <- node.value - } - concurrent.WaitCond(cond) - } - }() - return ch, func() { - canceled = true +func (m *MessageQueueImpl[T]) Send(msg T) { + m.checkCh() + if !m.ch.TrySend(msg) { + panic(exceptions.NewIndexOutOfBound("object buffer of this MQ is full", nil)) } } - -func (q *MessageQueue[T]) Send(msg T) { - node := &messageQueueNode[T]{ - value: msg, - } - p := &q.getEnd().next - for !atomic.CompareAndSwapPointer(p, nil, node) { - for *p != nil { - p = &q.end.next - } - } - q.end = node - q.getCond().Broadcast() -} diff --git a/concurrent/collections/MessageQueue_test.go b/concurrent/collections/MessageQueue_test.go deleted file mode 100644 index 5552e2e..0000000 --- a/concurrent/collections/MessageQueue_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package collections - -import ( - "fmt" - "testing" - "time" -) - -func TestMessageQueue_Subscribe(t *testing.T) { - var mq MessageQueue[int] - for i := 0; i < 3; i++ { - id := i - go func() { - fmt.Println("run subscriber", id) - subscribe, canceler := mq.Subscribe() - for i := 0; i < 10; i++ { - msg := <-subscribe - fmt.Println(id, msg) - } - canceler() - }() - } - for i := 0; i < 16; i++ { - mq.Send(i) - } - time.Sleep(time.Second * 10) -} diff --git a/concurrent/collections/PublisherMessageQueue.go b/concurrent/collections/PublisherMessageQueue.go new file mode 100644 index 0000000..1aff637 --- /dev/null +++ b/concurrent/collections/PublisherMessageQueue.go @@ -0,0 +1,109 @@ +package collections + +import ( + "log" + "sync" + "time" + + "github.com/tursom/GoCollections/concurrent" + "github.com/tursom/GoCollections/exceptions" + "github.com/tursom/GoCollections/lang" + "github.com/tursom/GoCollections/lang/atomic" +) + +type ( + // PublisherMessageQueue + // Enable an application to announce events to multiple interested consumers asynchronously, + // without coupling the senders to the receivers + PublisherMessageQueue[T any] struct { + end *publisherMessageQueueNode[T] + lock sync.Mutex + cond concurrent.Cond + } + + publisherMessageQueueNode[T any] struct { + index int + value T + next *publisherMessageQueueNode[T] + } +) + +func (q *PublisherMessageQueue[T]) getEnd() *publisherMessageQueueNode[T] { + if q.end == nil { + q.lock.Lock() + defer q.lock.Unlock() + if q.end == nil { + q.end = &publisherMessageQueueNode[T]{} + } + } + return q.end +} + +func (q *PublisherMessageQueue[T]) getCond() concurrent.Cond { + if q.cond == nil { + q.lock.Lock() + defer q.lock.Unlock() + q.cond = concurrent.NewCond(&q.lock) + } + return q.cond +} + +func (q *PublisherMessageQueue[T]) Subscribe() lang.ReceiveChannel[T] { + end := q.getEnd() + ch := lang.NewChannel[T](0) + canceled := false + go func() { + defer ch.Close() + + cond := q.getCond() + node := &end.next + for !canceled { + // node may be nil when MQ created + for *node != nil { + if canceled { + return + } + for !ch.SendTimeout((*node).value, time.Second) && !canceled { + // check MessageQueueCapacity + if MessageQueueCapacity != -1 { + continue + } + diff := q.end.index - (*node).index + if diff >= MessageQueueWarnLimit { + log.Printf("MD is on warn stack") + } + if diff > MessageQueueCapacity { + panic(exceptions.NewIndexOutOfBound("object buffer of this MQ is full", nil)) + } + } + node = &(*node).next + } + cond.Wait() + } + }() + return lang.WithReceiveChannel[T](ch, func() { + canceled = true + }) +} + +func (q *PublisherMessageQueue[T]) Send(msg T) { + index := 0 + if q.end != nil { + index = q.end.index + 1 + } + + node := &publisherMessageQueueNode[T]{ + index: index, + value: msg, + } + + p := &q.getEnd().next + for !atomic.CompareAndSwapPointer(p, nil, node) { + for *p != nil { + p = &q.end.next + } + node.index = q.end.index + 1 + } + q.end = node + q.getCond().Broadcast() +} diff --git a/concurrent/collections/PublisherMessageQueue_test.go b/concurrent/collections/PublisherMessageQueue_test.go new file mode 100644 index 0000000..3ed5d45 --- /dev/null +++ b/concurrent/collections/PublisherMessageQueue_test.go @@ -0,0 +1,44 @@ +package collections + +import ( + "fmt" + "testing" + "time" + + "github.com/tursom/GoCollections/util/unsafe" +) + +func TestPublisherMessageQueueNode_Sizeof(t *testing.T) { + fmt.Println(unsafe.Sizeof[publisherMessageQueueNode[int]]()) +} + +func TestMessageQueue_Subscribe(t *testing.T) { + var mq PublisherMessageQueue[string] + for i := 0; i < 3; i++ { + id := i + subscribe := mq.Subscribe() + go func() { + fmt.Println("run subscriber", id) + defer func() { + subscribe.Close() + fmt.Println("subscriber", id, "closed") + }() + + for true { + msg := subscribe.Receive() + fmt.Println(id, msg) + } + }() + } + time.Sleep(time.Second / 10) + + for i := 0; i < 3; i++ { + index := i + go func() { + for j := 0; j < 16; j++ { + mq.Send(fmt.Sprintf("%d-%d", index, j)) + } + }() + } + time.Sleep(time.Second * 10) +} diff --git a/concurrent/collections/Sequence.go b/concurrent/collections/Sequence.go index 38144ee..2e986ae 100644 --- a/concurrent/collections/Sequence.go +++ b/concurrent/collections/Sequence.go @@ -3,9 +3,10 @@ package collections import ( "sync" + "github.com/tursom/GoCollections/concurrent" "github.com/tursom/GoCollections/exceptions" + "github.com/tursom/GoCollections/lang" "github.com/tursom/GoCollections/lang/atomic" - "github.com/tursom/GoCollections/util" ) type ( @@ -13,10 +14,10 @@ type ( // 数据的接收顺序与 Sender 的生成顺序保持一致 Sequence[T any] struct { lock sync.Mutex - ch chan T + ch lang.Channel[T] head *sequenceNode[T] end **sequenceNode[T] - close sync.Once + close concurrent.Once } // SequenceSender 用于给 Sequence 发送信息 @@ -35,13 +36,13 @@ type ( ) // channel 懒加载 Sequence channel -func (s *Sequence[T]) channel() chan T { +func (s *Sequence[T]) channel() lang.Channel[T] { // 经典懒加载单例写法 if s.ch == nil { s.lock.Lock() defer s.lock.Unlock() if s.ch == nil { - s.ch = make(chan T, 16) + s.ch = lang.NewChannel[T](16) } } @@ -49,10 +50,14 @@ func (s *Sequence[T]) channel() chan T { } // Channel 获取用于读取 Sequence 数据的 channel -func (s *Sequence[T]) Channel() <-chan T { +func (s *Sequence[T]) Channel() lang.ReceiveChannel[T] { return s.channel() } +func (s *Sequence[T]) RawChannel() <-chan T { + return s.channel().RCh() +} + func (s *Sequence[T]) Send(msg T) { s.Alloc().Send(msg) } @@ -85,7 +90,7 @@ func (s *Sequence[T]) send() { head := s.head for head != nil && head.sent { if head.cause == nil { - channel <- head.value + channel.Send(head.value) } else { s.Close() panic(head.cause) @@ -102,12 +107,12 @@ func (s *Sequence[T]) send() { func (s *Sequence[T]) Close() { s.close.Do(func() { - close(s.channel()) + s.channel().Close() }) } func (s *Sequence[T]) Closed() bool { - return util.OnceDone(&s.close) + return s.close.IsDone() } func (s *sequenceNode[T]) Send(value T) { diff --git a/concurrent/collections/Sequence_test.go b/concurrent/collections/Sequence_test.go index d046d61..f00a557 100644 --- a/concurrent/collections/Sequence_test.go +++ b/concurrent/collections/Sequence_test.go @@ -10,7 +10,7 @@ import ( func TestSequence_Alloc(t *testing.T) { var sequence Sequence[int] go func() { - for i := range sequence.Channel() { + for i := range sequence.RawChannel() { fmt.Println(i) } }() @@ -24,4 +24,5 @@ func TestSequence_Alloc(t *testing.T) { }() } time.Sleep(time.Second * 10) + sequence.Close() } diff --git a/concurrent/util/Pipeline.go b/concurrent/util/Pipeline.go index 2291337..dd0387c 100644 --- a/concurrent/util/Pipeline.go +++ b/concurrent/util/Pipeline.go @@ -1,6 +1,9 @@ package util -import "github.com/tursom/GoCollections/concurrent/collections" +import ( + "github.com/tursom/GoCollections/concurrent/collections" + "github.com/tursom/GoCollections/lang" +) type ( pipeline[T, R any] struct { @@ -9,16 +12,16 @@ type ( } ) -func NewPipeline[T, R any](producer <-chan T, concurrency int, consumer func(T) R) <-chan R { +func NewPipeline[T, R any](producer <-chan T, concurrency int, consumer func(T) R) lang.ReceiveChannel[R] { var sequence collections.Sequence[R] - c := make(chan pipeline[T, R]) + c := lang.NewChannel[pipeline[T, R]](0) go func() { defer close(c) for value := range producer { - c <- pipeline[T, R]{ + c.Send(pipeline[T, R]{ value: value, sender: sequence.Alloc(), - } + }) } }() diff --git a/concurrent/util/Pipeline_test.go b/concurrent/util/Pipeline_test.go index 86f2ef9..397c1ad 100644 --- a/concurrent/util/Pipeline_test.go +++ b/concurrent/util/Pipeline_test.go @@ -20,7 +20,7 @@ func TestPipeline(t *testing.T) { time.Sleep(time.Second) }() - for i := range r { + for i := range r.RCh() { fmt.Println(i) } } diff --git a/go.mod b/go.mod index f0bd38a..3d9f0a4 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/tursom/GoCollections -go 1.18 +go 1.19 require ( github.com/petermattis/goid v0.0.0-20220302125637-5f11c28912df diff --git a/lang/Channel.go b/lang/Channel.go new file mode 100644 index 0000000..aaec53b --- /dev/null +++ b/lang/Channel.go @@ -0,0 +1,110 @@ +package lang + +import "time" + +type ( + SendChannel[T any] interface { + Close() + SCh() chan<- T + Send(obj T) + TrySend(obj T) bool + SendTimeout(obj T, timeout time.Duration) bool + } + + ReceiveChannel[T any] interface { + Close() + RCh() <-chan T + Receive() T + TryReceive() (T, bool) + ReceiveTimeout(timeout time.Duration) (T, bool) + } + + Channel[T any] interface { + SendChannel[T] + ReceiveChannel[T] + Ch() chan T + } + + RawChannel[T any] chan T + + withReceiveChannel[T any] struct { + ReceiveChannel[T] + closer func() + } +) + +func NewChannel[T any](cap int) RawChannel[T] { + return make(chan T, cap) +} + +func WithReceiveChannel[T any](channel ReceiveChannel[T], closer func()) ReceiveChannel[T] { + return withReceiveChannel[T]{channel, closer} +} + +func (ch RawChannel[T]) Ch() chan T { + return ch +} + +func (ch RawChannel[T]) Close() { + close(ch) +} + +func (ch RawChannel[T]) SCh() chan<- T { + return ch +} + +func (ch RawChannel[T]) Send(obj T) { + ch <- obj +} + +func (ch RawChannel[T]) TrySend(obj T) bool { + select { + case ch <- obj: + return true + default: + return false + } +} + +func (ch RawChannel[T]) SendTimeout(obj T, timeout time.Duration) bool { + select { + case ch <- obj: + return true + case <-time.After(timeout): + return false + } +} + +func (ch RawChannel[T]) RCh() <-chan T { + return ch +} + +func (ch RawChannel[T]) Receive() T { + return <-ch +} + +func (ch RawChannel[T]) TryReceive() (T, bool) { + select { + case obj := <-ch: + return obj, true + default: + return Nil[T](), false + } +} + +func (ch RawChannel[T]) ReceiveTimeout(timeout time.Duration) (T, bool) { + select { + case obj := <-ch: + return obj, true + case <-time.After(timeout): + return Nil[T](), false + } +} + +func (i withReceiveChannel[T]) Close() { + if i.closer != nil { + i.closer() + } else { + i.ReceiveChannel.Close() + } +} diff --git a/lang/Channel_test.go b/lang/Channel_test.go new file mode 100644 index 0000000..46f6937 --- /dev/null +++ b/lang/Channel_test.go @@ -0,0 +1,26 @@ +package lang + +import ( + "fmt" + "testing" + "time" +) + +func TestChannel_Send(t *testing.T) { + ch := NewChannel[int](0) + + fmt.Println(ch.TrySend(0)) + + go func() { + for i := range ch { + fmt.Println(i) + } + }() + + ch.Send(1) + ch.SCh() <- 2 + time.Sleep(time.Second / 10) + fmt.Println(ch.TrySend(3)) + + ch.Close() +} diff --git a/lang/atomic/Array.go b/lang/atomic/Array.go index 44cbeaf..30953d2 100644 --- a/lang/atomic/Array.go +++ b/lang/atomic/Array.go @@ -2,7 +2,7 @@ package atomic type ( Array[T any] struct { - atomic *Atomic[T] + atomic Atomic[T] array []T } Int32Array struct { @@ -36,7 +36,7 @@ func CapArray[T any](array []*T) *Array[*T] { func NewInt32Array(size int) *Int32Array { return &Int32Array{ Array[int32]{ - atomic: &Int32F, + atomic: Int32F, array: make([]int32, size), }, } @@ -45,7 +45,7 @@ func NewInt32Array(size int) *Int32Array { func NewInt64Array(size int) *Int64Array { return &Int64Array{ Array[int64]{ - atomic: &Int64F, + atomic: Int64F, array: make([]int64, size), }, } @@ -54,7 +54,7 @@ func NewInt64Array(size int) *Int64Array { func NewUInt32Array(size int) *UInt32Array { return &UInt32Array{ Array[uint32]{ - atomic: &UInt32F, + atomic: UInt32F, array: make([]uint32, size), }, } @@ -63,7 +63,7 @@ func NewUInt32Array(size int) *UInt32Array { func NewUInt64Array(size int) *UInt64Array { return &UInt64Array{ Array[uint64]{ - atomic: &UInt64F, + atomic: UInt64F, array: make([]uint64, size), }, } @@ -78,19 +78,19 @@ func (a *Array[T]) Array() []T { } func (a *Array[T]) Get(index int) T { - return a.atomic.Load(&a.array[index]) + return a.atomic.Load()(&a.array[index]) } func (a *Array[T]) Set(index int, p T) { - a.atomic.Store(&a.array[index], p) + a.atomic.Store()(&a.array[index], p) } func (a *Array[T]) Swap(index int, p T) (old T) { - return a.atomic.Swap(&a.array[index], p) + return a.atomic.Swap()(&a.array[index], p) } func (a *Array[T]) CompareAndSwap(index int, old, new T) (swapped bool) { - return a.atomic.CompareAndSwap(&a.array[index], old, new) + return a.atomic.CompareAndSwap()(&a.array[index], old, new) } func (a *Int32Array) Add(index int, value int32) { diff --git a/lang/atomic/Atomic.go b/lang/atomic/Atomic.go index a9c40c9..edc1d73 100644 --- a/lang/atomic/Atomic.go +++ b/lang/atomic/Atomic.go @@ -20,41 +20,51 @@ type ( CompareAndSwapBit(bit int, old, new bool) (swapped bool) } - Atomic[T any] struct { - Swap func(addr *T, new T) (old T) - CompareAndSwap func(addr *T, old, new T) (swapped bool) - Load func(addr *T) (val T) - Store func(addr *T, val T) + Atomic[T any] interface { + Swap() func(addr *T, new T) (old T) + CompareAndSwap() func(addr *T, old, new T) (swapped bool) + Load() func(addr *T) (val T) + Store() func(addr *T, val T) + } + + atomicImpl[T any] struct { + swap func(addr *T, new T) (old T) + compareAndSwap func(addr *T, old, new T) (swapped bool) + load func(addr *T) (val T) + store func(addr *T, val T) + } + + atomicTyped[T any] struct { } ) //goland:noinspection GoUnusedGlobalVariable var ( - Int32F = Atomic[int32]{ + Int32F Atomic[int32] = &atomicImpl[int32]{ SwapInt32, CompareAndSwapInt32, LoadInt32, StoreInt32, } - Int64F = Atomic[int64]{ + Int64F Atomic[int64] = &atomicImpl[int64]{ SwapInt64, CompareAndSwapInt64, LoadInt64, StoreInt64, } - UInt32F = Atomic[uint32]{ + UInt32F Atomic[uint32] = &atomicImpl[uint32]{ SwapUInt32, CompareAndSwapUInt32, LoadUint32, StoreUInt32, } - UInt64F = Atomic[uint64]{ + UInt64F Atomic[uint64] = &atomicImpl[uint64]{ SwapUInt64, CompareAndSwapUInt64, LoadUint64, StoreUInt64, } - PointerF = Atomic[unsafe.Pointer]{ + PointerF Atomic[unsafe.Pointer] = &atomicImpl[unsafe.Pointer]{ UnsafeSwapPointer, UnsafeCompareAndSwapPointer, UnsafeLoadPointer, @@ -62,13 +72,40 @@ var ( } ) -func GetAtomic[T any]() *Atomic[*T] { - return &Atomic[*T]{ - SwapPointer[T], - CompareAndSwapPointer[T], - LoadPointer[T], - StorePointer[T], - } +func GetAtomic[T any]() Atomic[*T] { + return atomicTyped[T]{} +} + +func (a atomicTyped[T]) Swap() func(addr **T, new *T) (old *T) { + return SwapPointer[T] +} + +func (a atomicTyped[T]) CompareAndSwap() func(addr **T, old *T, new *T) (swapped bool) { + return CompareAndSwapPointer[T] +} + +func (a atomicTyped[T]) Load() func(addr **T) (val *T) { + return LoadPointer[T] +} + +func (a atomicTyped[T]) Store() func(addr **T, val *T) { + return StorePointer[T] +} + +func (a *atomicImpl[T]) Swap() func(addr *T, new T) (old T) { + return a.swap +} + +func (a *atomicImpl[T]) CompareAndSwap() func(addr *T, old T, new T) (swapped bool) { + return a.compareAndSwap +} + +func (a *atomicImpl[T]) Load() func(addr *T) (val T) { + return a.load +} + +func (a *atomicImpl[T]) Store() func(addr *T, val T) { + return a.store } func SwapInt32(addr *int32, new int32) (old int32) { diff --git a/lang/atomic/Reference.go b/lang/atomic/Reference.go index 49690d2..fc0ff97 100644 --- a/lang/atomic/Reference.go +++ b/lang/atomic/Reference.go @@ -3,6 +3,8 @@ package atomic import ( "sync/atomic" "unsafe" + + "github.com/tursom/GoCollections/lang" ) //goland:noinspection GoUnusedGlobalVariable @@ -14,35 +16,64 @@ var ( ) type ( - Pointer = unsafe.Pointer - PPointer = *unsafe.Pointer + Pointer = unsafe.Pointer + PPointer = *unsafe.Pointer + + // Reference atomic type T reference Reference[T any] struct { - reference *T + lang.BaseObject + p *T } ) +// NewReference new *Reference[T] init by given reference func NewReference[T any](reference *T) *Reference[T] { - return &Reference[T]{reference} + return lang.ForceCast[Reference[T]](Pointer(&reference)) } -func (v *Reference[T]) Load() (val *T) { - return LoadPointer(&v.reference) +// ReferenceOf cast **T to *Reference[T] +func ReferenceOf[T any](reference **T) *Reference[T] { + return lang.ForceCast[Reference[T]](Pointer(reference)) } -func (v *Reference[T]) Store(val *T) { - StorePointer(&v.reference, val) +func ReferenceUintptr[T any](reference *uintptr) *Reference[T] { + return lang.ForceCast[Reference[T]](Pointer(reference)) } -func (v *Reference[T]) Swap(new *T) (old *T) { - return SwapPointer(&v.reference, new) +func (r *Reference[T]) AsPointer() Pointer { + return Pointer(r) } -func (v *Reference[T]) CompareAndSwap(old, new *T) (swapped bool) { - return CompareAndSwapPointer(&v.reference, old, new) +func (r *Reference[T]) AsPPointer() PPointer { + return PPointer(r.AsPointer()) } -func AsPPointer[T any](p **T) *unsafe.Pointer { - return (*unsafe.Pointer)(unsafe.Pointer(p)) +func (r *Reference[T]) AsUintptr() *TypedUintptr[T] { + return lang.ForceCast[TypedUintptr[T]](r.AsPointer()) +} + +func (r *Reference[T]) pointer() **T { + return &r.p +} + +func (r *Reference[T]) Load() (val *T) { + return LoadPointer(r.pointer()) +} + +func (r *Reference[T]) Store(val *T) { + StorePointer(r.pointer(), val) +} + +func (r *Reference[T]) Swap(new *T) (old *T) { + return SwapPointer(r.pointer(), new) +} + +func (r *Reference[T]) CompareAndSwap(old, new *T) (swapped bool) { + return CompareAndSwapPointer(r.pointer(), old, new) +} + +func AsPPointer[T any](p **T) PPointer { + return PPointer(Pointer(p)) } func LoadPointer[T any](addr **T) (val *T) { @@ -50,11 +81,11 @@ func LoadPointer[T any](addr **T) (val *T) { } func StorePointer[T any](addr **T, val *T) { - atomic.StorePointer(AsPPointer(addr), unsafe.Pointer(val)) + atomic.StorePointer(AsPPointer(addr), Pointer(val)) } func SwapPointer[T any](addr **T, new *T) (old *T) { - return (*T)(atomic.SwapPointer(AsPPointer(addr), unsafe.Pointer(new))) + return (*T)(atomic.SwapPointer(AsPPointer(addr), Pointer(new))) } func CompareAndSwapPointer[T any](addr **T, old, new *T) (swapped bool) { diff --git a/lang/atomic/Reference_test.go b/lang/atomic/Reference_test.go index 10fe162..131b4ac 100644 --- a/lang/atomic/Reference_test.go +++ b/lang/atomic/Reference_test.go @@ -2,14 +2,26 @@ package atomic import ( "fmt" - "github.com/tursom/GoCollections/lang" "testing" ) func TestAtomic_Store(t *testing.T) { - a := &Reference[lang.Int]{} - var i lang.Int = 1 + a := NewReference[int](nil) + var i = 1 a.Store(&i) i = 2 - fmt.Println(a.Load()) + fmt.Println(*a.Load()) +} + +func TestReferenceOf(t *testing.T) { + one := 1 + + var p *int = nil + ref := ReferenceOf(&p) + + ref.Store(&one) + fmt.Println(ref.Load()) + fmt.Println(*ref.Load()) + + _ = *ref.AsUintptr() + 1 } diff --git a/lang/atomic/Uintptr.go b/lang/atomic/Uintptr.go new file mode 100644 index 0000000..9230583 --- /dev/null +++ b/lang/atomic/Uintptr.go @@ -0,0 +1,87 @@ +package atomic + +import ( + "sync/atomic" + + "github.com/tursom/GoCollections/lang" +) + +type ( + Uintptr uintptr + TypedUintptr[T any] Uintptr +) + +func (p Uintptr) Raw() uintptr { + return uintptr(p) +} + +func (p *Uintptr) RawP() *uintptr { + return (*uintptr)(p) +} + +func (p *Uintptr) AsPointer() Pointer { + return Pointer(p) +} + +func (p *Uintptr) AsPPointer() PPointer { + return PPointer(p.AsPointer()) +} +func (p *Uintptr) Load() (val Uintptr) { + return Uintptr(atomic.LoadUintptr((*uintptr)(p))) +} + +func (p *Uintptr) Store(val Uintptr) { + atomic.StoreUintptr((*uintptr)(p), uintptr(val)) +} + +func (p *Uintptr) Swap(new Uintptr) (old Uintptr) { + return Uintptr(atomic.SwapUintptr((*uintptr)(p), uintptr(new))) +} + +func (p *Uintptr) CompareAndSwap(old, new Uintptr) (swapped bool) { + return atomic.CompareAndSwapUintptr((*uintptr)(p), uintptr(old), uintptr(new)) +} + +func (p TypedUintptr[T]) Raw() uintptr { + return uintptr(p) +} + +func (p *TypedUintptr[T]) RawP() *uintptr { + return (*uintptr)(p) +} + +func (p *TypedUintptr[T]) AsPointer() Pointer { + return Pointer(p) +} + +func (p *TypedUintptr[T]) AsPPointer() PPointer { + return PPointer(p.AsPointer()) +} + +func (tp *TypedUintptr[T]) AsReference() *Reference[T] { + return lang.ForceCast[Reference[T]](Pointer(tp)) +} + +func (tp TypedUintptr[T]) Uintptr() Uintptr { + return Uintptr(tp) +} + +func (tp *TypedUintptr[T]) PUintptr() *Uintptr { + return (*Uintptr)(tp) +} + +func (tp *TypedUintptr[T]) Load() TypedUintptr[T] { + return TypedUintptr[T](atomic.LoadUintptr((*uintptr)(tp))) +} + +func (tp *TypedUintptr[T]) Store(val TypedUintptr[T]) { + atomic.StoreUintptr((*uintptr)(tp), uintptr(val)) +} + +func (tp *TypedUintptr[T]) Swap(new TypedUintptr[T]) (old TypedUintptr[T]) { + return TypedUintptr[T](atomic.SwapUintptr((*uintptr)(tp), uintptr(new))) +} + +func (tp *TypedUintptr[T]) CompareAndSwap(old, new TypedUintptr[T]) (swapped bool) { + return atomic.CompareAndSwapUintptr((*uintptr)(tp), uintptr(old), uintptr(new)) +} diff --git a/util/Utils.go b/util/Utils.go index ef18588..b770853 100644 --- a/util/Utils.go +++ b/util/Utils.go @@ -1,21 +1,11 @@ package util -import ( - "sync" - "unsafe" -) - type ( Stringer struct { stringer func() string } ) -func OnceDone(once *sync.Once) bool { - i := *(*uint32)(unsafe.Pointer(once)) - return i != 0 -} - func NewStringer(stringer func() string) Stringer { return Stringer{ stringer: stringer, diff --git a/util/unsafe/unsafe.go b/util/unsafe/unsafe.go new file mode 100644 index 0000000..ff849ec --- /dev/null +++ b/util/unsafe/unsafe.go @@ -0,0 +1,11 @@ +package unsafe + +import ( + "unsafe" + + "github.com/tursom/GoCollections/lang" +) + +func Sizeof[T any]() uintptr { + return unsafe.Sizeof(lang.Nil[T]()) +}