mirror of
https://github.com/tursom/GoCollections.git
synced 2025-03-13 17:00:18 +08:00
add MapReduce & Park
This commit is contained in:
parent
c7e46ba377
commit
a62508af6f
45
concurrent/collections/Park.go
Normal file
45
concurrent/collections/Park.go
Normal file
@ -0,0 +1,45 @@
|
||||
package collections
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Park struct {
|
||||
lock sync.Mutex
|
||||
ch chan struct{}
|
||||
}
|
||||
|
||||
func (p *Park) getCh() chan struct{} {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
if p.ch == nil {
|
||||
p.ch = make(chan struct{})
|
||||
}
|
||||
|
||||
return p.ch
|
||||
}
|
||||
|
||||
func (p *Park) Park() {
|
||||
<-p.getCh()
|
||||
}
|
||||
|
||||
func (p *Park) ParkT(timeout time.Duration) {
|
||||
select {
|
||||
case <-p.getCh():
|
||||
case <-time.After(timeout):
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Park) Unpark() {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
if p.ch == nil {
|
||||
return
|
||||
}
|
||||
|
||||
close(p.ch)
|
||||
p.ch = nil
|
||||
}
|
38
concurrent/collections/Park_test.go
Normal file
38
concurrent/collections/Park_test.go
Normal file
@ -0,0 +1,38 @@
|
||||
package collections
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestPark_Park(t *testing.T) {
|
||||
var p Park
|
||||
t1 := time.Now()
|
||||
go func() {
|
||||
<-time.After(time.Second)
|
||||
|
||||
p.Unpark()
|
||||
}()
|
||||
|
||||
p.Park()
|
||||
t2 := time.Now()
|
||||
|
||||
sub := t2.Sub(t1)
|
||||
if sub > time.Duration(float64(time.Second)*1.01) ||
|
||||
sub < time.Duration(float64(time.Second)*0.09) {
|
||||
t.Fatal(sub)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPark_ParkT(t *testing.T) {
|
||||
var p Park
|
||||
t1 := time.Now()
|
||||
p.ParkT(time.Second)
|
||||
t2 := time.Now()
|
||||
|
||||
sub := t2.Sub(t1)
|
||||
if sub > time.Duration(float64(time.Second)*1.01) ||
|
||||
sub < time.Duration(float64(time.Second)*0.09) {
|
||||
t.Fatal(sub)
|
||||
}
|
||||
}
|
66
util/mr/MapReduce.go
Normal file
66
util/mr/MapReduce.go
Normal file
@ -0,0 +1,66 @@
|
||||
package mr
|
||||
|
||||
import "github.com/tursom/GoCollections/lang/atomic"
|
||||
|
||||
type (
|
||||
MapReduce[V, R any] interface {
|
||||
Map(value V) R
|
||||
Reduce(results <-chan R) R
|
||||
}
|
||||
)
|
||||
|
||||
func LocalMap[V, R any](values <-chan V, m func(value V) R) <-chan R {
|
||||
rc := make(chan R)
|
||||
|
||||
go func() {
|
||||
for value := range values {
|
||||
rc <- m(value)
|
||||
}
|
||||
|
||||
close(rc)
|
||||
}()
|
||||
|
||||
return rc
|
||||
}
|
||||
|
||||
func LocalReduce[R any](values <-chan R, r func(results <-chan R) R) R {
|
||||
return r(values)
|
||||
}
|
||||
|
||||
func Local[V, R any](values <-chan V, mr MapReduce[V, R]) R {
|
||||
return LocalReduce(LocalMap(values, mr.Map), mr.Reduce)
|
||||
}
|
||||
|
||||
func MultiMap[V, R any](values <-chan V, m func(value V) R) <-chan R {
|
||||
rc := make(chan R)
|
||||
|
||||
c := atomic.Int32(1)
|
||||
for value0 := range values {
|
||||
value := value0
|
||||
go func() {
|
||||
rc <- m(value)
|
||||
|
||||
if c.Add(-1) == 0 {
|
||||
close(rc)
|
||||
}
|
||||
}()
|
||||
}
|
||||
if c.Add(-1) == 0 {
|
||||
close(rc)
|
||||
}
|
||||
|
||||
return rc
|
||||
}
|
||||
|
||||
func MultiReduce[R any](values <-chan R, r func(results <-chan R) R) R {
|
||||
rc := make(chan R)
|
||||
go func() {
|
||||
rc <- r(values)
|
||||
}()
|
||||
|
||||
return <-rc
|
||||
}
|
||||
|
||||
func Multi[V, R any](values <-chan V, mr MapReduce[V, R]) R {
|
||||
return MultiReduce(MultiMap(values, mr.Map), mr.Reduce)
|
||||
}
|
Loading…
Reference in New Issue
Block a user