feat: remove queue

This has long since been moved into the DHT itself and doesn't belong
here. I.e., it has nothing to do with the peerstore and nobody else uses
it.

replaces https://github.com/libp2p/go-libp2p-peerstore/pull/83
This commit is contained in:
Steven Allen 2021-07-23 10:59:18 -07:00
parent 3f599b0424
commit f00c27d4d7
6 changed files with 4 additions and 358 deletions

2
go.mod
View File

@ -14,8 +14,6 @@ require (
github.com/multiformats/go-base32 v0.0.3
github.com/multiformats/go-multiaddr v0.3.3
github.com/multiformats/go-multiaddr-fmt v0.1.0
github.com/multiformats/go-multihash v0.0.15
github.com/stretchr/testify v1.7.0
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1
go.uber.org/goleak v1.1.10
)

18
go.sum
View File

@ -85,8 +85,6 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/klauspost/cpuid/v2 v2.0.4 h1:g0I61F2K2DjRHz1cnxlkNSBIaePVoJIjjnHui8QHbiw=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
@ -106,9 +104,8 @@ github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czP
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
@ -130,9 +127,8 @@ github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-multihash v0.0.14 h1:QoBceQYQQtNUuf6s7wHxnE2c8bhbMqhfGzNI032se/I=
github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-multihash v0.0.15 h1:hWOPdrNqDjwHDx82vsYGSDZNyktOJJ2dzZJzFkOV1jM=
github.com/multiformats/go-multihash v0.0.15/go.mod h1:D6aZrWNLFTV/ynMpKsNtB40mJzmCl4jb1alC0OvHiHg=
github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2W/KhfNY=
github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
@ -170,8 +166,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@ -194,9 +188,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@ -231,11 +224,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 h1:46ULzRKLh1CwgRq2dC5SlBzEqqNCi8rreOZnNrbqcIY=
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=

View File

@ -1,100 +0,0 @@
package queue
import (
"container/heap"
"math/big"
"sync"
"github.com/libp2p/go-libp2p-core/peer"
ks "github.com/whyrusleeping/go-keyspace"
)
// peerMetric tracks a peer and its distance to something else.
type peerMetric struct {
// the peer
peer peer.ID
// big.Int for XOR metric
metric *big.Int
}
// peerMetricHeap implements a heap of peerDistances
type peerMetricHeap []*peerMetric
func (ph peerMetricHeap) Len() int {
return len(ph)
}
func (ph peerMetricHeap) Less(i, j int) bool {
return -1 == ph[i].metric.Cmp(ph[j].metric)
}
func (ph peerMetricHeap) Swap(i, j int) {
ph[i], ph[j] = ph[j], ph[i]
}
func (ph *peerMetricHeap) Push(x interface{}) {
item := x.(*peerMetric)
*ph = append(*ph, item)
}
func (ph *peerMetricHeap) Pop() interface{} {
old := *ph
n := len(old)
item := old[n-1]
*ph = old[0 : n-1]
return item
}
// distancePQ implements heap.Interface and PeerQueue
type distancePQ struct {
// from is the Key this PQ measures against
from ks.Key
// heap is a heap of peerDistance items
heap peerMetricHeap
sync.RWMutex
}
func (pq *distancePQ) Len() int {
pq.Lock()
defer pq.Unlock()
return len(pq.heap)
}
func (pq *distancePQ) Enqueue(p peer.ID) {
pq.Lock()
defer pq.Unlock()
distance := ks.XORKeySpace.Key([]byte(p)).Distance(pq.from)
heap.Push(&pq.heap, &peerMetric{
peer: p,
metric: distance,
})
}
func (pq *distancePQ) Dequeue() peer.ID {
pq.Lock()
defer pq.Unlock()
if len(pq.heap) < 1 {
panic("called Dequeue on an empty PeerQueue")
// will panic internally anyway, but we can help debug here
}
o := heap.Pop(&pq.heap)
p := o.(*peerMetric)
return p.peer
}
// NewXORDistancePQ returns a PeerQueue which maintains its peers sorted
// in terms of their distances to each other in an XORKeySpace (i.e. using
// XOR as a metric of distance).
func NewXORDistancePQ(from string) PeerQueue {
return &distancePQ{
from: ks.XORKeySpace.Key([]byte(from)),
heap: peerMetricHeap{},
}
}

View File

@ -1,18 +0,0 @@
package queue
import "github.com/libp2p/go-libp2p-core/peer"
// PeerQueue maintains a set of peers ordered according to a metric.
// Implementations of PeerQueue could order peers based on distances along
// a KeySpace, latency measurements, trustworthiness, reputation, etc.
type PeerQueue interface {
// Len returns the number of items in PeerQueue
Len() int
// Enqueue adds this node to the queue.
Enqueue(peer.ID)
// Dequeue retrieves the highest (smallest int) priority node
Dequeue() peer.ID
}

View File

@ -1,139 +0,0 @@
package queue
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peer"
mh "github.com/multiformats/go-multihash"
)
func TestQueue(t *testing.T) {
p1 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") // these aren't valid, because need to hex-decode.
p2 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32") // these aren't valid, because need to hex-decode.
p3 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33") // these aren't valid, because need to hex-decode.
p4 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34") // these aren't valid, because need to hex-decode.
p5 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") // these aren't valid, because need to hex-decode.
// but they work.
// these are the peer.IDs' XORKeySpace Key values:
// [228 47 151 130 156 102 222 232 218 31 132 94 170 208 80 253 120 103 55 35 91 237 48 157 81 245 57 247 66 150 9 40]
// [26 249 85 75 54 49 25 30 21 86 117 62 85 145 48 175 155 194 210 216 58 14 241 143 28 209 129 144 122 28 163 6]
// [78 135 26 216 178 181 224 181 234 117 2 248 152 115 255 103 244 34 4 152 193 88 9 225 8 127 216 158 226 8 236 246]
// [125 135 124 6 226 160 101 94 192 57 39 12 18 79 121 140 190 154 147 55 44 83 101 151 63 255 94 179 51 203 241 51]
pq := NewXORDistancePQ("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31")
pq.Enqueue(p3)
pq.Enqueue(p1)
pq.Enqueue(p2)
pq.Enqueue(p4)
pq.Enqueue(p5)
pq.Enqueue(p1)
// should come out as: p1, p4, p3, p2
if d := pq.Dequeue(); d != p1 && d != p5 {
t.Error("ordering failed")
}
if d := pq.Dequeue(); d != p1 && d != p5 {
t.Error("ordering failed")
}
if d := pq.Dequeue(); d != p1 && d != p5 {
t.Error("ordering failed")
}
if pq.Dequeue() != p4 {
t.Error("ordering failed")
}
if pq.Dequeue() != p3 {
t.Error("ordering failed")
}
if pq.Dequeue() != p2 {
t.Error("ordering failed")
}
}
func newPeerTime(t time.Time) peer.ID {
s := fmt.Sprintf("hmmm time: %v", t)
h, _ := mh.Sum([]byte(s), mh.SHA2_256, -1)
return peer.ID(h)
}
func TestSyncQueue(t *testing.T) {
tickT := time.Microsecond * 50
max := 5000
consumerN := 10
countsIn := make([]int, consumerN*2)
countsOut := make([]int, consumerN)
if testing.Short() {
max = 1000
}
ctx := context.Background()
pq := NewXORDistancePQ("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31")
cq := NewChanQueue(ctx, pq)
wg := sync.WaitGroup{}
produce := func(p int) {
defer wg.Done()
tick := time.Tick(tickT)
for i := 0; i < max; i++ {
select {
case tim := <-tick:
countsIn[p]++
cq.EnqChan <- newPeerTime(tim)
case <-ctx.Done():
return
}
}
}
consume := func(c int) {
defer wg.Done()
for {
select {
case <-cq.DeqChan:
countsOut[c]++
if countsOut[c] >= max*2 {
return
}
case <-ctx.Done():
return
}
}
}
// make n * 2 producers and n consumers
for i := 0; i < consumerN; i++ {
wg.Add(3)
go produce(i)
go produce(consumerN + i)
go consume(i)
}
wg.Wait()
sum := func(ns []int) int {
total := 0
for _, n := range ns {
total += n
}
return total
}
if sum(countsIn) != sum(countsOut) {
t.Errorf("didnt get all of them out: %d/%d", sum(countsOut), sum(countsIn))
}
}

View File

@ -1,85 +0,0 @@
package queue
import (
"context"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"
)
var log = logging.Logger("peerqueue")
// ChanQueue makes any PeerQueue synchronizable through channels.
type ChanQueue struct {
Queue PeerQueue
EnqChan chan<- peer.ID
DeqChan <-chan peer.ID
}
// NewChanQueue creates a ChanQueue by wrapping pq.
func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue {
cq := &ChanQueue{Queue: pq}
cq.process(ctx)
return cq
}
func (cq *ChanQueue) process(ctx context.Context) {
// construct the channels here to be able to use them bidirectionally
enqChan := make(chan peer.ID)
deqChan := make(chan peer.ID)
cq.EnqChan = enqChan
cq.DeqChan = deqChan
go func() {
log.Debug("processing")
defer log.Debug("closed")
defer close(deqChan)
var next peer.ID
var item peer.ID
var more bool
for {
if cq.Queue.Len() == 0 {
// log.Debug("wait for enqueue")
select {
case next, more = <-enqChan:
if !more {
return
}
// log.Debug("got", next)
case <-ctx.Done():
return
}
} else {
next = cq.Queue.Dequeue()
// log.Debug("peek", next)
}
select {
case item, more = <-enqChan:
if !more {
if cq.Queue.Len() > 0 {
return // we're done done.
}
enqChan = nil // closed, so no use.
}
// log.Debug("got", item)
cq.Queue.Enqueue(item)
cq.Queue.Enqueue(next) // order may have changed.
next = ""
case deqChan <- next:
// log.Debug("dequeued", next)
next = ""
case <-ctx.Done():
return
}
}
}()
}