mirror of
https://github.com/tursom/GoCollections.git
synced 2025-02-25 03:10:20 +08:00
提供更高层次的封装
This commit is contained in:
parent
09bd3376a7
commit
751d09a1a7
60
concurrent/Cond.go
Normal file
60
concurrent/Cond.go
Normal file
@ -0,0 +1,60 @@
|
||||
package concurrent
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type (
|
||||
Cond interface {
|
||||
Signal()
|
||||
Broadcast()
|
||||
// Wait wait within unlocked environment
|
||||
Wait()
|
||||
// LockedWait wait within locked environment
|
||||
LockedWait()
|
||||
WaitOrTimeout(timeout time.Duration) bool
|
||||
}
|
||||
|
||||
CondImpl sync.Cond
|
||||
)
|
||||
|
||||
func NewCond(l sync.Locker) Cond {
|
||||
return (*CondImpl)(sync.NewCond(l))
|
||||
}
|
||||
|
||||
func (c *CondImpl) Signal() {
|
||||
(*sync.Cond)(c).Signal()
|
||||
}
|
||||
|
||||
func (c *CondImpl) Broadcast() {
|
||||
(*sync.Cond)(c).Broadcast()
|
||||
}
|
||||
|
||||
func (c *CondImpl) Wait() {
|
||||
c.L.Lock()
|
||||
defer c.L.Unlock()
|
||||
(*sync.Cond)(c).Wait()
|
||||
}
|
||||
|
||||
func (c *CondImpl) LockedWait() {
|
||||
(*sync.Cond)(c).Wait()
|
||||
}
|
||||
|
||||
func (c *CondImpl) WaitOrTimeout(timeout time.Duration) bool {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
c.L.Lock()
|
||||
defer c.L.Unlock()
|
||||
|
||||
c.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
return false
|
||||
case <-done:
|
||||
return true
|
||||
}
|
||||
}
|
19
concurrent/Once.go
Normal file
19
concurrent/Once.go
Normal file
@ -0,0 +1,19 @@
|
||||
package concurrent
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
type (
|
||||
Once sync.Once
|
||||
)
|
||||
|
||||
func (o *Once) Do(f func()) {
|
||||
(*sync.Once)(o).Do(f)
|
||||
}
|
||||
|
||||
func (o *Once) IsDone() bool {
|
||||
i := *(*uint32)(unsafe.Pointer(o))
|
||||
return i != 0
|
||||
}
|
@ -3,76 +3,55 @@ package collections
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/tursom/GoCollections/concurrent"
|
||||
"github.com/tursom/GoCollections/lang/atomic"
|
||||
"github.com/tursom/GoCollections/exceptions"
|
||||
"github.com/tursom/GoCollections/lang"
|
||||
)
|
||||
|
||||
var (
|
||||
// MessageQueueCapacity message capacity of MQ
|
||||
// -1 to unlimited
|
||||
// this variable can let you discover problems before OOM crash
|
||||
MessageQueueCapacity = 128
|
||||
MessageQueueWarnLimit = MessageQueueCapacity / 2
|
||||
)
|
||||
|
||||
type (
|
||||
MessageQueue[T any] struct {
|
||||
lock sync.Mutex
|
||||
cond *sync.Cond
|
||||
end *messageQueueNode[T]
|
||||
MessageQueue[T any] interface {
|
||||
// Subscribe subscribe this message queue
|
||||
Subscribe() lang.ReceiveChannel[T]
|
||||
Send(msg T)
|
||||
}
|
||||
|
||||
messageQueueNode[T any] struct {
|
||||
value T
|
||||
next *messageQueueNode[T]
|
||||
MessageQueueImpl[T lang.Object] struct {
|
||||
chLock sync.Mutex
|
||||
ch lang.Channel[T]
|
||||
}
|
||||
)
|
||||
|
||||
func (q *MessageQueue[T]) getEnd() *messageQueueNode[T] {
|
||||
if q.end == nil {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
if q.end == nil {
|
||||
q.end = &messageQueueNode[T]{}
|
||||
}
|
||||
}
|
||||
return q.end
|
||||
}
|
||||
|
||||
func (q *MessageQueue[T]) getCond() *sync.Cond {
|
||||
if q.cond == nil {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
q.cond = sync.NewCond(&q.lock)
|
||||
}
|
||||
return q.cond
|
||||
}
|
||||
|
||||
func (q *MessageQueue[T]) Subscribe() (in <-chan T, canceler func()) {
|
||||
end := q.getEnd()
|
||||
ch := make(chan T)
|
||||
canceled := false
|
||||
go func() {
|
||||
cond := q.getCond()
|
||||
node := end
|
||||
for !canceled {
|
||||
for node.next != nil {
|
||||
if canceled {
|
||||
func (m *MessageQueueImpl[T]) checkCh() {
|
||||
if m.ch != nil {
|
||||
return
|
||||
}
|
||||
node = node.next
|
||||
ch <- node.value
|
||||
}
|
||||
concurrent.WaitCond(cond)
|
||||
}
|
||||
}()
|
||||
return ch, func() {
|
||||
canceled = true
|
||||
|
||||
m.chLock.Lock()
|
||||
defer m.chLock.Unlock()
|
||||
|
||||
if m.ch != nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.ch = lang.NewChannel[T](MessageQueueCapacity)
|
||||
}
|
||||
|
||||
func (q *MessageQueue[T]) Send(msg T) {
|
||||
node := &messageQueueNode[T]{
|
||||
value: msg,
|
||||
}
|
||||
p := &q.getEnd().next
|
||||
for !atomic.CompareAndSwapPointer(p, nil, node) {
|
||||
for *p != nil {
|
||||
p = &q.end.next
|
||||
}
|
||||
}
|
||||
q.end = node
|
||||
q.getCond().Broadcast()
|
||||
func (m *MessageQueueImpl[T]) Subscribe() lang.ReceiveChannel[T] {
|
||||
m.checkCh()
|
||||
// package ch, remove closer to empty body
|
||||
// closer is nil will just close this channel
|
||||
return lang.WithReceiveChannel[T](m.ch, func() {})
|
||||
}
|
||||
|
||||
func (m *MessageQueueImpl[T]) Send(msg T) {
|
||||
m.checkCh()
|
||||
if !m.ch.TrySend(msg) {
|
||||
panic(exceptions.NewIndexOutOfBound("object buffer of this MQ is full", nil))
|
||||
}
|
||||
}
|
||||
|
@ -1,27 +0,0 @@
|
||||
package collections
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestMessageQueue_Subscribe(t *testing.T) {
|
||||
var mq MessageQueue[int]
|
||||
for i := 0; i < 3; i++ {
|
||||
id := i
|
||||
go func() {
|
||||
fmt.Println("run subscriber", id)
|
||||
subscribe, canceler := mq.Subscribe()
|
||||
for i := 0; i < 10; i++ {
|
||||
msg := <-subscribe
|
||||
fmt.Println(id, msg)
|
||||
}
|
||||
canceler()
|
||||
}()
|
||||
}
|
||||
for i := 0; i < 16; i++ {
|
||||
mq.Send(i)
|
||||
}
|
||||
time.Sleep(time.Second * 10)
|
||||
}
|
109
concurrent/collections/PublisherMessageQueue.go
Normal file
109
concurrent/collections/PublisherMessageQueue.go
Normal file
@ -0,0 +1,109 @@
|
||||
package collections
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tursom/GoCollections/concurrent"
|
||||
"github.com/tursom/GoCollections/exceptions"
|
||||
"github.com/tursom/GoCollections/lang"
|
||||
"github.com/tursom/GoCollections/lang/atomic"
|
||||
)
|
||||
|
||||
type (
|
||||
// PublisherMessageQueue
|
||||
// Enable an application to announce events to multiple interested consumers asynchronously,
|
||||
// without coupling the senders to the receivers
|
||||
PublisherMessageQueue[T any] struct {
|
||||
end *publisherMessageQueueNode[T]
|
||||
lock sync.Mutex
|
||||
cond concurrent.Cond
|
||||
}
|
||||
|
||||
publisherMessageQueueNode[T any] struct {
|
||||
index int
|
||||
value T
|
||||
next *publisherMessageQueueNode[T]
|
||||
}
|
||||
)
|
||||
|
||||
func (q *PublisherMessageQueue[T]) getEnd() *publisherMessageQueueNode[T] {
|
||||
if q.end == nil {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
if q.end == nil {
|
||||
q.end = &publisherMessageQueueNode[T]{}
|
||||
}
|
||||
}
|
||||
return q.end
|
||||
}
|
||||
|
||||
func (q *PublisherMessageQueue[T]) getCond() concurrent.Cond {
|
||||
if q.cond == nil {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
q.cond = concurrent.NewCond(&q.lock)
|
||||
}
|
||||
return q.cond
|
||||
}
|
||||
|
||||
func (q *PublisherMessageQueue[T]) Subscribe() lang.ReceiveChannel[T] {
|
||||
end := q.getEnd()
|
||||
ch := lang.NewChannel[T](0)
|
||||
canceled := false
|
||||
go func() {
|
||||
defer ch.Close()
|
||||
|
||||
cond := q.getCond()
|
||||
node := &end.next
|
||||
for !canceled {
|
||||
// node may be nil when MQ created
|
||||
for *node != nil {
|
||||
if canceled {
|
||||
return
|
||||
}
|
||||
for !ch.SendTimeout((*node).value, time.Second) && !canceled {
|
||||
// check MessageQueueCapacity
|
||||
if MessageQueueCapacity != -1 {
|
||||
continue
|
||||
}
|
||||
diff := q.end.index - (*node).index
|
||||
if diff >= MessageQueueWarnLimit {
|
||||
log.Printf("MD is on warn stack")
|
||||
}
|
||||
if diff > MessageQueueCapacity {
|
||||
panic(exceptions.NewIndexOutOfBound("object buffer of this MQ is full", nil))
|
||||
}
|
||||
}
|
||||
node = &(*node).next
|
||||
}
|
||||
cond.Wait()
|
||||
}
|
||||
}()
|
||||
return lang.WithReceiveChannel[T](ch, func() {
|
||||
canceled = true
|
||||
})
|
||||
}
|
||||
|
||||
func (q *PublisherMessageQueue[T]) Send(msg T) {
|
||||
index := 0
|
||||
if q.end != nil {
|
||||
index = q.end.index + 1
|
||||
}
|
||||
|
||||
node := &publisherMessageQueueNode[T]{
|
||||
index: index,
|
||||
value: msg,
|
||||
}
|
||||
|
||||
p := &q.getEnd().next
|
||||
for !atomic.CompareAndSwapPointer(p, nil, node) {
|
||||
for *p != nil {
|
||||
p = &q.end.next
|
||||
}
|
||||
node.index = q.end.index + 1
|
||||
}
|
||||
q.end = node
|
||||
q.getCond().Broadcast()
|
||||
}
|
44
concurrent/collections/PublisherMessageQueue_test.go
Normal file
44
concurrent/collections/PublisherMessageQueue_test.go
Normal file
@ -0,0 +1,44 @@
|
||||
package collections
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tursom/GoCollections/util/unsafe"
|
||||
)
|
||||
|
||||
func TestPublisherMessageQueueNode_Sizeof(t *testing.T) {
|
||||
fmt.Println(unsafe.Sizeof[publisherMessageQueueNode[int]]())
|
||||
}
|
||||
|
||||
func TestMessageQueue_Subscribe(t *testing.T) {
|
||||
var mq PublisherMessageQueue[string]
|
||||
for i := 0; i < 3; i++ {
|
||||
id := i
|
||||
subscribe := mq.Subscribe()
|
||||
go func() {
|
||||
fmt.Println("run subscriber", id)
|
||||
defer func() {
|
||||
subscribe.Close()
|
||||
fmt.Println("subscriber", id, "closed")
|
||||
}()
|
||||
|
||||
for true {
|
||||
msg := subscribe.Receive()
|
||||
fmt.Println(id, msg)
|
||||
}
|
||||
}()
|
||||
}
|
||||
time.Sleep(time.Second / 10)
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
index := i
|
||||
go func() {
|
||||
for j := 0; j < 16; j++ {
|
||||
mq.Send(fmt.Sprintf("%d-%d", index, j))
|
||||
}
|
||||
}()
|
||||
}
|
||||
time.Sleep(time.Second * 10)
|
||||
}
|
@ -3,9 +3,10 @@ package collections
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/tursom/GoCollections/concurrent"
|
||||
"github.com/tursom/GoCollections/exceptions"
|
||||
"github.com/tursom/GoCollections/lang"
|
||||
"github.com/tursom/GoCollections/lang/atomic"
|
||||
"github.com/tursom/GoCollections/util"
|
||||
)
|
||||
|
||||
type (
|
||||
@ -13,10 +14,10 @@ type (
|
||||
// 数据的接收顺序与 Sender 的生成顺序保持一致
|
||||
Sequence[T any] struct {
|
||||
lock sync.Mutex
|
||||
ch chan T
|
||||
ch lang.Channel[T]
|
||||
head *sequenceNode[T]
|
||||
end **sequenceNode[T]
|
||||
close sync.Once
|
||||
close concurrent.Once
|
||||
}
|
||||
|
||||
// SequenceSender 用于给 Sequence 发送信息
|
||||
@ -35,13 +36,13 @@ type (
|
||||
)
|
||||
|
||||
// channel 懒加载 Sequence channel
|
||||
func (s *Sequence[T]) channel() chan T {
|
||||
func (s *Sequence[T]) channel() lang.Channel[T] {
|
||||
// 经典懒加载单例写法
|
||||
if s.ch == nil {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if s.ch == nil {
|
||||
s.ch = make(chan T, 16)
|
||||
s.ch = lang.NewChannel[T](16)
|
||||
}
|
||||
}
|
||||
|
||||
@ -49,10 +50,14 @@ func (s *Sequence[T]) channel() chan T {
|
||||
}
|
||||
|
||||
// Channel 获取用于读取 Sequence 数据的 channel
|
||||
func (s *Sequence[T]) Channel() <-chan T {
|
||||
func (s *Sequence[T]) Channel() lang.ReceiveChannel[T] {
|
||||
return s.channel()
|
||||
}
|
||||
|
||||
func (s *Sequence[T]) RawChannel() <-chan T {
|
||||
return s.channel().RCh()
|
||||
}
|
||||
|
||||
func (s *Sequence[T]) Send(msg T) {
|
||||
s.Alloc().Send(msg)
|
||||
}
|
||||
@ -85,7 +90,7 @@ func (s *Sequence[T]) send() {
|
||||
head := s.head
|
||||
for head != nil && head.sent {
|
||||
if head.cause == nil {
|
||||
channel <- head.value
|
||||
channel.Send(head.value)
|
||||
} else {
|
||||
s.Close()
|
||||
panic(head.cause)
|
||||
@ -102,12 +107,12 @@ func (s *Sequence[T]) send() {
|
||||
|
||||
func (s *Sequence[T]) Close() {
|
||||
s.close.Do(func() {
|
||||
close(s.channel())
|
||||
s.channel().Close()
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Sequence[T]) Closed() bool {
|
||||
return util.OnceDone(&s.close)
|
||||
return s.close.IsDone()
|
||||
}
|
||||
|
||||
func (s *sequenceNode[T]) Send(value T) {
|
||||
|
@ -10,7 +10,7 @@ import (
|
||||
func TestSequence_Alloc(t *testing.T) {
|
||||
var sequence Sequence[int]
|
||||
go func() {
|
||||
for i := range sequence.Channel() {
|
||||
for i := range sequence.RawChannel() {
|
||||
fmt.Println(i)
|
||||
}
|
||||
}()
|
||||
@ -24,4 +24,5 @@ func TestSequence_Alloc(t *testing.T) {
|
||||
}()
|
||||
}
|
||||
time.Sleep(time.Second * 10)
|
||||
sequence.Close()
|
||||
}
|
||||
|
@ -1,6 +1,9 @@
|
||||
package util
|
||||
|
||||
import "github.com/tursom/GoCollections/concurrent/collections"
|
||||
import (
|
||||
"github.com/tursom/GoCollections/concurrent/collections"
|
||||
"github.com/tursom/GoCollections/lang"
|
||||
)
|
||||
|
||||
type (
|
||||
pipeline[T, R any] struct {
|
||||
@ -9,16 +12,16 @@ type (
|
||||
}
|
||||
)
|
||||
|
||||
func NewPipeline[T, R any](producer <-chan T, concurrency int, consumer func(T) R) <-chan R {
|
||||
func NewPipeline[T, R any](producer <-chan T, concurrency int, consumer func(T) R) lang.ReceiveChannel[R] {
|
||||
var sequence collections.Sequence[R]
|
||||
c := make(chan pipeline[T, R])
|
||||
c := lang.NewChannel[pipeline[T, R]](0)
|
||||
go func() {
|
||||
defer close(c)
|
||||
for value := range producer {
|
||||
c <- pipeline[T, R]{
|
||||
c.Send(pipeline[T, R]{
|
||||
value: value,
|
||||
sender: sequence.Alloc(),
|
||||
}
|
||||
})
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -20,7 +20,7 @@ func TestPipeline(t *testing.T) {
|
||||
time.Sleep(time.Second)
|
||||
}()
|
||||
|
||||
for i := range r {
|
||||
for i := range r.RCh() {
|
||||
fmt.Println(i)
|
||||
}
|
||||
}
|
||||
|
2
go.mod
2
go.mod
@ -1,6 +1,6 @@
|
||||
module github.com/tursom/GoCollections
|
||||
|
||||
go 1.18
|
||||
go 1.19
|
||||
|
||||
require (
|
||||
github.com/petermattis/goid v0.0.0-20220302125637-5f11c28912df
|
||||
|
110
lang/Channel.go
Normal file
110
lang/Channel.go
Normal file
@ -0,0 +1,110 @@
|
||||
package lang
|
||||
|
||||
import "time"
|
||||
|
||||
type (
|
||||
SendChannel[T any] interface {
|
||||
Close()
|
||||
SCh() chan<- T
|
||||
Send(obj T)
|
||||
TrySend(obj T) bool
|
||||
SendTimeout(obj T, timeout time.Duration) bool
|
||||
}
|
||||
|
||||
ReceiveChannel[T any] interface {
|
||||
Close()
|
||||
RCh() <-chan T
|
||||
Receive() T
|
||||
TryReceive() (T, bool)
|
||||
ReceiveTimeout(timeout time.Duration) (T, bool)
|
||||
}
|
||||
|
||||
Channel[T any] interface {
|
||||
SendChannel[T]
|
||||
ReceiveChannel[T]
|
||||
Ch() chan T
|
||||
}
|
||||
|
||||
RawChannel[T any] chan T
|
||||
|
||||
withReceiveChannel[T any] struct {
|
||||
ReceiveChannel[T]
|
||||
closer func()
|
||||
}
|
||||
)
|
||||
|
||||
func NewChannel[T any](cap int) RawChannel[T] {
|
||||
return make(chan T, cap)
|
||||
}
|
||||
|
||||
func WithReceiveChannel[T any](channel ReceiveChannel[T], closer func()) ReceiveChannel[T] {
|
||||
return withReceiveChannel[T]{channel, closer}
|
||||
}
|
||||
|
||||
func (ch RawChannel[T]) Ch() chan T {
|
||||
return ch
|
||||
}
|
||||
|
||||
func (ch RawChannel[T]) Close() {
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func (ch RawChannel[T]) SCh() chan<- T {
|
||||
return ch
|
||||
}
|
||||
|
||||
func (ch RawChannel[T]) Send(obj T) {
|
||||
ch <- obj
|
||||
}
|
||||
|
||||
func (ch RawChannel[T]) TrySend(obj T) bool {
|
||||
select {
|
||||
case ch <- obj:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (ch RawChannel[T]) SendTimeout(obj T, timeout time.Duration) bool {
|
||||
select {
|
||||
case ch <- obj:
|
||||
return true
|
||||
case <-time.After(timeout):
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (ch RawChannel[T]) RCh() <-chan T {
|
||||
return ch
|
||||
}
|
||||
|
||||
func (ch RawChannel[T]) Receive() T {
|
||||
return <-ch
|
||||
}
|
||||
|
||||
func (ch RawChannel[T]) TryReceive() (T, bool) {
|
||||
select {
|
||||
case obj := <-ch:
|
||||
return obj, true
|
||||
default:
|
||||
return Nil[T](), false
|
||||
}
|
||||
}
|
||||
|
||||
func (ch RawChannel[T]) ReceiveTimeout(timeout time.Duration) (T, bool) {
|
||||
select {
|
||||
case obj := <-ch:
|
||||
return obj, true
|
||||
case <-time.After(timeout):
|
||||
return Nil[T](), false
|
||||
}
|
||||
}
|
||||
|
||||
func (i withReceiveChannel[T]) Close() {
|
||||
if i.closer != nil {
|
||||
i.closer()
|
||||
} else {
|
||||
i.ReceiveChannel.Close()
|
||||
}
|
||||
}
|
26
lang/Channel_test.go
Normal file
26
lang/Channel_test.go
Normal file
@ -0,0 +1,26 @@
|
||||
package lang
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestChannel_Send(t *testing.T) {
|
||||
ch := NewChannel[int](0)
|
||||
|
||||
fmt.Println(ch.TrySend(0))
|
||||
|
||||
go func() {
|
||||
for i := range ch {
|
||||
fmt.Println(i)
|
||||
}
|
||||
}()
|
||||
|
||||
ch.Send(1)
|
||||
ch.SCh() <- 2
|
||||
time.Sleep(time.Second / 10)
|
||||
fmt.Println(ch.TrySend(3))
|
||||
|
||||
ch.Close()
|
||||
}
|
@ -2,7 +2,7 @@ package atomic
|
||||
|
||||
type (
|
||||
Array[T any] struct {
|
||||
atomic *Atomic[T]
|
||||
atomic Atomic[T]
|
||||
array []T
|
||||
}
|
||||
Int32Array struct {
|
||||
@ -36,7 +36,7 @@ func CapArray[T any](array []*T) *Array[*T] {
|
||||
func NewInt32Array(size int) *Int32Array {
|
||||
return &Int32Array{
|
||||
Array[int32]{
|
||||
atomic: &Int32F,
|
||||
atomic: Int32F,
|
||||
array: make([]int32, size),
|
||||
},
|
||||
}
|
||||
@ -45,7 +45,7 @@ func NewInt32Array(size int) *Int32Array {
|
||||
func NewInt64Array(size int) *Int64Array {
|
||||
return &Int64Array{
|
||||
Array[int64]{
|
||||
atomic: &Int64F,
|
||||
atomic: Int64F,
|
||||
array: make([]int64, size),
|
||||
},
|
||||
}
|
||||
@ -54,7 +54,7 @@ func NewInt64Array(size int) *Int64Array {
|
||||
func NewUInt32Array(size int) *UInt32Array {
|
||||
return &UInt32Array{
|
||||
Array[uint32]{
|
||||
atomic: &UInt32F,
|
||||
atomic: UInt32F,
|
||||
array: make([]uint32, size),
|
||||
},
|
||||
}
|
||||
@ -63,7 +63,7 @@ func NewUInt32Array(size int) *UInt32Array {
|
||||
func NewUInt64Array(size int) *UInt64Array {
|
||||
return &UInt64Array{
|
||||
Array[uint64]{
|
||||
atomic: &UInt64F,
|
||||
atomic: UInt64F,
|
||||
array: make([]uint64, size),
|
||||
},
|
||||
}
|
||||
@ -78,19 +78,19 @@ func (a *Array[T]) Array() []T {
|
||||
}
|
||||
|
||||
func (a *Array[T]) Get(index int) T {
|
||||
return a.atomic.Load(&a.array[index])
|
||||
return a.atomic.Load()(&a.array[index])
|
||||
}
|
||||
|
||||
func (a *Array[T]) Set(index int, p T) {
|
||||
a.atomic.Store(&a.array[index], p)
|
||||
a.atomic.Store()(&a.array[index], p)
|
||||
}
|
||||
|
||||
func (a *Array[T]) Swap(index int, p T) (old T) {
|
||||
return a.atomic.Swap(&a.array[index], p)
|
||||
return a.atomic.Swap()(&a.array[index], p)
|
||||
}
|
||||
|
||||
func (a *Array[T]) CompareAndSwap(index int, old, new T) (swapped bool) {
|
||||
return a.atomic.CompareAndSwap(&a.array[index], old, new)
|
||||
return a.atomic.CompareAndSwap()(&a.array[index], old, new)
|
||||
}
|
||||
|
||||
func (a *Int32Array) Add(index int, value int32) {
|
||||
|
@ -20,41 +20,51 @@ type (
|
||||
CompareAndSwapBit(bit int, old, new bool) (swapped bool)
|
||||
}
|
||||
|
||||
Atomic[T any] struct {
|
||||
Swap func(addr *T, new T) (old T)
|
||||
CompareAndSwap func(addr *T, old, new T) (swapped bool)
|
||||
Load func(addr *T) (val T)
|
||||
Store func(addr *T, val T)
|
||||
Atomic[T any] interface {
|
||||
Swap() func(addr *T, new T) (old T)
|
||||
CompareAndSwap() func(addr *T, old, new T) (swapped bool)
|
||||
Load() func(addr *T) (val T)
|
||||
Store() func(addr *T, val T)
|
||||
}
|
||||
|
||||
atomicImpl[T any] struct {
|
||||
swap func(addr *T, new T) (old T)
|
||||
compareAndSwap func(addr *T, old, new T) (swapped bool)
|
||||
load func(addr *T) (val T)
|
||||
store func(addr *T, val T)
|
||||
}
|
||||
|
||||
atomicTyped[T any] struct {
|
||||
}
|
||||
)
|
||||
|
||||
//goland:noinspection GoUnusedGlobalVariable
|
||||
var (
|
||||
Int32F = Atomic[int32]{
|
||||
Int32F Atomic[int32] = &atomicImpl[int32]{
|
||||
SwapInt32,
|
||||
CompareAndSwapInt32,
|
||||
LoadInt32,
|
||||
StoreInt32,
|
||||
}
|
||||
Int64F = Atomic[int64]{
|
||||
Int64F Atomic[int64] = &atomicImpl[int64]{
|
||||
SwapInt64,
|
||||
CompareAndSwapInt64,
|
||||
LoadInt64,
|
||||
StoreInt64,
|
||||
}
|
||||
UInt32F = Atomic[uint32]{
|
||||
UInt32F Atomic[uint32] = &atomicImpl[uint32]{
|
||||
SwapUInt32,
|
||||
CompareAndSwapUInt32,
|
||||
LoadUint32,
|
||||
StoreUInt32,
|
||||
}
|
||||
UInt64F = Atomic[uint64]{
|
||||
UInt64F Atomic[uint64] = &atomicImpl[uint64]{
|
||||
SwapUInt64,
|
||||
CompareAndSwapUInt64,
|
||||
LoadUint64,
|
||||
StoreUInt64,
|
||||
}
|
||||
PointerF = Atomic[unsafe.Pointer]{
|
||||
PointerF Atomic[unsafe.Pointer] = &atomicImpl[unsafe.Pointer]{
|
||||
UnsafeSwapPointer,
|
||||
UnsafeCompareAndSwapPointer,
|
||||
UnsafeLoadPointer,
|
||||
@ -62,13 +72,40 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
func GetAtomic[T any]() *Atomic[*T] {
|
||||
return &Atomic[*T]{
|
||||
SwapPointer[T],
|
||||
CompareAndSwapPointer[T],
|
||||
LoadPointer[T],
|
||||
StorePointer[T],
|
||||
}
|
||||
func GetAtomic[T any]() Atomic[*T] {
|
||||
return atomicTyped[T]{}
|
||||
}
|
||||
|
||||
func (a atomicTyped[T]) Swap() func(addr **T, new *T) (old *T) {
|
||||
return SwapPointer[T]
|
||||
}
|
||||
|
||||
func (a atomicTyped[T]) CompareAndSwap() func(addr **T, old *T, new *T) (swapped bool) {
|
||||
return CompareAndSwapPointer[T]
|
||||
}
|
||||
|
||||
func (a atomicTyped[T]) Load() func(addr **T) (val *T) {
|
||||
return LoadPointer[T]
|
||||
}
|
||||
|
||||
func (a atomicTyped[T]) Store() func(addr **T, val *T) {
|
||||
return StorePointer[T]
|
||||
}
|
||||
|
||||
func (a *atomicImpl[T]) Swap() func(addr *T, new T) (old T) {
|
||||
return a.swap
|
||||
}
|
||||
|
||||
func (a *atomicImpl[T]) CompareAndSwap() func(addr *T, old T, new T) (swapped bool) {
|
||||
return a.compareAndSwap
|
||||
}
|
||||
|
||||
func (a *atomicImpl[T]) Load() func(addr *T) (val T) {
|
||||
return a.load
|
||||
}
|
||||
|
||||
func (a *atomicImpl[T]) Store() func(addr *T, val T) {
|
||||
return a.store
|
||||
}
|
||||
|
||||
func SwapInt32(addr *int32, new int32) (old int32) {
|
||||
|
@ -3,6 +3,8 @@ package atomic
|
||||
import (
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/tursom/GoCollections/lang"
|
||||
)
|
||||
|
||||
//goland:noinspection GoUnusedGlobalVariable
|
||||
@ -16,33 +18,62 @@ var (
|
||||
type (
|
||||
Pointer = unsafe.Pointer
|
||||
PPointer = *unsafe.Pointer
|
||||
|
||||
// Reference atomic type T reference
|
||||
Reference[T any] struct {
|
||||
reference *T
|
||||
lang.BaseObject
|
||||
p *T
|
||||
}
|
||||
)
|
||||
|
||||
// NewReference new *Reference[T] init by given reference
|
||||
func NewReference[T any](reference *T) *Reference[T] {
|
||||
return &Reference[T]{reference}
|
||||
return lang.ForceCast[Reference[T]](Pointer(&reference))
|
||||
}
|
||||
|
||||
func (v *Reference[T]) Load() (val *T) {
|
||||
return LoadPointer(&v.reference)
|
||||
// ReferenceOf cast **T to *Reference[T]
|
||||
func ReferenceOf[T any](reference **T) *Reference[T] {
|
||||
return lang.ForceCast[Reference[T]](Pointer(reference))
|
||||
}
|
||||
|
||||
func (v *Reference[T]) Store(val *T) {
|
||||
StorePointer(&v.reference, val)
|
||||
func ReferenceUintptr[T any](reference *uintptr) *Reference[T] {
|
||||
return lang.ForceCast[Reference[T]](Pointer(reference))
|
||||
}
|
||||
|
||||
func (v *Reference[T]) Swap(new *T) (old *T) {
|
||||
return SwapPointer(&v.reference, new)
|
||||
func (r *Reference[T]) AsPointer() Pointer {
|
||||
return Pointer(r)
|
||||
}
|
||||
|
||||
func (v *Reference[T]) CompareAndSwap(old, new *T) (swapped bool) {
|
||||
return CompareAndSwapPointer(&v.reference, old, new)
|
||||
func (r *Reference[T]) AsPPointer() PPointer {
|
||||
return PPointer(r.AsPointer())
|
||||
}
|
||||
|
||||
func AsPPointer[T any](p **T) *unsafe.Pointer {
|
||||
return (*unsafe.Pointer)(unsafe.Pointer(p))
|
||||
func (r *Reference[T]) AsUintptr() *TypedUintptr[T] {
|
||||
return lang.ForceCast[TypedUintptr[T]](r.AsPointer())
|
||||
}
|
||||
|
||||
func (r *Reference[T]) pointer() **T {
|
||||
return &r.p
|
||||
}
|
||||
|
||||
func (r *Reference[T]) Load() (val *T) {
|
||||
return LoadPointer(r.pointer())
|
||||
}
|
||||
|
||||
func (r *Reference[T]) Store(val *T) {
|
||||
StorePointer(r.pointer(), val)
|
||||
}
|
||||
|
||||
func (r *Reference[T]) Swap(new *T) (old *T) {
|
||||
return SwapPointer(r.pointer(), new)
|
||||
}
|
||||
|
||||
func (r *Reference[T]) CompareAndSwap(old, new *T) (swapped bool) {
|
||||
return CompareAndSwapPointer(r.pointer(), old, new)
|
||||
}
|
||||
|
||||
func AsPPointer[T any](p **T) PPointer {
|
||||
return PPointer(Pointer(p))
|
||||
}
|
||||
|
||||
func LoadPointer[T any](addr **T) (val *T) {
|
||||
@ -50,11 +81,11 @@ func LoadPointer[T any](addr **T) (val *T) {
|
||||
}
|
||||
|
||||
func StorePointer[T any](addr **T, val *T) {
|
||||
atomic.StorePointer(AsPPointer(addr), unsafe.Pointer(val))
|
||||
atomic.StorePointer(AsPPointer(addr), Pointer(val))
|
||||
}
|
||||
|
||||
func SwapPointer[T any](addr **T, new *T) (old *T) {
|
||||
return (*T)(atomic.SwapPointer(AsPPointer(addr), unsafe.Pointer(new)))
|
||||
return (*T)(atomic.SwapPointer(AsPPointer(addr), Pointer(new)))
|
||||
}
|
||||
|
||||
func CompareAndSwapPointer[T any](addr **T, old, new *T) (swapped bool) {
|
||||
|
@ -2,14 +2,26 @@ package atomic
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/tursom/GoCollections/lang"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAtomic_Store(t *testing.T) {
|
||||
a := &Reference[lang.Int]{}
|
||||
var i lang.Int = 1
|
||||
a := NewReference[int](nil)
|
||||
var i = 1
|
||||
a.Store(&i)
|
||||
i = 2
|
||||
fmt.Println(a.Load())
|
||||
fmt.Println(*a.Load())
|
||||
}
|
||||
|
||||
func TestReferenceOf(t *testing.T) {
|
||||
one := 1
|
||||
|
||||
var p *int = nil
|
||||
ref := ReferenceOf(&p)
|
||||
|
||||
ref.Store(&one)
|
||||
fmt.Println(ref.Load())
|
||||
fmt.Println(*ref.Load())
|
||||
|
||||
_ = *ref.AsUintptr() + 1
|
||||
}
|
||||
|
87
lang/atomic/Uintptr.go
Normal file
87
lang/atomic/Uintptr.go
Normal file
@ -0,0 +1,87 @@
|
||||
package atomic
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/tursom/GoCollections/lang"
|
||||
)
|
||||
|
||||
type (
|
||||
Uintptr uintptr
|
||||
TypedUintptr[T any] Uintptr
|
||||
)
|
||||
|
||||
func (p Uintptr) Raw() uintptr {
|
||||
return uintptr(p)
|
||||
}
|
||||
|
||||
func (p *Uintptr) RawP() *uintptr {
|
||||
return (*uintptr)(p)
|
||||
}
|
||||
|
||||
func (p *Uintptr) AsPointer() Pointer {
|
||||
return Pointer(p)
|
||||
}
|
||||
|
||||
func (p *Uintptr) AsPPointer() PPointer {
|
||||
return PPointer(p.AsPointer())
|
||||
}
|
||||
func (p *Uintptr) Load() (val Uintptr) {
|
||||
return Uintptr(atomic.LoadUintptr((*uintptr)(p)))
|
||||
}
|
||||
|
||||
func (p *Uintptr) Store(val Uintptr) {
|
||||
atomic.StoreUintptr((*uintptr)(p), uintptr(val))
|
||||
}
|
||||
|
||||
func (p *Uintptr) Swap(new Uintptr) (old Uintptr) {
|
||||
return Uintptr(atomic.SwapUintptr((*uintptr)(p), uintptr(new)))
|
||||
}
|
||||
|
||||
func (p *Uintptr) CompareAndSwap(old, new Uintptr) (swapped bool) {
|
||||
return atomic.CompareAndSwapUintptr((*uintptr)(p), uintptr(old), uintptr(new))
|
||||
}
|
||||
|
||||
func (p TypedUintptr[T]) Raw() uintptr {
|
||||
return uintptr(p)
|
||||
}
|
||||
|
||||
func (p *TypedUintptr[T]) RawP() *uintptr {
|
||||
return (*uintptr)(p)
|
||||
}
|
||||
|
||||
func (p *TypedUintptr[T]) AsPointer() Pointer {
|
||||
return Pointer(p)
|
||||
}
|
||||
|
||||
func (p *TypedUintptr[T]) AsPPointer() PPointer {
|
||||
return PPointer(p.AsPointer())
|
||||
}
|
||||
|
||||
func (tp *TypedUintptr[T]) AsReference() *Reference[T] {
|
||||
return lang.ForceCast[Reference[T]](Pointer(tp))
|
||||
}
|
||||
|
||||
func (tp TypedUintptr[T]) Uintptr() Uintptr {
|
||||
return Uintptr(tp)
|
||||
}
|
||||
|
||||
func (tp *TypedUintptr[T]) PUintptr() *Uintptr {
|
||||
return (*Uintptr)(tp)
|
||||
}
|
||||
|
||||
func (tp *TypedUintptr[T]) Load() TypedUintptr[T] {
|
||||
return TypedUintptr[T](atomic.LoadUintptr((*uintptr)(tp)))
|
||||
}
|
||||
|
||||
func (tp *TypedUintptr[T]) Store(val TypedUintptr[T]) {
|
||||
atomic.StoreUintptr((*uintptr)(tp), uintptr(val))
|
||||
}
|
||||
|
||||
func (tp *TypedUintptr[T]) Swap(new TypedUintptr[T]) (old TypedUintptr[T]) {
|
||||
return TypedUintptr[T](atomic.SwapUintptr((*uintptr)(tp), uintptr(new)))
|
||||
}
|
||||
|
||||
func (tp *TypedUintptr[T]) CompareAndSwap(old, new TypedUintptr[T]) (swapped bool) {
|
||||
return atomic.CompareAndSwapUintptr((*uintptr)(tp), uintptr(old), uintptr(new))
|
||||
}
|
@ -1,21 +1,11 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
type (
|
||||
Stringer struct {
|
||||
stringer func() string
|
||||
}
|
||||
)
|
||||
|
||||
func OnceDone(once *sync.Once) bool {
|
||||
i := *(*uint32)(unsafe.Pointer(once))
|
||||
return i != 0
|
||||
}
|
||||
|
||||
func NewStringer(stringer func() string) Stringer {
|
||||
return Stringer{
|
||||
stringer: stringer,
|
||||
|
11
util/unsafe/unsafe.go
Normal file
11
util/unsafe/unsafe.go
Normal file
@ -0,0 +1,11 @@
|
||||
package unsafe
|
||||
|
||||
import (
|
||||
"unsafe"
|
||||
|
||||
"github.com/tursom/GoCollections/lang"
|
||||
)
|
||||
|
||||
func Sizeof[T any]() uintptr {
|
||||
return unsafe.Sizeof(lang.Nil[T]())
|
||||
}
|
Loading…
Reference in New Issue
Block a user