mirror of
https://github.com/libp2p/go-libp2p-peerstore.git
synced 2024-12-26 23:30:32 +08:00
Merge pull request #166 from libp2p/fix/remove-queue
feat: remove queue
This commit is contained in:
commit
301af201be
2
go.mod
2
go.mod
@ -14,8 +14,6 @@ require (
|
||||
github.com/multiformats/go-base32 v0.0.3
|
||||
github.com/multiformats/go-multiaddr v0.3.3
|
||||
github.com/multiformats/go-multiaddr-fmt v0.1.0
|
||||
github.com/multiformats/go-multihash v0.0.15
|
||||
github.com/stretchr/testify v1.7.0
|
||||
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1
|
||||
go.uber.org/goleak v1.1.10
|
||||
)
|
||||
|
18
go.sum
18
go.sum
@ -85,8 +85,6 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
|
||||
github.com/klauspost/cpuid/v2 v2.0.4 h1:g0I61F2K2DjRHz1cnxlkNSBIaePVoJIjjnHui8QHbiw=
|
||||
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
|
||||
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
@ -106,9 +104,8 @@ github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czP
|
||||
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
|
||||
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
|
||||
github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
|
||||
github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=
|
||||
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
|
||||
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
|
||||
github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
|
||||
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
||||
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
|
||||
@ -130,9 +127,8 @@ github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77
|
||||
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
|
||||
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
|
||||
github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
|
||||
github.com/multiformats/go-multihash v0.0.14 h1:QoBceQYQQtNUuf6s7wHxnE2c8bhbMqhfGzNI032se/I=
|
||||
github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
|
||||
github.com/multiformats/go-multihash v0.0.15 h1:hWOPdrNqDjwHDx82vsYGSDZNyktOJJ2dzZJzFkOV1jM=
|
||||
github.com/multiformats/go-multihash v0.0.15/go.mod h1:D6aZrWNLFTV/ynMpKsNtB40mJzmCl4jb1alC0OvHiHg=
|
||||
github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
|
||||
github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2W/KhfNY=
|
||||
github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
|
||||
@ -170,8 +166,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
|
||||
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
|
||||
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
|
||||
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
|
||||
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k=
|
||||
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
|
||||
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
@ -194,9 +188,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
|
||||
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g=
|
||||
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
@ -231,11 +224,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 h1:46ULzRKLh1CwgRq2dC5SlBzEqqNCi8rreOZnNrbqcIY=
|
||||
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
|
||||
|
@ -1,100 +0,0 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"math/big"
|
||||
"sync"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
ks "github.com/whyrusleeping/go-keyspace"
|
||||
)
|
||||
|
||||
// peerMetric tracks a peer and its distance to something else.
|
||||
type peerMetric struct {
|
||||
// the peer
|
||||
peer peer.ID
|
||||
|
||||
// big.Int for XOR metric
|
||||
metric *big.Int
|
||||
}
|
||||
|
||||
// peerMetricHeap implements a heap of peerDistances
|
||||
type peerMetricHeap []*peerMetric
|
||||
|
||||
func (ph peerMetricHeap) Len() int {
|
||||
return len(ph)
|
||||
}
|
||||
|
||||
func (ph peerMetricHeap) Less(i, j int) bool {
|
||||
return -1 == ph[i].metric.Cmp(ph[j].metric)
|
||||
}
|
||||
|
||||
func (ph peerMetricHeap) Swap(i, j int) {
|
||||
ph[i], ph[j] = ph[j], ph[i]
|
||||
}
|
||||
|
||||
func (ph *peerMetricHeap) Push(x interface{}) {
|
||||
item := x.(*peerMetric)
|
||||
*ph = append(*ph, item)
|
||||
}
|
||||
|
||||
func (ph *peerMetricHeap) Pop() interface{} {
|
||||
old := *ph
|
||||
n := len(old)
|
||||
item := old[n-1]
|
||||
*ph = old[0 : n-1]
|
||||
return item
|
||||
}
|
||||
|
||||
// distancePQ implements heap.Interface and PeerQueue
|
||||
type distancePQ struct {
|
||||
// from is the Key this PQ measures against
|
||||
from ks.Key
|
||||
|
||||
// heap is a heap of peerDistance items
|
||||
heap peerMetricHeap
|
||||
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func (pq *distancePQ) Len() int {
|
||||
pq.Lock()
|
||||
defer pq.Unlock()
|
||||
return len(pq.heap)
|
||||
}
|
||||
|
||||
func (pq *distancePQ) Enqueue(p peer.ID) {
|
||||
pq.Lock()
|
||||
defer pq.Unlock()
|
||||
|
||||
distance := ks.XORKeySpace.Key([]byte(p)).Distance(pq.from)
|
||||
|
||||
heap.Push(&pq.heap, &peerMetric{
|
||||
peer: p,
|
||||
metric: distance,
|
||||
})
|
||||
}
|
||||
|
||||
func (pq *distancePQ) Dequeue() peer.ID {
|
||||
pq.Lock()
|
||||
defer pq.Unlock()
|
||||
|
||||
if len(pq.heap) < 1 {
|
||||
panic("called Dequeue on an empty PeerQueue")
|
||||
// will panic internally anyway, but we can help debug here
|
||||
}
|
||||
|
||||
o := heap.Pop(&pq.heap)
|
||||
p := o.(*peerMetric)
|
||||
return p.peer
|
||||
}
|
||||
|
||||
// NewXORDistancePQ returns a PeerQueue which maintains its peers sorted
|
||||
// in terms of their distances to each other in an XORKeySpace (i.e. using
|
||||
// XOR as a metric of distance).
|
||||
func NewXORDistancePQ(from string) PeerQueue {
|
||||
return &distancePQ{
|
||||
from: ks.XORKeySpace.Key([]byte(from)),
|
||||
heap: peerMetricHeap{},
|
||||
}
|
||||
}
|
@ -1,18 +0,0 @@
|
||||
package queue
|
||||
|
||||
import "github.com/libp2p/go-libp2p-core/peer"
|
||||
|
||||
// PeerQueue maintains a set of peers ordered according to a metric.
|
||||
// Implementations of PeerQueue could order peers based on distances along
|
||||
// a KeySpace, latency measurements, trustworthiness, reputation, etc.
|
||||
type PeerQueue interface {
|
||||
|
||||
// Len returns the number of items in PeerQueue
|
||||
Len() int
|
||||
|
||||
// Enqueue adds this node to the queue.
|
||||
Enqueue(peer.ID)
|
||||
|
||||
// Dequeue retrieves the highest (smallest int) priority node
|
||||
Dequeue() peer.ID
|
||||
}
|
@ -1,139 +0,0 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
mh "github.com/multiformats/go-multihash"
|
||||
)
|
||||
|
||||
func TestQueue(t *testing.T) {
|
||||
|
||||
p1 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") // these aren't valid, because need to hex-decode.
|
||||
p2 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32") // these aren't valid, because need to hex-decode.
|
||||
p3 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33") // these aren't valid, because need to hex-decode.
|
||||
p4 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34") // these aren't valid, because need to hex-decode.
|
||||
p5 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") // these aren't valid, because need to hex-decode.
|
||||
// but they work.
|
||||
|
||||
// these are the peer.IDs' XORKeySpace Key values:
|
||||
// [228 47 151 130 156 102 222 232 218 31 132 94 170 208 80 253 120 103 55 35 91 237 48 157 81 245 57 247 66 150 9 40]
|
||||
// [26 249 85 75 54 49 25 30 21 86 117 62 85 145 48 175 155 194 210 216 58 14 241 143 28 209 129 144 122 28 163 6]
|
||||
// [78 135 26 216 178 181 224 181 234 117 2 248 152 115 255 103 244 34 4 152 193 88 9 225 8 127 216 158 226 8 236 246]
|
||||
// [125 135 124 6 226 160 101 94 192 57 39 12 18 79 121 140 190 154 147 55 44 83 101 151 63 255 94 179 51 203 241 51]
|
||||
|
||||
pq := NewXORDistancePQ("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31")
|
||||
pq.Enqueue(p3)
|
||||
pq.Enqueue(p1)
|
||||
pq.Enqueue(p2)
|
||||
pq.Enqueue(p4)
|
||||
pq.Enqueue(p5)
|
||||
pq.Enqueue(p1)
|
||||
|
||||
// should come out as: p1, p4, p3, p2
|
||||
|
||||
if d := pq.Dequeue(); d != p1 && d != p5 {
|
||||
t.Error("ordering failed")
|
||||
}
|
||||
|
||||
if d := pq.Dequeue(); d != p1 && d != p5 {
|
||||
t.Error("ordering failed")
|
||||
}
|
||||
|
||||
if d := pq.Dequeue(); d != p1 && d != p5 {
|
||||
t.Error("ordering failed")
|
||||
}
|
||||
|
||||
if pq.Dequeue() != p4 {
|
||||
t.Error("ordering failed")
|
||||
}
|
||||
|
||||
if pq.Dequeue() != p3 {
|
||||
t.Error("ordering failed")
|
||||
}
|
||||
|
||||
if pq.Dequeue() != p2 {
|
||||
t.Error("ordering failed")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func newPeerTime(t time.Time) peer.ID {
|
||||
s := fmt.Sprintf("hmmm time: %v", t)
|
||||
h, _ := mh.Sum([]byte(s), mh.SHA2_256, -1)
|
||||
return peer.ID(h)
|
||||
}
|
||||
|
||||
func TestSyncQueue(t *testing.T) {
|
||||
tickT := time.Microsecond * 50
|
||||
max := 5000
|
||||
consumerN := 10
|
||||
countsIn := make([]int, consumerN*2)
|
||||
countsOut := make([]int, consumerN)
|
||||
|
||||
if testing.Short() {
|
||||
max = 1000
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
pq := NewXORDistancePQ("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31")
|
||||
cq := NewChanQueue(ctx, pq)
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
produce := func(p int) {
|
||||
defer wg.Done()
|
||||
|
||||
tick := time.Tick(tickT)
|
||||
for i := 0; i < max; i++ {
|
||||
select {
|
||||
case tim := <-tick:
|
||||
countsIn[p]++
|
||||
cq.EnqChan <- newPeerTime(tim)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
consume := func(c int) {
|
||||
defer wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-cq.DeqChan:
|
||||
countsOut[c]++
|
||||
if countsOut[c] >= max*2 {
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// make n * 2 producers and n consumers
|
||||
for i := 0; i < consumerN; i++ {
|
||||
wg.Add(3)
|
||||
go produce(i)
|
||||
go produce(consumerN + i)
|
||||
go consume(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
sum := func(ns []int) int {
|
||||
total := 0
|
||||
for _, n := range ns {
|
||||
total += n
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
if sum(countsIn) != sum(countsOut) {
|
||||
t.Errorf("didnt get all of them out: %d/%d", sum(countsOut), sum(countsIn))
|
||||
}
|
||||
}
|
@ -1,85 +0,0 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
)
|
||||
|
||||
var log = logging.Logger("peerqueue")
|
||||
|
||||
// ChanQueue makes any PeerQueue synchronizable through channels.
|
||||
type ChanQueue struct {
|
||||
Queue PeerQueue
|
||||
EnqChan chan<- peer.ID
|
||||
DeqChan <-chan peer.ID
|
||||
}
|
||||
|
||||
// NewChanQueue creates a ChanQueue by wrapping pq.
|
||||
func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue {
|
||||
cq := &ChanQueue{Queue: pq}
|
||||
cq.process(ctx)
|
||||
return cq
|
||||
}
|
||||
|
||||
func (cq *ChanQueue) process(ctx context.Context) {
|
||||
// construct the channels here to be able to use them bidirectionally
|
||||
enqChan := make(chan peer.ID)
|
||||
deqChan := make(chan peer.ID)
|
||||
|
||||
cq.EnqChan = enqChan
|
||||
cq.DeqChan = deqChan
|
||||
|
||||
go func() {
|
||||
log.Debug("processing")
|
||||
defer log.Debug("closed")
|
||||
defer close(deqChan)
|
||||
|
||||
var next peer.ID
|
||||
var item peer.ID
|
||||
var more bool
|
||||
|
||||
for {
|
||||
if cq.Queue.Len() == 0 {
|
||||
// log.Debug("wait for enqueue")
|
||||
select {
|
||||
case next, more = <-enqChan:
|
||||
if !more {
|
||||
return
|
||||
}
|
||||
// log.Debug("got", next)
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
} else {
|
||||
next = cq.Queue.Dequeue()
|
||||
// log.Debug("peek", next)
|
||||
}
|
||||
|
||||
select {
|
||||
case item, more = <-enqChan:
|
||||
if !more {
|
||||
if cq.Queue.Len() > 0 {
|
||||
return // we're done done.
|
||||
}
|
||||
enqChan = nil // closed, so no use.
|
||||
}
|
||||
// log.Debug("got", item)
|
||||
cq.Queue.Enqueue(item)
|
||||
cq.Queue.Enqueue(next) // order may have changed.
|
||||
next = ""
|
||||
|
||||
case deqChan <- next:
|
||||
// log.Debug("dequeued", next)
|
||||
next = ""
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}()
|
||||
}
|
Loading…
Reference in New Issue
Block a user