From 09bd3376a71040da36f3036bf8b5b6be9b475a57 Mon Sep 17 00:00:00 2001 From: tursom Date: Fri, 28 Oct 2022 18:12:00 +0800 Subject: [PATCH] fix 2 bug of ConcurrentLinkedQueue --- collections/ArrayList.go | 21 +++++++ collections/ArrayList_test.go | 23 +++++++- collections/Iterable.go | 16 +++++ collections/LinkedList.go | 21 +++++++ .../collections/ConcurrentLinkedQueue.go | 19 ++++++ .../collections/ConcurrentLinkedQueue_test.go | 59 ++++++++++++++++--- .../collections/ConcurrentLinkedStack.go | 1 + 7 files changed, 151 insertions(+), 9 deletions(-) diff --git a/collections/ArrayList.go b/collections/ArrayList.go index ca2a27c..dacba30 100644 --- a/collections/ArrayList.go +++ b/collections/ArrayList.go @@ -27,6 +27,27 @@ func NewArrayListByCapacity[T lang.Object](cap int) *ArrayList[T] { } } +// NewArrayListFrom create a new ArrayList from list by index from [from] until [to] +func NewArrayListFrom[T lang.Object](list List[T], from, to int) *ArrayList[T] { + newList := NewArrayListByCapacity[T](to - from) + iterator, err := SkipIterator[T](list.ListIterator(), to-from) + if err != nil { + panic(err) + } + + for i := 0; i < to-from; i++ { + next, err := iterator.Next() + if err != nil { + panic(err) + } + + // newList wont throw any exception in this place + _ = newList.Add(next) + } + + return newList +} + func (a *ArrayList[T]) String() string { return String[T](a) } diff --git a/collections/ArrayList_test.go b/collections/ArrayList_test.go index 2b5e223..aced2bf 100644 --- a/collections/ArrayList_test.go +++ b/collections/ArrayList_test.go @@ -2,11 +2,29 @@ package collections import ( "fmt" + "testing" + "github.com/tursom/GoCollections/exceptions" "github.com/tursom/GoCollections/lang" - "testing" ) +func Test_NewArrayListByCapacity(t *testing.T) { + capacity := 10 + list := NewArrayListByCapacity[lang.Int](capacity) + if len(list.array) != 0 && cap(list.array) != capacity { + t.Fail() + } +} + +func Test_NewArrayListFrom(t *testing.T) { + list := NewArrayList[lang.Int]() + for i := 0; i < 10; i++ { + list.Add(lang.Int(i)) + } + newList := NewArrayListFrom[lang.Int](list, 5, 6) + fmt.Println(newList) +} + func TestArrayListAdd(t *testing.T) { list := NewArrayList[lang.Int]() for i := 0; i < 10; i++ { @@ -39,7 +57,8 @@ func TestLinkedList(t *testing.T) { return nil, nil, nil }, ) - fmt.Println(exceptions.Exec2r1((*LinkedList[lang.Int]).Get, list, j).AsInt()) + //get := (*LinkedList[lang.Int]).Get + //fmt.Println(exceptions.Exec2r1[func(int) (int, exceptions.Exception), List[int], int](get, list, j).AsInt()) } } for i := 0; i < 10; i++ { diff --git a/collections/Iterable.go b/collections/Iterable.go index e5658e1..809f809 100644 --- a/collections/Iterable.go +++ b/collections/Iterable.go @@ -161,3 +161,19 @@ func Clear[T any](l MutableIterable[T]) exceptions.Exception { return iterator.Remove() }) } + +// SkipIterator skip [skip] items of [iterator] +// returns [iterator] itself +func SkipIterator[T any](iterator Iterator[T], skip int) (Iterator[T], exceptions.Exception) { + for i := 0; i < skip; i++ { + if iterator.HasNext() { + _, err := iterator.Next() + if err != nil { + return nil, err + } + } else { + break + } + } + return iterator, nil +} diff --git a/collections/LinkedList.go b/collections/LinkedList.go index f8623e6..6bd17af 100644 --- a/collections/LinkedList.go +++ b/collections/LinkedList.go @@ -33,6 +33,26 @@ func NewLinkedList[T lang.Object]() *LinkedList[T] { return &LinkedList[T]{lang.NewBaseObject(), tail, 0} } +func NewLinkedListFrom[T lang.Object](list List[T], from, to int) *LinkedList[T] { + newList := NewLinkedList[T]() + iterator, err := SkipIterator[T](list.ListIterator(), to-from) + if err != nil { + panic(err) + } + + for i := 0; i < to-from; i++ { + next, err := iterator.Next() + if err != nil { + panic(err) + } + + // newList wont throw any exception in this place + _ = newList.Add(next) + } + + return newList +} + func (l *LinkedList[T]) Size() int { return l.size } @@ -161,6 +181,7 @@ func (l *LinkedList[T]) SubMutableList(from, to int) MutableList[T] { list.Add(node.value) } return list + return NewArrayListFrom[T](l, from, to) } func (l *LinkedList[T]) Get(index int) (T, exceptions.Exception) { diff --git a/concurrent/collections/ConcurrentLinkedQueue.go b/concurrent/collections/ConcurrentLinkedQueue.go index 5c0857b..ee43620 100644 --- a/concurrent/collections/ConcurrentLinkedQueue.go +++ b/concurrent/collections/ConcurrentLinkedQueue.go @@ -8,6 +8,8 @@ import ( ) type ( + // ConcurrentLinkedQueue FIFO data struct, impl by linked list. + // in order to reuse ConcurrentLinkedStack, element will offer on queue's end, and poll on queue's head. ConcurrentLinkedQueue[T lang.Object] struct { lang.BaseObject ConcurrentLinkedStack[T] @@ -45,12 +47,21 @@ func (q *ConcurrentLinkedQueue[T]) OfferAndGetNode(element T) (collections.Queue return &linkedQueueIterator[T]{queue: q, node: newNode}, nil } +// offerAndGetNode offer an element on q.end func (q *ConcurrentLinkedQueue[T]) offerAndGetNode(element T) (*linkedStackNode[T], exceptions.Exception) { newNode := &linkedStackNode[T]{value: element} q.size.Add(1) var next **linkedStackNode[T] ref := q.end + + // bug fix + // buf caused by delete q.end but not update it's reference + for ref != nil && ref.deleted { + ref = ref.next + } + + // q.end is nil when queue just created switch { case ref == nil: next = &q.head @@ -67,6 +78,12 @@ func (q *ConcurrentLinkedQueue[T]) offerAndGetNode(element T) (*linkedStackNode[ } next = &ref.next } + + // bug fix + // q.head may be deleted on async env + for *next != nil { + next = &(*next).next + } } q.end = newNode return newNode, nil @@ -80,6 +97,8 @@ func (q *ConcurrentLinkedQueue[T]) MutableIterator() collections.MutableIterator return &linkedQueueIterator[T]{queue: q, node: q.head} } +// Size size of queue +// it may not correct on concurrent environment, to check it's empty, use func IsEmpty func (q *ConcurrentLinkedQueue[T]) Size() int { return int(q.size.Load()) } diff --git a/concurrent/collections/ConcurrentLinkedQueue_test.go b/concurrent/collections/ConcurrentLinkedQueue_test.go index e192e7e..2ebeeef 100644 --- a/concurrent/collections/ConcurrentLinkedQueue_test.go +++ b/concurrent/collections/ConcurrentLinkedQueue_test.go @@ -2,11 +2,13 @@ package collections import ( "fmt" + "math/rand" "sync" "testing" "time" "unsafe" + "github.com/tursom/GoCollections/collections" "github.com/tursom/GoCollections/exceptions" "github.com/tursom/GoCollections/lang" ) @@ -83,16 +85,59 @@ func TestConcurrentLinkedQueue_ThreadSafe(t *testing.T) { func Test_concurrentLinkedQueueIterator_Remove(t *testing.T) { queue := NewLinkedQueue[lang.Int]() - nodes := make([]QueueNode[lang.Int], 0) - for i := 0; i < 1000; i++ { + + for i := 0; i < 16; i++ { + testConcurrentLinkedQueueIteratorRemove(t, queue, 1000) + t.Logf("Test_concurrentLinkedQueueIterator_Remove passed on %d loop", i+1) + } +} + +func testConcurrentLinkedQueueIteratorRemove(t *testing.T, queue *ConcurrentLinkedQueue[lang.Int], nodeNumber int) { + nodes := make(chan collections.StackNode[lang.Int], nodeNumber) + nodesIndex := make([]bool, nodeNumber) + b := rand.Int()&1 == 1 + if b { + for i := 0; i < 4; i++ { + go func() { + for node := range nodes { + index := exceptions.Exec0r1(node.RemoveAndGet) + if nodesIndex[index] { + t.Fatalf("TODO Fatalf") + } + nodesIndex[index] = true + } + }() + } + } + + for i := 0; i < nodeNumber; i++ { node, _ := queue.OfferAndGetNode(lang.Int(i)) - nodes = append(nodes, node) - //fmt.Println(queue) + nodes <- node } - for _, node := range nodes { - fmt.Println(exceptions.Exec0r1(node.RemoveAndGet)) + + close(nodes) + + if !b { + for i := 0; i < 4; i++ { + go func() { + for node := range nodes { + index := exceptions.Exec0r1(node.RemoveAndGet) + if nodesIndex[index] { + t.Fatalf("TODO Fatalf") + } + nodesIndex[index] = true + } + }() + } } - if queue.Size() != 0 { + + time.Sleep(time.Second / 5) + + head := queue.head + if head != nil && head.deleted { + head = head.next + } + if head != nil { t.Fatalf(fmt.Sprintf("queue remain %d element, is not thread safe", queue.Size())) } } diff --git a/concurrent/collections/ConcurrentLinkedStack.go b/concurrent/collections/ConcurrentLinkedStack.go index 846911a..f6c2e3b 100644 --- a/concurrent/collections/ConcurrentLinkedStack.go +++ b/concurrent/collections/ConcurrentLinkedStack.go @@ -73,6 +73,7 @@ func (s *ConcurrentLinkedStack[T]) Pop() (T, exceptions.Exception) { return node.value, nil } +// CleanDeleted tell stack that may clean deleted nodes func (s *ConcurrentLinkedStack[T]) CleanDeleted() { if s.deleted.Load() <= s.size.Load() { return