This commit is contained in:
tursom 2022-07-11 17:36:48 +08:00
parent 60c4df4ade
commit 839f26196a
6 changed files with 127 additions and 6 deletions

View File

@ -5,16 +5,18 @@ import (
"github.com/tursom/GoCollections/exceptions"
"github.com/tursom/GoCollections/lang/atomic"
"github.com/tursom/GoCollections/util"
)
type (
// Sequence 一个用于并发生产场景下使消息有序的 Sequence
// 数据的接收顺序与 Sender 的生成顺序保持一致
Sequence[T any] struct {
lock sync.Mutex
ch chan T
head *sequenceNode[T]
end **sequenceNode[T]
lock sync.Mutex
ch chan T
head *sequenceNode[T]
end **sequenceNode[T]
close sync.Once
}
// SequenceSender 用于给 Sequence 发送信息
@ -73,6 +75,9 @@ func (s *Sequence[T]) Alloc() SequenceSender[T] {
// send 清空 Sequence 中可发送的消息
func (s *Sequence[T]) send() {
if s.Closed() {
return
}
channel := s.channel()
s.lock.Lock()
defer s.lock.Unlock()
@ -82,8 +87,7 @@ func (s *Sequence[T]) send() {
if head.cause == nil {
channel <- head.value
} else {
close(channel)
s.ch = nil
s.Close()
panic(head.cause)
}
head = head.next
@ -91,9 +95,25 @@ func (s *Sequence[T]) send() {
// 防止 sequenceNode.Send 判断自身是否是 head 产生的问题
atomic.StorePointer(&s.head, head)
}
if head == nil {
s.end = &s.head
}
}
func (s *Sequence[T]) Close() {
s.close.Do(func() {
close(s.channel())
})
}
func (s *Sequence[T]) Closed() bool {
return util.OnceDone(&s.close)
}
func (s *sequenceNode[T]) Send(value T) {
if s.sequence.Closed() {
return
}
s.value = value
s.sent = true
if atomic.LoadPointer(&s.sequence.head) == s {
@ -102,6 +122,9 @@ func (s *sequenceNode[T]) Send(value T) {
}
func (s *sequenceNode[T]) Fail(cause exceptions.Exception) {
if s.sequence.Closed() {
return
}
s.cause = cause
s.sent = true
if atomic.LoadPointer(&s.sequence.head) == s {

View File

@ -0,0 +1,35 @@
package util
import "github.com/tursom/GoCollections/concurrent/collections"
type (
pipeline[T, R any] struct {
value T
sender collections.SequenceSender[R]
}
)
func NewPipeline[T, R any](producer <-chan T, concurrency int, consumer func(T) R) <-chan R {
var sequence collections.Sequence[R]
c := make(chan pipeline[T, R])
go func() {
defer close(c)
for value := range producer {
c <- pipeline[T, R]{
value: value,
sender: sequence.Alloc(),
}
}
}()
for i := 0; i < concurrency; i++ {
go func() {
defer sequence.Close()
for value := range c {
r := consumer(value.value)
value.sender.Send(r)
}
}()
}
return sequence.Channel()
}

View File

@ -0,0 +1,26 @@
package util
import (
"fmt"
"testing"
"time"
)
func TestPipeline(t *testing.T) {
c := make(chan int)
r := NewPipeline(c, 4, func(t int) int {
return t * t * t * t
})
go func() {
defer close(c)
for i := 0; i < 1024; i++ {
c <- i
}
time.Sleep(time.Second)
}()
for i := range r {
fmt.Println(i)
}
}

View File

@ -58,6 +58,12 @@ func BuildStackTrace(builder *strings.Builder, e Exception) {
}
}
func GetStackTraceString(e Exception) string {
builder := &strings.Builder{}
BuildStackTrace(builder, e)
return builder.String()
}
func Try[R any](
f func() (ret R, err Exception),
catch func(panic any) (ret R, err Exception),

View File

@ -1,3 +1,5 @@
# this script due to build Exec.go
def arg_list(arg_size):
if arg_size == 0:
return ""

View File

@ -1 +1,30 @@
package util
import (
"sync"
"unsafe"
)
type (
Stringer struct {
stringer func() string
}
)
func OnceDone(once *sync.Once) bool {
i := *(*uint32)(unsafe.Pointer(once))
return i != 0
}
func NewStringer(stringer func() string) Stringer {
return Stringer{
stringer: stringer,
}
}
func (s Stringer) String() string {
if s.stringer == nil {
return "nil"
}
return s.stringer()
}