diff --git a/concurrent/collections/Sequence.go b/concurrent/collections/Sequence.go index 3e9af58..38144ee 100644 --- a/concurrent/collections/Sequence.go +++ b/concurrent/collections/Sequence.go @@ -5,16 +5,18 @@ import ( "github.com/tursom/GoCollections/exceptions" "github.com/tursom/GoCollections/lang/atomic" + "github.com/tursom/GoCollections/util" ) type ( // Sequence 一个用于并发生产场景下使消息有序的 Sequence // 数据的接收顺序与 Sender 的生成顺序保持一致 Sequence[T any] struct { - lock sync.Mutex - ch chan T - head *sequenceNode[T] - end **sequenceNode[T] + lock sync.Mutex + ch chan T + head *sequenceNode[T] + end **sequenceNode[T] + close sync.Once } // SequenceSender 用于给 Sequence 发送信息 @@ -73,6 +75,9 @@ func (s *Sequence[T]) Alloc() SequenceSender[T] { // send 清空 Sequence 中可发送的消息 func (s *Sequence[T]) send() { + if s.Closed() { + return + } channel := s.channel() s.lock.Lock() defer s.lock.Unlock() @@ -82,8 +87,7 @@ func (s *Sequence[T]) send() { if head.cause == nil { channel <- head.value } else { - close(channel) - s.ch = nil + s.Close() panic(head.cause) } head = head.next @@ -91,9 +95,25 @@ func (s *Sequence[T]) send() { // 防止 sequenceNode.Send 判断自身是否是 head 产生的问题 atomic.StorePointer(&s.head, head) } + if head == nil { + s.end = &s.head + } +} + +func (s *Sequence[T]) Close() { + s.close.Do(func() { + close(s.channel()) + }) +} + +func (s *Sequence[T]) Closed() bool { + return util.OnceDone(&s.close) } func (s *sequenceNode[T]) Send(value T) { + if s.sequence.Closed() { + return + } s.value = value s.sent = true if atomic.LoadPointer(&s.sequence.head) == s { @@ -102,6 +122,9 @@ func (s *sequenceNode[T]) Send(value T) { } func (s *sequenceNode[T]) Fail(cause exceptions.Exception) { + if s.sequence.Closed() { + return + } s.cause = cause s.sent = true if atomic.LoadPointer(&s.sequence.head) == s { diff --git a/concurrent/util/Pipeline.go b/concurrent/util/Pipeline.go new file mode 100644 index 0000000..2291337 --- /dev/null +++ b/concurrent/util/Pipeline.go @@ -0,0 +1,35 @@ +package util + +import "github.com/tursom/GoCollections/concurrent/collections" + +type ( + pipeline[T, R any] struct { + value T + sender collections.SequenceSender[R] + } +) + +func NewPipeline[T, R any](producer <-chan T, concurrency int, consumer func(T) R) <-chan R { + var sequence collections.Sequence[R] + c := make(chan pipeline[T, R]) + go func() { + defer close(c) + for value := range producer { + c <- pipeline[T, R]{ + value: value, + sender: sequence.Alloc(), + } + } + }() + + for i := 0; i < concurrency; i++ { + go func() { + defer sequence.Close() + for value := range c { + r := consumer(value.value) + value.sender.Send(r) + } + }() + } + return sequence.Channel() +} diff --git a/concurrent/util/Pipeline_test.go b/concurrent/util/Pipeline_test.go new file mode 100644 index 0000000..86f2ef9 --- /dev/null +++ b/concurrent/util/Pipeline_test.go @@ -0,0 +1,26 @@ +package util + +import ( + "fmt" + "testing" + "time" +) + +func TestPipeline(t *testing.T) { + c := make(chan int) + r := NewPipeline(c, 4, func(t int) int { + return t * t * t * t + }) + + go func() { + defer close(c) + for i := 0; i < 1024; i++ { + c <- i + } + time.Sleep(time.Second) + }() + + for i := range r { + fmt.Println(i) + } +} diff --git a/exceptions/Exception.go b/exceptions/Exception.go index db6d973..85b1fa2 100644 --- a/exceptions/Exception.go +++ b/exceptions/Exception.go @@ -58,6 +58,12 @@ func BuildStackTrace(builder *strings.Builder, e Exception) { } } +func GetStackTraceString(e Exception) string { + builder := &strings.Builder{} + BuildStackTrace(builder, e) + return builder.String() +} + func Try[R any]( f func() (ret R, err Exception), catch func(panic any) (ret R, err Exception), diff --git a/exceptions/exec.py b/exceptions/exec.py index d885115..9556ff9 100644 --- a/exceptions/exec.py +++ b/exceptions/exec.py @@ -1,3 +1,5 @@ +# this script due to build Exec.go + def arg_list(arg_size): if arg_size == 0: return "" diff --git a/util/Utils.go b/util/Utils.go index c7d8682..ef18588 100644 --- a/util/Utils.go +++ b/util/Utils.go @@ -1 +1,30 @@ package util + +import ( + "sync" + "unsafe" +) + +type ( + Stringer struct { + stringer func() string + } +) + +func OnceDone(once *sync.Once) bool { + i := *(*uint32)(unsafe.Pointer(once)) + return i != 0 +} + +func NewStringer(stringer func() string) Stringer { + return Stringer{ + stringer: stringer, + } +} + +func (s Stringer) String() string { + if s.stringer == nil { + return "nil" + } + return s.stringer() +}