Merge pull request #47 from raulk/feat/pstore-ds-gc

pstoreds: migrate from strict TTL to GC-based expiry.
Commit c123410c64 by Raúl Kripalani, 2019-02-22 14:43:47 +00:00, committed via GitHub.
19 changed files with 2255 additions and 549 deletions


@@ -1,3 +1,5 @@
coverage:
range: "50...100"
comment: off
ignore:
- "pb/*.pb.go"


@@ -3,6 +3,7 @@ package peerstore
import (
"context"
"errors"
"io"
"math"
"time"
@@ -49,6 +50,8 @@ const (
// Peerstore provides a threadsafe store of Peer related
// information.
type Peerstore interface {
io.Closer
AddrBook
KeyBook
PeerMetadata

pb/Makefile Normal file (12 lines)

@@ -0,0 +1,12 @@
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --proto_path=$(GOPATH)/src:. --gogofaster_out=. $<
clean:
rm -f *.pb.go
.PHONY: clean

pb/custom.go Normal file (110 lines)

@@ -0,0 +1,110 @@
package pstore_pb
import (
"encoding/json"
proto "github.com/gogo/protobuf/proto"
peer "github.com/libp2p/go-libp2p-peer"
pt "github.com/libp2p/go-libp2p-peer/test"
ma "github.com/multiformats/go-multiaddr"
)
// customGogoType aggregates the interfaces that custom Gogo types need to implement.
// it is only used for type assertions.
type customGogoType interface {
proto.Marshaler
proto.Unmarshaler
json.Marshaler
json.Unmarshaler
proto.Sizer
}
// ProtoPeerID is a custom type used by gogo to serde raw peer IDs into the peer.ID type, and back.
type ProtoPeerID struct {
peer.ID
}
var _ customGogoType = (*ProtoPeerID)(nil)
func (id ProtoPeerID) Marshal() ([]byte, error) {
return []byte(id.ID), nil
}
func (id ProtoPeerID) MarshalTo(data []byte) (n int, err error) {
return copy(data, []byte(id.ID)), nil
}
func (id ProtoPeerID) MarshalJSON() ([]byte, error) {
m, _ := id.Marshal()
return json.Marshal(m)
}
func (id *ProtoPeerID) Unmarshal(data []byte) (err error) {
id.ID = peer.ID(string(data))
return nil
}
func (id *ProtoPeerID) UnmarshalJSON(data []byte) error {
var v []byte
err := json.Unmarshal(data, &v)
if err != nil {
return err
}
return id.Unmarshal(v)
}
func (id ProtoPeerID) Size() int {
return len([]byte(id.ID))
}
// ProtoAddr is a custom type used by gogo to serde raw multiaddresses into the ma.Multiaddr type, and back.
type ProtoAddr struct {
ma.Multiaddr
}
var _ customGogoType = (*ProtoAddr)(nil)
func (a ProtoAddr) Marshal() ([]byte, error) {
return a.Bytes(), nil
}
func (a ProtoAddr) MarshalTo(data []byte) (n int, err error) {
return copy(data, a.Bytes()), nil
}
func (a ProtoAddr) MarshalJSON() ([]byte, error) {
m, _ := a.Marshal()
return json.Marshal(m)
}
func (a *ProtoAddr) Unmarshal(data []byte) (err error) {
a.Multiaddr, err = ma.NewMultiaddrBytes(data)
return err
}
func (a *ProtoAddr) UnmarshalJSON(data []byte) error {
v := new([]byte)
err := json.Unmarshal(data, v)
if err != nil {
return err
}
return a.Unmarshal(*v)
}
func (a ProtoAddr) Size() int {
return len(a.Bytes())
}
// NewPopulatedProtoAddr generates a populated instance of the custom gogo type ProtoAddr.
// It is required by gogo-generated tests.
func NewPopulatedProtoAddr(r randyPstore) *ProtoAddr {
a, _ := ma.NewMultiaddr("/ip4/123.123.123.123/tcp/7001")
return &ProtoAddr{Multiaddr: a}
}
// NewPopulatedProtoPeerID generates a populated instance of the custom gogo type ProtoPeerID.
// It is required by gogo-generated tests.
func NewPopulatedProtoPeerID(r randyPstore) *ProtoPeerID {
id, _ := pt.RandPeerID()
return &ProtoPeerID{ID: id}
}
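
For orientation, a minimal, hedged usage sketch of these custom types. It assumes the package is imported under the path github.com/libp2p/go-libp2p-peerstore/pb, as done elsewhere in this PR, and elides error handling; ProtoPeerID round-trips analogously.

package main

import (
	"encoding/json"
	"fmt"

	pb "github.com/libp2p/go-libp2p-peerstore/pb"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001")
	in := pb.ProtoAddr{Multiaddr: addr}

	// Binary round-trip: Marshal emits the raw multiaddr bytes.
	raw, _ := in.Marshal()
	var out pb.ProtoAddr
	_ = out.Unmarshal(raw)
	fmt.Println(out.Equal(in.Multiaddr)) // true

	// JSON round-trip: the bytes are wrapped in a base64-encoded JSON string.
	blob, _ := json.Marshal(in)
	var out2 pb.ProtoAddr
	_ = json.Unmarshal(blob, &out2)
	fmt.Println(out2.Equal(in.Multiaddr)) // true
}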

pb/pstore.pb.go Normal file (754 lines)

@@ -0,0 +1,754 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: pstore.proto
package pstore_pb
import (
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
// AddrBookRecord represents a record for a peer in the address book.
type AddrBookRecord struct {
// The peer ID.
Id *ProtoPeerID `protobuf:"bytes,1,opt,name=id,proto3,customtype=ProtoPeerID" json:"id,omitempty"`
// The multiaddresses. This is a sorted list where element 0 expires the soonest.
Addrs []*AddrBookRecord_AddrEntry `protobuf:"bytes,2,rep,name=addrs,proto3" json:"addrs,omitempty"`
}
func (m *AddrBookRecord) Reset() { *m = AddrBookRecord{} }
func (m *AddrBookRecord) String() string { return proto.CompactTextString(m) }
func (*AddrBookRecord) ProtoMessage() {}
func (*AddrBookRecord) Descriptor() ([]byte, []int) {
return fileDescriptor_f96873690e08a98f, []int{0}
}
func (m *AddrBookRecord) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *AddrBookRecord) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_AddrBookRecord.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *AddrBookRecord) XXX_Merge(src proto.Message) {
xxx_messageInfo_AddrBookRecord.Merge(m, src)
}
func (m *AddrBookRecord) XXX_Size() int {
return m.Size()
}
func (m *AddrBookRecord) XXX_DiscardUnknown() {
xxx_messageInfo_AddrBookRecord.DiscardUnknown(m)
}
var xxx_messageInfo_AddrBookRecord proto.InternalMessageInfo
func (m *AddrBookRecord) GetAddrs() []*AddrBookRecord_AddrEntry {
if m != nil {
return m.Addrs
}
return nil
}
// AddrEntry represents a single multiaddress.
type AddrBookRecord_AddrEntry struct {
Addr *ProtoAddr `protobuf:"bytes,1,opt,name=addr,proto3,customtype=ProtoAddr" json:"addr,omitempty"`
// The point in time when this address expires.
Expiry int64 `protobuf:"varint,2,opt,name=expiry,proto3" json:"expiry,omitempty"`
// The original TTL of this address.
Ttl int64 `protobuf:"varint,3,opt,name=ttl,proto3" json:"ttl,omitempty"`
}
func (m *AddrBookRecord_AddrEntry) Reset() { *m = AddrBookRecord_AddrEntry{} }
func (m *AddrBookRecord_AddrEntry) String() string { return proto.CompactTextString(m) }
func (*AddrBookRecord_AddrEntry) ProtoMessage() {}
func (*AddrBookRecord_AddrEntry) Descriptor() ([]byte, []int) {
return fileDescriptor_f96873690e08a98f, []int{0, 0}
}
func (m *AddrBookRecord_AddrEntry) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *AddrBookRecord_AddrEntry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_AddrBookRecord_AddrEntry.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *AddrBookRecord_AddrEntry) XXX_Merge(src proto.Message) {
xxx_messageInfo_AddrBookRecord_AddrEntry.Merge(m, src)
}
func (m *AddrBookRecord_AddrEntry) XXX_Size() int {
return m.Size()
}
func (m *AddrBookRecord_AddrEntry) XXX_DiscardUnknown() {
xxx_messageInfo_AddrBookRecord_AddrEntry.DiscardUnknown(m)
}
var xxx_messageInfo_AddrBookRecord_AddrEntry proto.InternalMessageInfo
func (m *AddrBookRecord_AddrEntry) GetExpiry() int64 {
if m != nil {
return m.Expiry
}
return 0
}
func (m *AddrBookRecord_AddrEntry) GetTtl() int64 {
if m != nil {
return m.Ttl
}
return 0
}
func init() {
proto.RegisterType((*AddrBookRecord)(nil), "pstore.pb.AddrBookRecord")
proto.RegisterType((*AddrBookRecord_AddrEntry)(nil), "pstore.pb.AddrBookRecord.AddrEntry")
}
func init() { proto.RegisterFile("pstore.proto", fileDescriptor_f96873690e08a98f) }
var fileDescriptor_f96873690e08a98f = []byte{
// 255 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x28, 0x2e, 0xc9,
0x2f, 0x4a, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0xf1, 0x92, 0xa4, 0x74, 0xd3,
0x33, 0x4b, 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0xd3, 0xf3, 0xd3, 0xf3, 0xf5, 0xc1,
0x2a, 0x92, 0x4a, 0xd3, 0xc0, 0x3c, 0x30, 0x07, 0xcc, 0x82, 0xe8, 0x54, 0x3a, 0xc6, 0xc8, 0xc5,
0xe7, 0x98, 0x92, 0x52, 0xe4, 0x94, 0x9f, 0x9f, 0x1d, 0x94, 0x9a, 0x9c, 0x5f, 0x94, 0x22, 0x24,
0xcf, 0xc5, 0x94, 0x99, 0x22, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0xe3, 0xc4, 0x7f, 0xeb, 0x9e, 0x3c,
0x77, 0x00, 0x48, 0x65, 0x40, 0x6a, 0x6a, 0x91, 0xa7, 0x4b, 0x10, 0x53, 0x66, 0x8a, 0x90, 0x25,
0x17, 0x6b, 0x62, 0x4a, 0x4a, 0x51, 0xb1, 0x04, 0x93, 0x02, 0xb3, 0x06, 0xb7, 0x91, 0xb2, 0x1e,
0xdc, 0x76, 0x3d, 0x54, 0xa3, 0xc0, 0x5c, 0xd7, 0xbc, 0x92, 0xa2, 0xca, 0x20, 0x88, 0x0e, 0xa9,
0x08, 0x2e, 0x4e, 0xb8, 0x98, 0x90, 0x22, 0x17, 0x0b, 0x48, 0x14, 0x6a, 0x15, 0xef, 0xad, 0x7b,
0xf2, 0x9c, 0x60, 0xab, 0x40, 0x2a, 0x82, 0xc0, 0x52, 0x42, 0x62, 0x5c, 0x6c, 0xa9, 0x15, 0x05,
0x99, 0x45, 0x95, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x50, 0x9e, 0x90, 0x00, 0x17, 0x73,
0x49, 0x49, 0x8e, 0x04, 0x33, 0x58, 0x10, 0xc4, 0x74, 0x52, 0xf8, 0xf1, 0x50, 0x8e, 0xf1, 0xc0,
0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0x71,
0xc2, 0x63, 0x39, 0x86, 0x0b, 0x8f, 0xe5, 0x18, 0x6e, 0x3c, 0x96, 0x63, 0x48, 0x62, 0x03, 0xfb,
0xd8, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xb1, 0x1a, 0x16, 0x43, 0x3b, 0x01, 0x00, 0x00,
}
func (m *AddrBookRecord) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *AddrBookRecord) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.Id != nil {
dAtA[i] = 0xa
i++
i = encodeVarintPstore(dAtA, i, uint64(m.Id.Size()))
n1, err := m.Id.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n1
}
if len(m.Addrs) > 0 {
for _, msg := range m.Addrs {
dAtA[i] = 0x12
i++
i = encodeVarintPstore(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n
}
}
return i, nil
}
func (m *AddrBookRecord_AddrEntry) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *AddrBookRecord_AddrEntry) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.Addr != nil {
dAtA[i] = 0xa
i++
i = encodeVarintPstore(dAtA, i, uint64(m.Addr.Size()))
n2, err := m.Addr.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n2
}
if m.Expiry != 0 {
dAtA[i] = 0x10
i++
i = encodeVarintPstore(dAtA, i, uint64(m.Expiry))
}
if m.Ttl != 0 {
dAtA[i] = 0x18
i++
i = encodeVarintPstore(dAtA, i, uint64(m.Ttl))
}
return i, nil
}
func encodeVarintPstore(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return offset + 1
}
func NewPopulatedAddrBookRecord(r randyPstore, easy bool) *AddrBookRecord {
this := &AddrBookRecord{}
this.Id = NewPopulatedProtoPeerID(r)
if r.Intn(10) != 0 {
v1 := r.Intn(5)
this.Addrs = make([]*AddrBookRecord_AddrEntry, v1)
for i := 0; i < v1; i++ {
this.Addrs[i] = NewPopulatedAddrBookRecord_AddrEntry(r, easy)
}
}
if !easy && r.Intn(10) != 0 {
}
return this
}
func NewPopulatedAddrBookRecord_AddrEntry(r randyPstore, easy bool) *AddrBookRecord_AddrEntry {
this := &AddrBookRecord_AddrEntry{}
this.Addr = NewPopulatedProtoAddr(r)
this.Expiry = int64(r.Int63())
if r.Intn(2) == 0 {
this.Expiry *= -1
}
this.Ttl = int64(r.Int63())
if r.Intn(2) == 0 {
this.Ttl *= -1
}
if !easy && r.Intn(10) != 0 {
}
return this
}
type randyPstore interface {
Float32() float32
Float64() float64
Int63() int64
Int31() int32
Uint32() uint32
Intn(n int) int
}
func randUTF8RunePstore(r randyPstore) rune {
ru := r.Intn(62)
if ru < 10 {
return rune(ru + 48)
} else if ru < 36 {
return rune(ru + 55)
}
return rune(ru + 61)
}
func randStringPstore(r randyPstore) string {
v2 := r.Intn(100)
tmps := make([]rune, v2)
for i := 0; i < v2; i++ {
tmps[i] = randUTF8RunePstore(r)
}
return string(tmps)
}
func randUnrecognizedPstore(r randyPstore, maxFieldNumber int) (dAtA []byte) {
l := r.Intn(5)
for i := 0; i < l; i++ {
wire := r.Intn(4)
if wire == 3 {
wire = 5
}
fieldNumber := maxFieldNumber + r.Intn(100)
dAtA = randFieldPstore(dAtA, r, fieldNumber, wire)
}
return dAtA
}
func randFieldPstore(dAtA []byte, r randyPstore, fieldNumber int, wire int) []byte {
key := uint32(fieldNumber)<<3 | uint32(wire)
switch wire {
case 0:
dAtA = encodeVarintPopulatePstore(dAtA, uint64(key))
v3 := r.Int63()
if r.Intn(2) == 0 {
v3 *= -1
}
dAtA = encodeVarintPopulatePstore(dAtA, uint64(v3))
case 1:
dAtA = encodeVarintPopulatePstore(dAtA, uint64(key))
dAtA = append(dAtA, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)))
case 2:
dAtA = encodeVarintPopulatePstore(dAtA, uint64(key))
ll := r.Intn(100)
dAtA = encodeVarintPopulatePstore(dAtA, uint64(ll))
for j := 0; j < ll; j++ {
dAtA = append(dAtA, byte(r.Intn(256)))
}
default:
dAtA = encodeVarintPopulatePstore(dAtA, uint64(key))
dAtA = append(dAtA, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)))
}
return dAtA
}
func encodeVarintPopulatePstore(dAtA []byte, v uint64) []byte {
for v >= 1<<7 {
dAtA = append(dAtA, uint8(uint64(v)&0x7f|0x80))
v >>= 7
}
dAtA = append(dAtA, uint8(v))
return dAtA
}
func (m *AddrBookRecord) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Id != nil {
l = m.Id.Size()
n += 1 + l + sovPstore(uint64(l))
}
if len(m.Addrs) > 0 {
for _, e := range m.Addrs {
l = e.Size()
n += 1 + l + sovPstore(uint64(l))
}
}
return n
}
func (m *AddrBookRecord_AddrEntry) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Addr != nil {
l = m.Addr.Size()
n += 1 + l + sovPstore(uint64(l))
}
if m.Expiry != 0 {
n += 1 + sovPstore(uint64(m.Expiry))
}
if m.Ttl != 0 {
n += 1 + sovPstore(uint64(m.Ttl))
}
return n
}
func sovPstore(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozPstore(x uint64) (n int) {
return sovPstore(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *AddrBookRecord) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: AddrBookRecord: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: AddrBookRecord: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthPstore
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthPstore
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
var v ProtoPeerID
m.Id = &v
if err := m.Id.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Addrs", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPstore
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthPstore
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Addrs = append(m.Addrs, &AddrBookRecord_AddrEntry{})
if err := m.Addrs[len(m.Addrs)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPstore(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPstore
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthPstore
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *AddrBookRecord_AddrEntry) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: AddrEntry: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: AddrEntry: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Addr", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthPstore
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthPstore
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
var v ProtoAddr
m.Addr = &v
if err := m.Addr.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Expiry", wireType)
}
m.Expiry = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Expiry |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType)
}
m.Ttl = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Ttl |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipPstore(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPstore
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthPstore
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipPstore(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPstore
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPstore
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPstore
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthPstore
}
iNdEx += length
if iNdEx < 0 {
return 0, ErrInvalidLengthPstore
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPstore
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipPstore(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
if iNdEx < 0 {
return 0, ErrInvalidLengthPstore
}
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
ErrInvalidLengthPstore = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowPstore = fmt.Errorf("proto: integer overflow")
)

pb/pstore.proto Normal file (27 lines)

@@ -0,0 +1,27 @@
syntax = "proto3";
package pstore.pb;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.benchgen_all) = true;
option (gogoproto.populate_all) = true;
// AddrBookRecord represents a record for a peer in the address book.
message AddrBookRecord {
// The peer ID.
bytes id = 1 [(gogoproto.customtype) = "ProtoPeerID"];
// The multiaddresses. This is a sorted list where element 0 expires the soonest.
repeated AddrEntry addrs = 2;
// AddrEntry represents a single multiaddress.
message AddrEntry {
bytes addr = 1 [(gogoproto.customtype) = "ProtoAddr"];
// The point in time when this address expires.
int64 expiry = 2;
// The original TTL of this address.
int64 ttl = 3;
}
}
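
As an illustrative sketch (not part of this PR), this is roughly how the generated types are populated and round-tripped from Go. The import paths mirror those used elsewhere in the PR; error handling is elided.

package main

import (
	"fmt"
	"time"

	pt "github.com/libp2p/go-libp2p-peer/test"
	pb "github.com/libp2p/go-libp2p-peerstore/pb"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	id, _ := pt.RandPeerID()
	addr, _ := ma.NewMultiaddr("/ip4/10.0.0.1/tcp/4001")

	// One record per peer; each address carries its absolute expiry and original TTL.
	rec := &pb.AddrBookRecord{
		Id: &pb.ProtoPeerID{ID: id},
		Addrs: []*pb.AddrBookRecord_AddrEntry{{
			Addr:   &pb.ProtoAddr{Multiaddr: addr},
			Ttl:    int64(time.Hour),
			Expiry: time.Now().Add(time.Hour).Unix(),
		}},
	}

	data, _ := rec.Marshal()

	out := &pb.AddrBookRecord{}
	_ = out.Unmarshal(data)
	fmt.Println(out.Id.ID == id, len(out.Addrs)) // true 1
}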

pb/pstorepb_test.go Normal file (129 lines)

@@ -0,0 +1,129 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: pstore.proto
package pstore_pb
import (
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
github_com_gogo_protobuf_proto "github.com/gogo/protobuf/proto"
proto "github.com/gogo/protobuf/proto"
math "math"
math_rand "math/rand"
testing "testing"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
func BenchmarkAddrBookRecordProtoMarshal(b *testing.B) {
popr := math_rand.New(math_rand.NewSource(616))
total := 0
pops := make([]*AddrBookRecord, 10000)
for i := 0; i < 10000; i++ {
pops[i] = NewPopulatedAddrBookRecord(popr, false)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
dAtA, err := github_com_gogo_protobuf_proto.Marshal(pops[i%10000])
if err != nil {
panic(err)
}
total += len(dAtA)
}
b.SetBytes(int64(total / b.N))
}
func BenchmarkAddrBookRecordProtoUnmarshal(b *testing.B) {
popr := math_rand.New(math_rand.NewSource(616))
total := 0
datas := make([][]byte, 10000)
for i := 0; i < 10000; i++ {
dAtA, err := github_com_gogo_protobuf_proto.Marshal(NewPopulatedAddrBookRecord(popr, false))
if err != nil {
panic(err)
}
datas[i] = dAtA
}
msg := &AddrBookRecord{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
total += len(datas[i%10000])
if err := github_com_gogo_protobuf_proto.Unmarshal(datas[i%10000], msg); err != nil {
panic(err)
}
}
b.SetBytes(int64(total / b.N))
}
func BenchmarkAddrBookRecord_AddrEntryProtoMarshal(b *testing.B) {
popr := math_rand.New(math_rand.NewSource(616))
total := 0
pops := make([]*AddrBookRecord_AddrEntry, 10000)
for i := 0; i < 10000; i++ {
pops[i] = NewPopulatedAddrBookRecord_AddrEntry(popr, false)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
dAtA, err := github_com_gogo_protobuf_proto.Marshal(pops[i%10000])
if err != nil {
panic(err)
}
total += len(dAtA)
}
b.SetBytes(int64(total / b.N))
}
func BenchmarkAddrBookRecord_AddrEntryProtoUnmarshal(b *testing.B) {
popr := math_rand.New(math_rand.NewSource(616))
total := 0
datas := make([][]byte, 10000)
for i := 0; i < 10000; i++ {
dAtA, err := github_com_gogo_protobuf_proto.Marshal(NewPopulatedAddrBookRecord_AddrEntry(popr, false))
if err != nil {
panic(err)
}
datas[i] = dAtA
}
msg := &AddrBookRecord_AddrEntry{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
total += len(datas[i%10000])
if err := github_com_gogo_protobuf_proto.Unmarshal(datas[i%10000], msg); err != nil {
panic(err)
}
}
b.SetBytes(int64(total / b.N))
}
func BenchmarkAddrBookRecordSize(b *testing.B) {
popr := math_rand.New(math_rand.NewSource(616))
total := 0
pops := make([]*AddrBookRecord, 1000)
for i := 0; i < 1000; i++ {
pops[i] = NewPopulatedAddrBookRecord(popr, false)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
total += pops[i%1000].Size()
}
b.SetBytes(int64(total / b.N))
}
func BenchmarkAddrBookRecord_AddrEntrySize(b *testing.B) {
popr := math_rand.New(math_rand.NewSource(616))
total := 0
pops := make([]*AddrBookRecord_AddrEntry, 1000)
for i := 0; i < 1000; i++ {
pops[i] = NewPopulatedAddrBookRecord_AddrEntry(popr, false)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
total += pops[i%1000].Size()
}
b.SetBytes(int64(total / b.N))
}
// These tests are generated by github.com/gogo/protobuf/plugin/testgen


@@ -2,6 +2,7 @@ package peerstore
import (
"fmt"
"io"
"sync"
peer "github.com/libp2p/go-libp2p-peer"
@@ -31,6 +32,26 @@ func NewPeerstore(kb KeyBook, ab AddrBook, md PeerMetadata) Peerstore {
}
}
func (ps *peerstore) Close() (err error) {
var errs []error
weakClose := func(name string, c interface{}) {
if cl, ok := c.(io.Closer); ok {
if err = cl.Close(); err != nil {
errs = append(errs, fmt.Errorf("%s error: %s", name, err))
}
}
}
weakClose("keybook", ps.KeyBook)
weakClose("addressbook", ps.AddrBook)
weakClose("peermetadata", ps.PeerMetadata)
if len(errs) > 0 {
return fmt.Errorf("failed while closing peerstore; err(s): %q", errs)
}
return nil
}
func (ps *peerstore) Peers() peer.IDSlice {
set := map[peer.ID]struct{}{}
for _, p := range ps.PeersWithKeys() {


@@ -2,50 +2,25 @@ package pstoreds
import (
"context"
"encoding/binary"
"errors"
"fmt"
"sort"
"sync"
"time"
lru "github.com/hashicorp/golang-lru"
base32 "github.com/whyrusleeping/base32"
ds "github.com/ipfs/go-datastore"
query "github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
pb "github.com/libp2p/go-libp2p-peerstore/pb"
pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem"
lru "github.com/hashicorp/golang-lru"
ma "github.com/multiformats/go-multiaddr"
b32 "github.com/whyrusleeping/base32"
)
var (
log = logging.Logger("peerstore/ds")
// The maximum representable value in time.Time is time.Unix(1<<63-62135596801, 999999999).
// But it's too brittle and implementation-dependent, so we prefer to use 1<<62, which is in the
// year 146138514283. We're safe.
maxTime = time.Unix(1<<62, 0)
ErrTTLDatastore = errors.New("datastore must provide TTL support")
)
// Peer addresses are stored under the following db key pattern:
// /peers/addr/<b32 peer id no padding>/<hash of maddr>
var abBase = ds.NewKey("/peers/addrs")
var _ pstore.AddrBook = (*dsAddrBook)(nil)
// dsAddrBook is an address book backed by a Datastore with both an
// in-memory TTL manager and an in-memory address stream manager.
type dsAddrBook struct {
cache cache
ds ds.TxnDatastore
subsManager *pstoremem.AddrSubManager
writeRetries int
}
type ttlWriteMode int
const (
@@ -53,347 +28,283 @@ const (
ttlExtend
)
type cacheEntry struct {
expiration time.Time
addrs []ma.Multiaddr
var (
log = logging.Logger("peerstore/ds")
// Peer addresses are stored under the following db key pattern:
// /peers/addrs/<b32 peer id no padding>
addrBookBase = ds.NewKey("/peers/addrs")
)
// addrsRecord decorates the AddrBookRecord with locks and metadata.
type addrsRecord struct {
sync.RWMutex
*pb.AddrBookRecord
dirty bool
}
type addrRecord struct {
ttl time.Duration
addr ma.Multiaddr
}
// flush writes the record to the datastore by calling ds.Put, unless the record is
// marked for deletion, in which case we call ds.Delete. To be called within a lock.
func (r *addrsRecord) flush(write ds.Write) (err error) {
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id.ID)))
if len(r.Addrs) == 0 {
if err = write.Delete(key); err == nil {
r.dirty = false
}
return err
}
func (ar *addrRecord) MarshalBinary() ([]byte, error) {
ttlB := make([]byte, 8)
binary.LittleEndian.PutUint64(ttlB, uint64(ar.ttl))
return append(ttlB, ar.addr.Bytes()...), nil
}
func (ar *addrRecord) UnmarshalBinary(b []byte) error {
ar.ttl = time.Duration(binary.LittleEndian.Uint64(b))
// this had been serialized by us, no need to check for errors
ar.addr, _ = ma.NewMultiaddrBytes(b[8:])
data, err := r.Marshal()
if err != nil {
return err
}
if err = write.Put(key, data); err != nil {
return err
}
// write succeeded; record is no longer dirty.
r.dirty = false
return nil
}
// NewAddrBook initializes a new address book given a
// Datastore instance, a context for managing the TTL manager,
// and the interval at which the TTL manager should sweep the Datastore.
func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts Options) (*dsAddrBook, error) {
if _, ok := store.(ds.TTLDatastore); !ok {
return nil, ErrTTLDatastore
// clean is called on records to perform housekeeping. The return value indicates if the record was changed
// as a result of this call.
//
// clean does the following:
// * sorts addresses by expiration (soonest expiring first).
// * removes expired addresses.
//
// It short-circuits optimistically when there's nothing to do.
//
// clean is called from several points:
// * when accessing an entry.
// * when performing periodic GC.
// * after an entry has been modified (e.g. addresses have been added or removed, TTLs updated, etc.)
//
// If the return value is true, the caller should perform a flush immediately to sync the record with the store.
func (r *addrsRecord) clean() (chgd bool) {
now := time.Now().Unix()
if !r.dirty && len(r.Addrs) > 0 && r.Addrs[0].Expiry > now {
// record is not dirty, and we have no expired entries to purge.
return false
}
var (
cache cache = &noopCache{}
err error
)
if len(r.Addrs) == 0 {
// this is a ghost record; let's signal it has to be written.
// flush() will take care of doing the deletion.
return true
}
if opts.CacheSize > 0 {
if cache, err = lru.NewARC(int(opts.CacheSize)); err != nil {
return nil, err
if r.dirty && len(r.Addrs) > 1 {
// the record has been modified, so it may need resorting.
// we keep addresses sorted by expiration, where 0 is the soonest expiring.
sort.Slice(r.Addrs, func(i, j int) bool {
return r.Addrs[i].Expiry < r.Addrs[j].Expiry
})
}
// since addresses are sorted by expiration, we find the first
// survivor and split the slice on its index.
pivot := -1
for i, addr := range r.Addrs {
if addr.Expiry > now {
break
}
pivot = i
}
mgr := &dsAddrBook{
cache: cache,
ds: store,
subsManager: pstoremem.NewAddrSubManager(),
writeRetries: int(opts.WriteRetries),
}
return mgr, nil
r.Addrs = r.Addrs[pivot+1:]
return r.dirty || pivot >= 0
}
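
To make the pruning logic easier to follow in isolation, here is a standalone sketch of the sort-and-pivot step performed by clean(). The entry type and pruneExpired helper are hypothetical stand-ins for pb.AddrBookRecord_AddrEntry and the in-place logic above.

package main

import (
	"fmt"
	"sort"
	"time"
)

// entry mirrors the shape of an address entry for the purposes of this sketch.
type entry struct {
	addr   string
	expiry int64 // unix seconds
}

// pruneExpired sorts entries by expiry (soonest first), finds the last expired
// index (the pivot), and keeps only the survivors, mirroring clean() above.
func pruneExpired(entries []entry, now int64) []entry {
	sort.Slice(entries, func(i, j int) bool { return entries[i].expiry < entries[j].expiry })
	pivot := -1
	for i, e := range entries {
		if e.expiry > now {
			break
		}
		pivot = i
	}
	return entries[pivot+1:]
}

func main() {
	now := time.Now().Unix()
	entries := []entry{
		{"/ip4/1.1.1.1/tcp/1", now - 10}, // expired
		{"/ip4/2.2.2.2/tcp/2", now + 60}, // still valid
	}
	fmt.Println(pruneExpired(entries, now)) // only the second entry survives
}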
func keysAndAddrs(p peer.ID, addrs []ma.Multiaddr) ([]ds.Key, []ma.Multiaddr, error) {
var (
keys = make([]ds.Key, len(addrs))
clean = make([]ma.Multiaddr, len(addrs))
parentKey = abBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p)))
i = 0
)
// dsAddrBook is an address book backed by a Datastore with a GC procedure to purge expired entries. It uses an
// in-memory address stream manager. See the NewAddrBook for more information.
type dsAddrBook struct {
ctx context.Context
opts Options
for _, addr := range addrs {
if addr == nil {
continue
}
cache cache
ds ds.Batching
gc *dsAddrBookGc
subsManager *pstoremem.AddrSubManager
hash, err := mh.Sum((addr).Bytes(), mh.MURMUR3, -1)
if err != nil {
return nil, nil, err
}
keys[i] = parentKey.ChildString(base32.RawStdEncoding.EncodeToString(hash))
clean[i] = addr
i++
// controls the lifetime of child goroutines.
childrenDone sync.WaitGroup
cancelFn func()
}
var _ pstore.AddrBook = (*dsAddrBook)(nil)
// NewAddrBook initializes a new datastore-backed address book. It serves as a drop-in replacement for pstoremem
// (memory-backed peerstore), and works with any datastore implementing the ds.Batching interface.
//
// Addresses and peer records are serialized into protobuf, storing one datastore entry per peer, along with metadata
// to control address expiration. To alleviate disk access and serde overhead, we internally use a read/write-through
// ARC cache, the size of which is adjustable via Options.CacheSize.
//
// The user has a choice of two GC algorithms:
//
// - lookahead GC: minimises the amount of full store traversals by maintaining a time-indexed list of entries that
//   need to be visited within the period specified in Options.GCLookaheadInterval. This is useful in scenarios with
//   considerable TTL variance, coupled with datastores whose native iterators return entries in lexicographical key
//   order. Enable this mode by setting Options.GCLookaheadInterval > 0. Lookahead windows are jumpy, not sliding.
//   Purges operate exclusively over the lookahead window with periodicity Options.GCPurgeInterval.
//
// - full-purge GC (default): performs a full visit of the store with periodicity Options.GCPurgeInterval. Useful when
//   the range of possible TTL values is small and the values themselves are also extreme, e.g. 10 minutes or
//   permanent (popular values used in other libp2p modules). In that case, optimizing with lookahead windows
//   makes little sense.
func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAddrBook, err error) {
ctx, cancelFn := context.WithCancel(ctx)
ab = &dsAddrBook{
ctx: ctx,
ds: store,
opts: opts,
cancelFn: cancelFn,
subsManager: pstoremem.NewAddrSubManager(),
}
return keys[:i], clean[:i], nil
if opts.CacheSize > 0 {
if ab.cache, err = lru.NewARC(int(opts.CacheSize)); err != nil {
return nil, err
}
} else {
ab.cache = new(noopCache)
}
if ab.gc, err = newAddressBookGc(ctx, ab); err != nil {
return nil, err
}
return ab, nil
}
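
A hedged construction sketch of how a caller might wire up this address book. The concrete ds.Batching datastore is assumed to be supplied elsewhere (e.g. a Badger- or LevelDB-backed store), and the interval values are purely illustrative.

package example

import (
	"context"
	"time"

	ds "github.com/ipfs/go-datastore"
	pstore "github.com/libp2p/go-libp2p-peerstore"
	pstoreds "github.com/libp2p/go-libp2p-peerstore/pstoreds"
)

// newDatastoreAddrBook is a hypothetical helper; it tweaks the default options
// and hands them to NewAddrBook above.
func newDatastoreAddrBook(ctx context.Context, store ds.Batching) (pstore.AddrBook, error) {
	opts := pstoreds.DefaultOpts()
	opts.CacheSize = 1024                     // entries held in the read/write-through ARC cache
	opts.GCPurgeInterval = 2 * time.Hour      // how often expired addresses are purged
	opts.GCLookaheadInterval = 12 * time.Hour // > 0 enables lookahead GC; 0 selects full-purge GC
	return pstoreds.NewAddrBook(ctx, store, opts)
}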
func (ab *dsAddrBook) Close() error {
ab.cancelFn()
ab.childrenDone.Wait()
return nil
}
// loadRecord is a read-through fetch. It fetches a record from cache, falling back to the
// datastore upon a miss, and returning a newly initialized record if the peer doesn't exist.
//
// loadRecord calls clean() on an existing record before returning it. If the record changes
// as a result and the update argument is true, the resulting state is saved in the datastore.
//
// If the cache argument is true, the record is inserted in the cache when loaded from the datastore.
func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrsRecord, err error) {
if e, ok := ab.cache.Get(id); ok {
pr = e.(*addrsRecord)
pr.Lock()
defer pr.Unlock()
if pr.clean() && update {
err = pr.flush(ab.ds)
}
return pr, err
}
pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(id)))
data, err := ab.ds.Get(key)
switch err {
case ds.ErrNotFound:
err = nil
pr.Id = &pb.ProtoPeerID{ID: id}
case nil:
if err = pr.Unmarshal(data); err != nil {
return nil, err
}
// this record is new and local for now (not in cache), so we don't need to lock.
if pr.clean() && update {
err = pr.flush(ab.ds)
}
default:
return nil, err
}
if cache {
ab.cache.Add(id, pr)
}
return pr, err
}
// AddAddr will add a new address if it's not already in the AddrBook.
func (mgr *dsAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
func (ab *dsAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
ab.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
}
// AddAddrs will add many new addresses if they're not already in the AddrBook.
func (mgr *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
func (ab *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
if ttl <= 0 {
return
}
mgr.setAddrs(p, addrs, ttl, ttlExtend)
addrs = cleanAddrs(addrs)
ab.setAddrs(p, addrs, ttl, ttlExtend)
}
// SetAddr will add or update the TTL of an address in the AddrBook.
func (mgr *dsAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
addrs := []ma.Multiaddr{addr}
mgr.SetAddrs(p, addrs, ttl)
func (ab *dsAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
ab.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
}
// SetAddrs will add or update the TTLs of addresses in the AddrBook.
func (mgr *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
func (ab *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
addrs = cleanAddrs(addrs)
if ttl <= 0 {
mgr.deleteAddrs(p, addrs)
ab.deleteAddrs(p, addrs)
return
}
mgr.setAddrs(p, addrs, ttl, ttlOverride)
}
func (mgr *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) error {
// Keys and cleaned up addresses.
keys, addrs, err := keysAndAddrs(p, addrs)
if err != nil {
return err
}
mgr.cache.Remove(p)
// Attempt transactional KV deletion.
for i := 0; i < mgr.writeRetries; i++ {
if err = mgr.dbDelete(keys); err == nil {
break
}
log.Errorf("failed to delete addresses for peer %s: %s\n", p.Pretty(), err)
}
if err != nil {
log.Errorf("failed to avoid write conflict for peer %s after %d retries: %v\n", p.Pretty(), mgr.writeRetries, err)
return err
}
return nil
}
func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) error {
// Keys and cleaned up addresses.
keys, addrs, err := keysAndAddrs(p, addrs)
if err != nil {
return err
}
mgr.cache.Remove(p)
// Attempt transactional KV insertion.
var existed []bool
for i := 0; i < mgr.writeRetries; i++ {
if existed, err = mgr.dbInsert(keys, addrs, ttl, mode); err == nil {
break
}
log.Errorf("failed to write addresses for peer %s: %s\n", p.Pretty(), err)
}
if err != nil {
log.Errorf("failed to avoid write conflict for peer %s after %d retries: %v\n", p.Pretty(), mgr.writeRetries, err)
return err
}
// Update was successful, so broadcast event only for new addresses.
for i, _ := range keys {
if !existed[i] {
mgr.subsManager.BroadcastAddr(p, addrs[i])
}
}
return nil
}
// dbInsert performs a transactional insert of the provided keys and values.
func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) ([]bool, error) {
var (
err error
existed = make([]bool, len(keys))
exp = time.Now().Add(ttl)
)
txn, err := mgr.ds.NewTransaction(false)
if err != nil {
return nil, err
}
defer txn.Discard()
ttltxn := txn.(ds.TTLDatastore)
for i, key := range keys {
// Check if the key existed previously.
if existed[i], err = ttltxn.Has(key); err != nil {
log.Errorf("transaction failed and aborted while checking key existence: %s, cause: %v", key.String(), err)
return nil, err
}
// The key embeds a hash of the value, so if it existed, we can safely skip the insert and
// just update the TTL.
if existed[i] {
switch mode {
case ttlOverride:
err = ttltxn.SetTTL(key, ttl)
case ttlExtend:
var curr time.Time
if curr, err = ttltxn.GetExpiration(key); err == nil && exp.After(curr) {
err = ttltxn.SetTTL(key, ttl)
}
}
if err != nil {
// mode will be printed as an int
log.Errorf("failed while updating the ttl for key: %s, mode: %v, cause: %v", key.String(), mode, err)
return nil, err
}
continue
}
r := &addrRecord{
ttl: ttl,
addr: addrs[i],
}
value, _ := r.MarshalBinary()
if err = ttltxn.PutWithTTL(key, value, ttl); err != nil {
log.Errorf("transaction failed and aborted while setting key: %s, cause: %v", key.String(), err)
return nil, err
}
}
if err = txn.Commit(); err != nil {
log.Errorf("failed to commit transaction when setting keys, cause: %v", err)
return nil, err
}
return existed, nil
ab.setAddrs(p, addrs, ttl, ttlOverride)
}
// UpdateAddrs will update any addresses for a given peer and TTL combination to
// have a new TTL.
func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
mgr.cache.Remove(p)
var err error
for i := 0; i < mgr.writeRetries; i++ {
if err = mgr.dbUpdateTTL(p, oldTTL, newTTL); err == nil {
break
}
log.Errorf("failed to update ttlsfor peer %s: %s\n", p.Pretty(), err)
}
func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
pr, err := ab.loadRecord(p, true, false)
if err != nil {
log.Errorf("failed to avoid write conflict when updating ttls for peer %s after %d retries: %v\n",
p.Pretty(), mgr.writeRetries, err)
log.Errorf("failed to update ttls for peer %s: %s\n", p.Pretty(), err)
}
}
func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time.Duration) error {
var (
prefix = abBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p)))
q = query.Query{Prefix: prefix.String(), KeysOnly: false}
results query.Results
err error
)
pr.Lock()
defer pr.Unlock()
txn, err := mgr.ds.NewTransaction(false)
if err != nil {
return err
}
defer txn.Discard()
if results, err = txn.Query(q); err != nil {
return err
}
defer results.Close()
ttltxn := txn.(ds.TTLDatastore)
r := &addrRecord{}
for result := range results.Next() {
r.UnmarshalBinary(result.Value)
if r.ttl != oldTTL {
newExp := time.Now().Add(newTTL).Unix()
for _, entry := range pr.Addrs {
if entry.Ttl != int64(oldTTL) {
continue
}
r.ttl = newTTL
value, _ := r.MarshalBinary()
if err = ttltxn.PutWithTTL(ds.RawKey(result.Key), value, newTTL); err != nil {
return err
}
entry.Ttl, entry.Expiry = int64(newTTL), newExp
pr.dirty = true
}
if err := txn.Commit(); err != nil {
log.Errorf("failed to commit transaction when updating ttls, cause: %v", err)
return err
if pr.clean() {
pr.flush(ab.ds)
}
return nil
}
// Addrs returns all of the non-expired addresses for a given peer.
func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
var (
prefix = abBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p)))
q = query.Query{Prefix: prefix.String(), KeysOnly: false, ReturnExpirations: true}
results query.Results
err error
)
// Check the cache and return the entry only if it hasn't expired; if expired, remove.
if e, ok := mgr.cache.Get(p); ok {
entry := e.(cacheEntry)
if entry.expiration.After(time.Now()) {
addrs := make([]ma.Multiaddr, len(entry.addrs))
copy(addrs, entry.addrs)
return addrs
} else {
mgr.cache.Remove(p)
}
}
txn, err := mgr.ds.NewTransaction(true)
func (ab *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
pr, err := ab.loadRecord(p, true, true)
if err != nil {
log.Warning("failed to load peerstore entry for peer %v while querying addrs, err: %v", p, err)
return nil
}
defer txn.Discard()
if results, err = txn.Query(q); err != nil {
log.Error(err)
return nil
pr.RLock()
defer pr.RUnlock()
addrs := make([]ma.Multiaddr, 0, len(pr.Addrs))
for _, a := range pr.Addrs {
addrs = append(addrs, a.Addr)
}
defer results.Close()
var addrs []ma.Multiaddr
var r addrRecord
// used to set the expiration for the entire cache entry
earliestExp := maxTime
for result := range results.Next() {
if err = r.UnmarshalBinary(result.Value); err == nil {
addrs = append(addrs, r.addr)
}
if exp := result.Expiration; !exp.IsZero() && exp.Before(earliestExp) {
earliestExp = exp
}
}
// Store a copy in the cache.
addrsCpy := make([]ma.Multiaddr, len(addrs))
copy(addrsCpy, addrs)
entry := cacheEntry{addrs: addrsCpy, expiration: earliestExp}
mgr.cache.Add(p, entry)
return addrs
}
// PeersWithAddrs returns all of the peer IDs for which the AddrBook has addresses.
func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice {
ids, err := uniquePeerIds(mgr.ds, abBase, func(result query.Result) string {
return ds.RawKey(result.Key).Parent().Name()
func (ab *dsAddrBook) PeersWithAddrs() peer.IDSlice {
ids, err := uniquePeerIds(ab.ds, addrBookBase, func(result query.Result) string {
return ds.RawKey(result.Key).Name()
})
if err != nil {
log.Errorf("error while retrieving peers with addresses: %v", err)
@@ -403,107 +314,114 @@ func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice {
// AddrStream returns a channel on which all new addresses discovered for a
// given peer ID will be published.
func (mgr *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
initial := mgr.Addrs(p)
return mgr.subsManager.AddrStream(ctx, p, initial)
func (ab *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
initial := ab.Addrs(p)
return ab.subsManager.AddrStream(ctx, p, initial)
}
// ClearAddrs will delete all known addresses for a peer ID.
func (mgr *dsAddrBook) ClearAddrs(p peer.ID) {
var (
err error
prefix = abBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p)))
deleteFn func() error
)
func (ab *dsAddrBook) ClearAddrs(p peer.ID) {
ab.cache.Remove(p)
if e, ok := mgr.cache.Peek(p); ok {
mgr.cache.Remove(p)
keys, _, _ := keysAndAddrs(p, e.(cacheEntry).addrs)
deleteFn = func() error {
return mgr.dbDelete(keys)
}
} else {
deleteFn = func() error {
return mgr.dbDeleteIter(prefix)
}
}
// Attempt transactional KV deletion.
for i := 0; i < mgr.writeRetries; i++ {
if err = deleteFn(); err == nil {
break
}
log.Errorf("failed to clear addresses for peer %s: %s\n", p.Pretty(), err)
}
if err != nil {
log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), mgr.writeRetries)
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(p)))
if err := ab.ds.Delete(key); err != nil {
log.Errorf("failed to clear addresses for peer %s: %v", p.Pretty(), err)
}
}
// dbDelete transactionally deletes the provided keys.
func (mgr *dsAddrBook) dbDelete(keys []ds.Key) error {
var err error
txn, err := mgr.ds.NewTransaction(false)
func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) (err error) {
pr, err := ab.loadRecord(p, true, false)
if err != nil {
return err
return fmt.Errorf("failed to load peerstore entry for peer %v while setting addrs, err: %v", p, err)
}
defer txn.Discard()
for _, key := range keys {
if err = txn.Delete(key); err != nil {
log.Errorf("failed to delete key: %s, cause: %v", key.String(), err)
return err
pr.Lock()
defer pr.Unlock()
newExp := time.Now().Add(ttl).Unix()
existed := make([]bool, len(addrs)) // keeps track of which addrs we found.
Outer:
for i, incoming := range addrs {
for _, have := range pr.Addrs {
if incoming.Equal(have.Addr) {
existed[i] = true
if mode == ttlExtend && have.Expiry > newExp {
// if we're only extending TTLs but the addr already has a longer one, we skip it.
continue Outer
}
have.Expiry = newExp
// we found the address, and addresses cannot be duplicate,
// so let's move on to the next.
continue Outer
}
}
}
if err = txn.Commit(); err != nil {
log.Errorf("failed to commit transaction when deleting keys, cause: %v", err)
return err
// add addresses we didn't hold.
var added []*pb.AddrBookRecord_AddrEntry
for i, e := range existed {
if e {
continue
}
addr := addrs[i]
entry := &pb.AddrBookRecord_AddrEntry{
Addr: &pb.ProtoAddr{Multiaddr: addr},
Ttl: int64(ttl),
Expiry: newExp,
}
added = append(added, entry)
// note: there's a minor chance that writing the record will fail, in which case we would've broadcast
// the addresses without persisting them. This is very unlikely and not much of an issue.
ab.subsManager.BroadcastAddr(p, addr)
}
return nil
pr.Addrs = append(pr.Addrs, added...)
pr.dirty = true
pr.clean()
return pr.flush(ab.ds)
}
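
For clarity on the two write modes, a small hedged sketch of how they surface through the public AddrBook API. demoTTLModes is a hypothetical helper; any pstore.AddrBook implementation will do.

package example

import (
	"time"

	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
	ma "github.com/multiformats/go-multiaddr"
)

// demoTTLModes illustrates ttlExtend vs ttlOverride as implemented by setAddrs above.
func demoTTLModes(ab pstore.AddrBook, p peer.ID, addr ma.Multiaddr) {
	// ttlExtend: AddAddr(s) only ever lengthens the remaining lifetime of a
	// known address; a shorter TTL for an already-known address is ignored.
	ab.AddAddrs(p, []ma.Multiaddr{addr}, time.Hour)
	ab.AddAddrs(p, []ma.Multiaddr{addr}, time.Minute) // no effect on the existing expiry
	ab.AddAddrs(p, []ma.Multiaddr{addr}, 0)           // no-op: non-positive TTLs are ignored

	// ttlOverride: SetAddr(s) resets the expiry unconditionally.
	ab.SetAddrs(p, []ma.Multiaddr{addr}, 10*time.Minute)

	// A non-positive TTL passed to SetAddrs deletes the address.
	ab.SetAddrs(p, []ma.Multiaddr{addr}, 0)
}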
// dbDeleteIter removes all entries whose keys are prefixed with the argument.
// it returns a slice of the removed keys in case it's needed
func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) error {
q := query.Query{Prefix: prefix.String(), KeysOnly: true}
txn, err := mgr.ds.NewTransaction(false)
func (ab *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) (err error) {
pr, err := ab.loadRecord(p, false, false)
if err != nil {
return err
}
defer txn.Discard()
results, err := txn.Query(q)
if err != nil {
log.Errorf("failed to fetch all keys prefixed with: %s, cause: %v", prefix.String(), err)
return err
return fmt.Errorf("failed to load peerstore entry for peer %v while deleting addrs, err: %v", p, err)
}
var keys = make([]ds.Key, 0, 4) // cap: 4 to reduce allocs
var key ds.Key
for result := range results.Next() {
key = ds.RawKey(result.Key)
keys = append(keys, key)
if pr.Addrs == nil {
return nil
}
if err = txn.Delete(key); err != nil {
log.Errorf("failed to delete key: %s, cause: %v", key.String(), err)
return err
pr.Lock()
defer pr.Unlock()
	// deletes addresses in place, avoiding copies until we encounter the first deletion.
	survived := 0
Outer:
	for i, addr := range pr.Addrs {
		for _, del := range addrs {
			if addr.Addr.Equal(del) {
				// this address is slated for deletion; skip it.
				continue Outer
			}
		}
		if i != survived {
			pr.Addrs[survived] = pr.Addrs[i]
		}
		survived++
	}
pr.Addrs = pr.Addrs[:survived]
if err = results.Close(); err != nil {
log.Errorf("failed to close cursor, cause: %v", err)
return err
}
if err = txn.Commit(); err != nil {
log.Errorf("failed to commit transaction when deleting keys, cause: %v", err)
return err
}
return nil
pr.dirty = true
pr.clean()
return pr.flush(ab.ds)
}
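
The same in-place compaction idiom, shown on a plain slice for illustration; filterInPlace is a hypothetical helper, not part of this PR.

package example

// filterInPlace shows the compaction idiom used by deleteAddrs above on an int
// slice: survivors are shifted left over deleted slots, so no extra slice is
// allocated and the original backing array is reused.
func filterInPlace(xs []int, drop func(int) bool) []int {
	survived := 0
	for i, x := range xs {
		if drop(x) {
			continue
		}
		if i != survived {
			xs[survived] = xs[i]
		}
		survived++
	}
	return xs[:survived]
}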
func cleanAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
clean := make([]ma.Multiaddr, 0, len(addrs))
for _, addr := range addrs {
if addr == nil {
continue
}
clean = append(clean, addr)
}
return clean
}

pstoreds/addr_book_gc.go Normal file (388 lines)

@@ -0,0 +1,388 @@
package pstoreds
import (
"context"
"fmt"
"strconv"
"time"
ds "github.com/ipfs/go-datastore"
query "github.com/ipfs/go-datastore/query"
peer "github.com/libp2p/go-libp2p-peer"
pb "github.com/libp2p/go-libp2p-peerstore/pb"
b32 "github.com/whyrusleeping/base32"
)
var (
// GC lookahead entries are stored under the following key pattern:
// /peers/gc/addrs/<unix timestamp of next visit>/<peer ID b32> => nil
// in databases with lexicographical key order, this time-indexing allows us to visit
// only the timeslice we are interested in.
gcLookaheadBase = ds.NewKey("/peers/gc/addrs")
// queries
purgeLookaheadQuery = query.Query{
Prefix: gcLookaheadBase.String(),
Orders: []query.Order{query.OrderByKey{}},
KeysOnly: true,
}
purgeStoreQuery = query.Query{
Prefix: addrBookBase.String(),
Orders: []query.Order{query.OrderByKey{}},
KeysOnly: false,
}
populateLookaheadQuery = query.Query{
Prefix: addrBookBase.String(),
Orders: []query.Order{query.OrderByKey{}},
KeysOnly: true,
}
)
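
For reference, a hedged sketch of how these time-indexed GC keys are composed and later parsed back. The helper names are hypothetical, but the key layout and library calls mirror purgeLookahead and populateLookahead below.

package example

import (
	"fmt"
	"strconv"

	ds "github.com/ipfs/go-datastore"
	peer "github.com/libp2p/go-libp2p-peer"
	b32 "github.com/whyrusleeping/base32"
)

// gcKeyFor composes a lookahead key the same way the GC routines below do:
// /peers/gc/addrs/<unix timestamp of earliest expiry>/<peer ID, base32 without padding>.
func gcKeyFor(p peer.ID, earliestExpiry int64) ds.Key {
	idb32 := b32.RawStdEncoding.EncodeToString([]byte(p))
	return ds.NewKey("/peers/gc/addrs").ChildString(fmt.Sprintf("%d/%s", earliestExpiry, idb32))
}

// parseGCKey reverses the composition: the timestamp is the parent segment and
// the peer ID is the leaf segment of the key.
func parseGCKey(k ds.Key) (peer.ID, int64, error) {
	ts, err := strconv.ParseInt(k.Parent().Name(), 10, 64)
	if err != nil {
		return "", 0, err
	}
	idb, err := b32.RawStdEncoding.DecodeString(k.Name())
	if err != nil {
		return "", 0, err
	}
	id, err := peer.IDFromBytes(idb)
	return id, ts, err
}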
// dsAddrBookGc is responsible for garbage collection in a datastore-backed address book.
type dsAddrBookGc struct {
ctx context.Context
ab *dsAddrBook
running chan struct{}
lookaheadEnabled bool
purgeFunc func()
currWindowEnd int64
}
func newAddressBookGc(ctx context.Context, ab *dsAddrBook) (*dsAddrBookGc, error) {
if ab.opts.GCPurgeInterval < 0 {
return nil, fmt.Errorf("negative GC purge interval provided: %s", ab.opts.GCPurgeInterval)
}
if ab.opts.GCLookaheadInterval < 0 {
return nil, fmt.Errorf("negative GC lookahead interval provided: %s", ab.opts.GCLookaheadInterval)
}
if ab.opts.GCInitialDelay < 0 {
return nil, fmt.Errorf("negative GC initial delay provided: %s", ab.opts.GCInitialDelay)
}
if ab.opts.GCLookaheadInterval > 0 && ab.opts.GCLookaheadInterval < ab.opts.GCPurgeInterval {
return nil, fmt.Errorf("lookahead interval must be larger than purge interval, respectively: %s, %s",
ab.opts.GCLookaheadInterval, ab.opts.GCPurgeInterval)
}
lookaheadEnabled := ab.opts.GCLookaheadInterval > 0
gc := &dsAddrBookGc{
ctx: ctx,
ab: ab,
running: make(chan struct{}, 1),
lookaheadEnabled: lookaheadEnabled,
}
if lookaheadEnabled {
gc.purgeFunc = gc.purgeLookahead
} else {
gc.purgeFunc = gc.purgeStore
}
// do not start GC timers if purge is disabled; this GC can only be triggered manually.
if ab.opts.GCPurgeInterval > 0 {
gc.ab.childrenDone.Add(1)
go gc.background()
}
return gc, nil
}
// gc prunes expired addresses from the datastore at regular intervals. It should be spawned as a goroutine.
func (gc *dsAddrBookGc) background() {
defer gc.ab.childrenDone.Done()
select {
case <-time.After(gc.ab.opts.GCInitialDelay):
case <-gc.ab.ctx.Done():
// yield if we have been cancelled/closed before the delay elapses.
return
}
purgeTimer := time.NewTicker(gc.ab.opts.GCPurgeInterval)
defer purgeTimer.Stop()
var lookaheadCh <-chan time.Time
if gc.lookaheadEnabled {
lookaheadTimer := time.NewTicker(gc.ab.opts.GCLookaheadInterval)
lookaheadCh = lookaheadTimer.C
gc.populateLookahead() // do a lookahead now
defer lookaheadTimer.Stop()
}
for {
select {
case <-purgeTimer.C:
gc.purgeFunc()
case <-lookaheadCh:
// will never trigger if lookahead is disabled (nil Duration).
gc.populateLookahead()
case <-gc.ctx.Done():
return
}
}
}
// purgeLookahead runs a single purge cycle over the current lookahead window, deleting the addresses that have
// expired. When lookahead is disabled, purgeStore (below) visits all entries in the datastore instead.
func (gc *dsAddrBookGc) purgeLookahead() {
select {
case gc.running <- struct{}{}:
defer func() { <-gc.running }()
default:
// yield if lookahead is running.
return
}
var id peer.ID
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs.
batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch)
if err != nil {
log.Warningf("failed while creating batch to purge GC entries: %v", err)
}
// This function drops an unparseable GC entry; this is for safety. It is an escape hatch in case
// we modify the format of keys going forward. If a user runs a new version against an old DB and we don't
// clean up unparseable entries, we'll end up accumulating garbage.
dropInError := func(key ds.Key, err error, msg string) {
if err != nil {
log.Warningf("failed while %s record with GC key: %v, err: %v; deleting", msg, key, err)
}
if err = batch.Delete(key); err != nil {
log.Warningf("failed to delete corrupt GC lookahead entry: %v, err: %v", key, err)
}
}
// This function drops a GC key if the entry is cleaned correctly. It may reschedule another visit
// if the next earliest expiry falls within the current window again.
dropOrReschedule := func(key ds.Key, ar *addrsRecord) {
if err := batch.Delete(key); err != nil {
log.Warningf("failed to delete lookahead entry: %v, err: %v", key, err)
}
// re-add the record if it needs to be visited again in this window.
if len(ar.Addrs) != 0 && ar.Addrs[0].Expiry <= gc.currWindowEnd {
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", ar.Addrs[0].Expiry, key.Name()))
if err := batch.Put(gcKey, []byte{}); err != nil {
log.Warningf("failed to add new GC key: %v, err: %v", gcKey, err)
}
}
}
results, err := gc.ab.ds.Query(purgeLookaheadQuery)
if err != nil {
log.Warningf("failed while fetching entries to purge: %v", err)
return
}
defer results.Close()
now := time.Now().Unix()
// keys: /peers/gc/addrs/<unix timestamp of next visit>/<peer ID b32>
// values: nil
for result := range results.Next() {
gcKey := ds.RawKey(result.Key)
ts, err := strconv.ParseInt(gcKey.Parent().Name(), 10, 64)
if err != nil {
dropInError(gcKey, err, "parsing timestamp")
log.Warningf("failed while parsing timestamp from key: %v, err: %v", result.Key, err)
continue
} else if ts > now {
// this is an ordered cursor; when we hit an entry with a timestamp beyond now, we can break.
break
}
idb32, err := b32.RawStdEncoding.DecodeString(gcKey.Name())
if err != nil {
dropInError(gcKey, err, "parsing peer ID")
log.Warningf("failed while parsing b32 peer ID from key: %v, err: %v", result.Key, err)
continue
}
id, err = peer.IDFromBytes(idb32)
if err != nil {
dropInError(gcKey, err, "decoding peer ID")
log.Warningf("failed while decoding peer ID from key: %v, err: %v", result.Key, err)
continue
}
// if the record is in cache, we clean it and flush it if necessary.
if e, ok := gc.ab.cache.Peek(id); ok {
cached := e.(*addrsRecord)
cached.Lock()
if cached.clean() {
if err = cached.flush(batch); err != nil {
log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
}
}
dropOrReschedule(gcKey, cached)
cached.Unlock()
continue
}
record.Reset()
// otherwise, fetch it from the store, clean it and flush it.
entryKey := addrBookBase.ChildString(gcKey.Name())
val, err := gc.ab.ds.Get(entryKey)
if err != nil {
// captures all errors, including ErrNotFound.
dropInError(gcKey, err, "fetching entry")
continue
}
err = record.Unmarshal(val)
if err != nil {
dropInError(gcKey, err, "unmarshalling entry")
continue
}
if record.clean() {
err = record.flush(batch)
if err != nil {
log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err)
}
}
dropOrReschedule(gcKey, record)
}
if err = batch.Commit(); err != nil {
log.Warningf("failed to commit GC purge batch: %v", err)
}
}
func (gc *dsAddrBookGc) purgeStore() {
select {
case gc.running <- struct{}{}:
defer func() { <-gc.running }()
default:
// yield if another GC cycle is running.
return
}
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs.
batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch)
if err != nil {
log.Warningf("failed while creating batch to purge GC entries: %v", err)
}
results, err := gc.ab.ds.Query(purgeStoreQuery)
if err != nil {
log.Warningf("failed while opening iterator: %v", err)
return
}
defer results.Close()
// keys: /peers/addrs/<peer ID b32>
for result := range results.Next() {
record.Reset()
if err = record.Unmarshal(result.Value); err != nil {
// TODO log
continue
}
id := record.Id.ID
if !record.clean() {
continue
}
if err := record.flush(batch); err != nil {
log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id, err)
}
gc.ab.cache.Remove(id)
}
if err = batch.Commit(); err != nil {
log.Warningf("failed to commit GC purge batch: %v", err)
}
}
// populateLookahead populates the lookahead window by scanning the entire store and picking entries whose earliest
// expiration falls within the window period.
//
// Those entries are stored in the lookahead region in the store, indexed by the timestamp when they need to be
// visited, to facilitate temporal range scans.
func (gc *dsAddrBookGc) populateLookahead() {
if gc.ab.opts.GCLookaheadInterval == 0 {
return
}
select {
case gc.running <- struct{}{}:
defer func() { <-gc.running }()
default:
// yield if something's running.
return
}
until := time.Now().Add(gc.ab.opts.GCLookaheadInterval).Unix()
var id peer.ID
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
results, err := gc.ab.ds.Query(populateLookaheadQuery)
if err != nil {
log.Warningf("failed while querying to populate lookahead GC window: %v", err)
return
}
defer results.Close()
batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch)
if err != nil {
log.Warningf("failed while creating batch to populate lookahead GC window: %v", err)
return
}
for result := range results.Next() {
idb32 := ds.RawKey(result.Key).Name()
k, err := b32.RawStdEncoding.DecodeString(idb32)
if err != nil {
log.Warningf("failed while decoding peer ID from key: %v, err: %v", result.Key, err)
continue
}
if id, err = peer.IDFromBytes(k); err != nil {
log.Warningf("failed while decoding peer ID from key: %v, err: %v", result.Key, err)
continue
}
// if the record is in cache, use the cached version.
if e, ok := gc.ab.cache.Peek(id); ok {
cached := e.(*addrsRecord)
cached.RLock()
if len(cached.Addrs) == 0 || cached.Addrs[0].Expiry > until {
cached.RUnlock()
continue
}
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", cached.Addrs[0].Expiry, idb32))
if err = batch.Put(gcKey, []byte{}); err != nil {
log.Warningf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
}
cached.RUnlock()
continue
}
record.Reset()
val, err := gc.ab.ds.Get(ds.RawKey(result.Key))
if err != nil {
log.Warningf("failed which getting record from store for peer: %v, err: %v", id.Pretty(), err)
continue
}
if err := record.Unmarshal(val); err != nil {
log.Warningf("failed while unmarshalling record from store for peer: %v, err: %v", id.Pretty(), err)
continue
}
if len(record.Addrs) > 0 && record.Addrs[0].Expiry <= until {
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", record.Addrs[0].Expiry, idb32))
if err = batch.Put(gcKey, []byte{}); err != nil {
log.Warningf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
}
}
}
if err = batch.Commit(); err != nil {
log.Warningf("failed to commit GC lookahead batch: %v", err)
}
gc.currWindowEnd = until
}

View File

@ -0,0 +1,254 @@
package pstoreds
import (
"testing"
"time"
query "github.com/ipfs/go-datastore/query"
pstore "github.com/libp2p/go-libp2p-peerstore"
test "github.com/libp2p/go-libp2p-peerstore/test"
ma "github.com/multiformats/go-multiaddr"
)
var lookaheadQuery = query.Query{Prefix: gcLookaheadBase.String(), KeysOnly: true}
type testProbe struct {
t *testing.T
ab pstore.AddrBook
}
func (tp *testProbe) countLookaheadEntries() (i int) {
results, err := tp.ab.(*dsAddrBook).ds.Query(lookaheadQuery)
if err != nil {
tp.t.Fatal(err)
}
defer results.Close()
for range results.Next() {
i++
}
return i
}
func (tp *testProbe) clearCache() {
for _, k := range tp.ab.(*dsAddrBook).cache.Keys() {
tp.ab.(*dsAddrBook).cache.Remove(k)
}
}
func TestGCLookahead(t *testing.T) {
opts := DefaultOpts()
// effectively disable automatic GC for this test.
opts.GCInitialDelay = 90 * time.Hour
opts.GCLookaheadInterval = 10 * time.Second
opts.GCPurgeInterval = 1 * time.Second
factory := addressBookFactory(t, badgerStore, opts)
ab, closeFn := factory()
gc := ab.(*dsAddrBook).gc
defer closeFn()
tp := &testProbe{t, ab}
ids := test.GeneratePeerIDs(10)
addrs := test.GenerateAddrs(100)
// lookahead is 10 seconds, so these entries will be outside the lookahead window.
ab.AddAddrs(ids[0], addrs[:10], time.Hour)
ab.AddAddrs(ids[1], addrs[10:20], time.Hour)
ab.AddAddrs(ids[2], addrs[20:30], time.Hour)
gc.populateLookahead()
if i := tp.countLookaheadEntries(); i != 0 {
t.Errorf("expected no GC lookahead entries, got: %v", i)
}
// change addresses of a peer to have TTL 1 second, placing them in the lookahead window.
ab.UpdateAddrs(ids[1], time.Hour, time.Second)
// Purge the cache, to exercise a different path in the lookahead cycle.
tp.clearCache()
gc.populateLookahead()
if i := tp.countLookaheadEntries(); i != 1 {
t.Errorf("expected 1 GC lookahead entry, got: %v", i)
}
// change addresses of another peer to have TTL 5 seconds, placing them in the lookahead window.
ab.UpdateAddrs(ids[2], time.Hour, 5*time.Second)
gc.populateLookahead()
if i := tp.countLookaheadEntries(); i != 2 {
t.Errorf("expected 2 GC lookahead entries, got: %v", i)
}
}
func TestGCPurging(t *testing.T) {
opts := DefaultOpts()
// effectively disable automatic GC for this test.
opts.GCInitialDelay = 90 * time.Hour
opts.GCLookaheadInterval = 20 * time.Second
opts.GCPurgeInterval = 1 * time.Second
factory := addressBookFactory(t, badgerStore, opts)
ab, closeFn := factory()
gc := ab.(*dsAddrBook).gc
defer closeFn()
tp := &testProbe{t, ab}
ids := test.GeneratePeerIDs(10)
addrs := test.GenerateAddrs(100)
// all addresses fall within the lookahead window, but they are staggered across it.
ab.AddAddrs(ids[0], addrs[:10], 1*time.Second)
ab.AddAddrs(ids[1], addrs[30:40], 1*time.Second)
ab.AddAddrs(ids[2], addrs[60:70], 1*time.Second)
ab.AddAddrs(ids[0], addrs[10:20], 4*time.Second)
ab.AddAddrs(ids[1], addrs[40:50], 4*time.Second)
ab.AddAddrs(ids[0], addrs[20:30], 10*time.Second)
ab.AddAddrs(ids[1], addrs[50:60], 10*time.Second)
// this is inside the window, but it will survive the purges we do in the test.
ab.AddAddrs(ids[3], addrs[70:80], 15*time.Second)
gc.populateLookahead()
if i := tp.countLookaheadEntries(); i != 4 {
t.Errorf("expected 4 GC lookahead entries, got: %v", i)
}
<-time.After(2 * time.Second)
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 3 {
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
}
// Purge the cache, to exercise a different path in the purge cycle.
tp.clearCache()
<-time.After(5 * time.Second)
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 3 {
t.Errorf("expected 3 GC lookahead entries, got: %v", i)
}
<-time.After(5 * time.Second)
gc.purgeLookahead()
if i := tp.countLookaheadEntries(); i != 1 {
t.Errorf("expected 1 GC lookahead entries, got: %v", i)
}
if i := len(ab.PeersWithAddrs()); i != 1 {
t.Errorf("expected 1 entries in database, got: %v", i)
}
if p := ab.PeersWithAddrs()[0]; p != ids[3] {
t.Errorf("expected remaining peer to be #3, got: %v, expected: %v", p, ids[3])
}
}
func TestGCDelay(t *testing.T) {
ids := test.GeneratePeerIDs(10)
addrs := test.GenerateAddrs(100)
opts := DefaultOpts()
opts.GCInitialDelay = 2 * time.Second
opts.GCLookaheadInterval = 1 * time.Minute
opts.GCPurgeInterval = 30 * time.Second
factory := addressBookFactory(t, badgerStore, opts)
ab, closeFn := factory()
defer closeFn()
tp := &testProbe{t, ab}
ab.AddAddrs(ids[0], addrs, 1*time.Second)
// immediately after adding, there should be no lookahead entries.
if i := tp.countLookaheadEntries(); i != 0 {
t.Errorf("expected no lookahead entries, got: %d", i)
}
// after the initial delay has passed, the lookahead cycle should have run and indexed the entry.
<-time.After(3 * time.Second)
if i := tp.countLookaheadEntries(); i != 1 {
t.Errorf("expected 1 lookahead entry, got: %d", i)
}
}
func TestGCLookaheadDisabled(t *testing.T) {
ids := test.GeneratePeerIDs(10)
addrs := test.GenerateAddrs(100)
opts := DefaultOpts()
// effectively disable automatic GC for this test.
opts.GCInitialDelay = 90 * time.Hour
opts.GCLookaheadInterval = 0 // disable lookahead
opts.GCPurgeInterval = 9 * time.Hour
factory := addressBookFactory(t, badgerStore, opts)
ab, closeFn := factory()
defer closeFn()
tp := &testProbe{t, ab}
// four peers:
// ids[0] has 10 addresses, all of which expire in 500ms.
// ids[1] has 20 addresses; 50% expire in 500ms and 50% in 10 hours.
// ids[2] has 10 addresses; all expire in 10 hours.
// ids[3] has 60 addresses; all expire in 10 hours.
ab.AddAddrs(ids[0], addrs[:10], 500*time.Millisecond)
ab.AddAddrs(ids[1], addrs[10:20], 500*time.Millisecond)
ab.AddAddrs(ids[1], addrs[20:30], 10*time.Hour)
ab.AddAddrs(ids[2], addrs[30:40], 10*time.Hour)
ab.AddAddrs(ids[3], addrs[40:], 10*time.Hour)
time.Sleep(100 * time.Millisecond)
if i := tp.countLookaheadEntries(); i != 0 {
t.Errorf("expected no GC lookahead entries, got: %v", i)
}
time.Sleep(500 * time.Millisecond)
gc := ab.(*dsAddrBook).gc
gc.purgeFunc()
var empty []ma.Multiaddr
test.AssertAddressesEqual(t, empty, ab.Addrs(ids[0]))
test.AssertAddressesEqual(t, addrs[20:30], ab.Addrs(ids[1]))
test.AssertAddressesEqual(t, addrs[30:40], ab.Addrs(ids[2]))
test.AssertAddressesEqual(t, addrs[40:], ab.Addrs(ids[3]))
}
func BenchmarkLookaheadCycle(b *testing.B) {
ids := test.GeneratePeerIDs(100)
addrs := test.GenerateAddrs(100)
opts := DefaultOpts()
opts.GCInitialDelay = 2 * time.Hour
opts.GCLookaheadInterval = 2 * time.Hour
opts.GCPurgeInterval = 6 * time.Hour
factory := addressBookFactory(b, badgerStore, opts)
ab, closeFn := factory()
defer closeFn()
inside, outside := 1*time.Minute, 48*time.Hour
for i, id := range ids {
var ttl time.Duration
if i%2 == 0 {
ttl = inside
} else {
ttl = outside
}
ab.AddAddrs(id, addrs, ttl)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
ab.(*dsAddrBook).gc.populateLookahead()
}
}

View File

@ -8,12 +8,15 @@ type cache interface {
Remove(key interface{})
Contains(key interface{}) bool
Peek(key interface{}) (value interface{}, ok bool)
Keys() []interface{}
}
// noopCache is a dummy implementation that's used when the cache is disabled.
type noopCache struct {
}
var _ cache = (*noopCache)(nil)
func (*noopCache) Get(key interface{}) (value interface{}, ok bool) {
return nil, false
}
@ -31,3 +34,7 @@ func (*noopCache) Contains(key interface{}) bool {
func (*noopCache) Peek(key interface{}) (value interface{}, ok bool) {
return nil, false
}
func (*noopCache) Keys() (keys []interface{}) {
return keys
}

76
pstoreds/cyclic_batch.go Normal file
View File

@ -0,0 +1,76 @@
package pstoreds
import (
"github.com/pkg/errors"
ds "github.com/ipfs/go-datastore"
)
// how many operations are queued in a cyclic batch before we flush it.
var defaultOpsPerCyclicBatch = 20
// cyclicBatch buffers ds write operations and automatically flushes them once the configured
// threshold has been queued (callers in this package pass defaultOpsPerCyclicBatch, i.e. 20).
// An explicit `Commit()` closes this cyclic batch, erroring all further operations.
//
// It is similar to go-ds autobatch, but it's driven by an actual Batch facility offered by the
// ds.
type cyclicBatch struct {
threshold int
ds.Batch
ds ds.Batching
pending int
}
func newCyclicBatch(ds ds.Batching, threshold int) (ds.Batch, error) {
batch, err := ds.Batch()
if err != nil {
return nil, err
}
return &cyclicBatch{Batch: batch, ds: ds}, nil
}
func (cb *cyclicBatch) cycle() (err error) {
if cb.Batch == nil {
return errors.New("cyclic batch is closed")
}
if cb.pending < cb.threshold {
// we haven't reached the threshold yet.
return nil
}
// commit and renew the batch.
if err = cb.Batch.Commit(); err != nil {
return errors.Wrap(err, "failed while committing cyclic batch")
}
if cb.Batch, err = cb.ds.Batch(); err != nil {
return errors.Wrap(err, "failed while renewing cyclic batch")
}
cb.pending = 0
return nil
}
func (cb *cyclicBatch) Put(key ds.Key, val []byte) error {
if err := cb.cycle(); err != nil {
return err
}
cb.pending++
return cb.Batch.Put(key, val)
}
func (cb *cyclicBatch) Delete(key ds.Key) error {
if err := cb.cycle(); err != nil {
return err
}
cb.pending++
return cb.Batch.Delete(key)
}
func (cb *cyclicBatch) Commit() error {
if cb.Batch == nil {
return errors.New("cyclic batch is closed")
}
if err := cb.Batch.Commit(); err != nil {
return err
}
cb.pending = 0
cb.Batch = nil
return nil
}
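// flushPending is a minimal usage sketch of cyclicBatch (illustrative only; the function and its
// arguments are hypothetical, not part of this change). It assumes a ds.Batching datastore and a
// set of key/value pairs that should be persisted as one logical batch.
func flushPending(store ds.Batching, pending map[ds.Key][]byte) error {
	batch, err := newCyclicBatch(store, defaultOpsPerCyclicBatch)
	if err != nil {
		return err
	}
	for k, v := range pending {
		// Put transparently commits and renews the underlying batch once the threshold is reached.
		if err := batch.Put(k, v); err != nil {
			return err
		}
	}
	// Commit flushes whatever is still queued and closes the cyclic batch; further ops will error.
	return batch.Commit()
}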

View File

@ -15,11 +15,10 @@ import (
pt "github.com/libp2p/go-libp2p-peerstore/test"
)
type datastoreFactory func(tb testing.TB) (ds.TxnDatastore, func())
type datastoreFactory func(tb testing.TB) (ds.Batching, func())
var dstores = map[string]datastoreFactory{
"Badger": badgerStore,
// TODO: Enable once go-ds-leveldb supports TTL via a shim.
// "Leveldb": leveldbStore,
}
@ -37,7 +36,7 @@ func TestDsAddrBook(t *testing.T) {
t.Parallel()
opts := DefaultOpts()
opts.TTLInterval = 100 * time.Microsecond
opts.GCPurgeInterval = 1 * time.Second
opts.CacheSize = 1024
pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts))
@ -47,7 +46,7 @@ func TestDsAddrBook(t *testing.T) {
t.Parallel()
opts := DefaultOpts()
opts.TTLInterval = 100 * time.Microsecond
opts.GCPurgeInterval = 1 * time.Second
opts.CacheSize = 0
pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts))
@ -88,7 +87,7 @@ func BenchmarkDsPeerstore(b *testing.B) {
}
}
func badgerStore(tb testing.TB) (ds.TxnDatastore, func()) {
func badgerStore(tb testing.TB) (ds.Batching, func()) {
dataPath, err := ioutil.TempDir(os.TempDir(), "badger")
if err != nil {
tb.Fatal(err)
@ -122,39 +121,41 @@ func leveldbStore(tb testing.TB) (ds.TxnDatastore, func()) {
func peerstoreFactory(tb testing.TB, storeFactory datastoreFactory, opts Options) pt.PeerstoreFactory {
return func() (pstore.Peerstore, func()) {
store, closeFunc := storeFactory(tb)
store, storeCloseFn := storeFactory(tb)
ps, err := NewPeerstore(context.Background(), store, opts)
if err != nil {
tb.Fatal(err)
}
return ps, closeFunc
closer := func() {
ps.Close()
storeCloseFn()
}
return ps, closer
}
}
func addressBookFactory(tb testing.TB, storeFactory datastoreFactory, opts Options) pt.AddrBookFactory {
return func() (pstore.AddrBook, func()) {
store, closeFunc := storeFactory(tb)
ab, err := NewAddrBook(context.Background(), store, opts)
if err != nil {
tb.Fatal(err)
}
return ab, closeFunc
closer := func() {
ab.Close()
closeFunc()
}
return ab, closer
}
}
func keyBookFactory(tb testing.TB, storeFactory datastoreFactory, opts Options) pt.KeyBookFactory {
return func() (pstore.KeyBook, func()) {
store, closeFunc := storeFactory(tb)
store, storeCloseFn := storeFactory(tb)
kb, err := NewKeyBook(context.Background(), store, opts)
if err != nil {
tb.Fatal(err)
}
return kb, closeFunc
return kb, storeCloseFn
}
}

View File

@ -23,12 +23,12 @@ var (
)
type dsKeyBook struct {
ds ds.TxnDatastore
ds ds.Datastore
}
var _ pstore.KeyBook = (*dsKeyBook)(nil)
func NewKeyBook(_ context.Context, store ds.TxnDatastore, _ Options) (pstore.KeyBook, error) {
func NewKeyBook(_ context.Context, store ds.Datastore, _ Options) (pstore.KeyBook, error) {
return &dsKeyBook{store}, nil
}

View File

@ -18,28 +18,36 @@ type Options struct {
// The size of the in-memory cache. A value of 0 or lower disables the cache.
CacheSize uint
// Sweep interval to expire entries, only used when TTL is *not* natively managed
// by the underlying datastore.
TTLInterval time.Duration
// Sweep interval to purge expired addresses from the datastore. If this is a zero value, GC will not run
// automatically, but it'll be available on demand via explicit calls.
GCPurgeInterval time.Duration
// Number of times to retry transactional writes.
WriteRetries uint
// Interval to renew the GC lookahead window. If this is a zero value, lookahead will be disabled and we'll
// traverse the entire datastore for every purge cycle.
GCLookaheadInterval time.Duration
// Initial delay before GC processes start. Intended to give the system breathing room to fully boot
// before starting GC.
GCInitialDelay time.Duration
}
// DefaultOpts returns the default options for a persistent peerstore:
// * Cache size: 1024
// * TTL sweep interval: 1 second
// * WriteRetries: 5
// DefaultOpts returns the default options for a persistent peerstore, with the full-purge GC algorithm:
//
// * Cache size: 1024.
// * GC purge interval: 2 hours.
// * GC lookahead interval: disabled.
// * GC initial delay: 60 seconds.
func DefaultOpts() Options {
return Options{
CacheSize: 1024,
TTLInterval: time.Second,
WriteRetries: 5,
CacheSize: 1024,
GCPurgeInterval: 2 * time.Hour,
GCLookaheadInterval: 0,
GCInitialDelay: 60 * time.Second,
}
}
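// newTunedPeerstore is a minimal construction sketch (illustrative only; the function name and the
// chosen intervals are hypothetical). It shows how the GC knobs above might be tuned: a shorter
// purge interval plus an enabled lookahead window, so purge cycles only visit records known to
// expire soon instead of scanning the whole store.
func newTunedPeerstore(ctx context.Context, store ds.Batching) (pstore.Peerstore, error) {
	opts := DefaultOpts()
	opts.GCPurgeInterval = 30 * time.Minute   // purge expired addresses more frequently.
	opts.GCLookaheadInterval = 12 * time.Hour // a non-zero value enables the lookahead window.
	opts.GCInitialDelay = 10 * time.Second
	return NewPeerstore(ctx, store, opts)
}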
// NewPeerstore creates a peerstore backed by the provided persistent datastore.
func NewPeerstore(ctx context.Context, store ds.TxnDatastore, opts Options) (pstore.Peerstore, error) {
func NewPeerstore(ctx context.Context, store ds.Batching, opts Options) (pstore.Peerstore, error) {
addrBook, err := NewAddrBook(ctx, store, opts)
if err != nil {
return nil, err
@ -60,20 +68,14 @@ func NewPeerstore(ctx context.Context, store ds.TxnDatastore, opts Options) (pst
}
// uniquePeerIds extracts and returns unique peer IDs from database keys.
func uniquePeerIds(ds ds.TxnDatastore, prefix ds.Key, extractor func(result query.Result) string) (peer.IDSlice, error) {
func uniquePeerIds(ds ds.Datastore, prefix ds.Key, extractor func(result query.Result) string) (peer.IDSlice, error) {
var (
q = query.Query{Prefix: prefix.String(), KeysOnly: true}
results query.Results
err error
)
txn, err := ds.NewTransaction(true)
if err != nil {
return nil, err
}
defer txn.Discard()
if results, err = txn.Query(q); err != nil {
if results, err = ds.Query(q); err != nil {
log.Error(err)
return nil, err
}
@ -90,12 +92,11 @@ func uniquePeerIds(ds ds.TxnDatastore, prefix ds.Key, extractor func(result quer
return peer.IDSlice{}, nil
}
ids := make(peer.IDSlice, len(idset))
i := 0
ids := make(peer.IDSlice, 0, len(idset))
for id := range idset {
pid, _ := base32.RawStdEncoding.DecodeString(id)
ids[i], _ = peer.IDFromBytes(pid)
i++
id, _ := peer.IDFromBytes(pid)
ids = append(ids, id)
}
return ids, nil
}
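// peerIDExtractor is an illustrative example (hypothetical, not part of this change) of the
// extractor argument expected by uniquePeerIds: for keys shaped like /peers/addrs/<peer ID b32>,
// the final path component is the base32-encoded peer ID.
func peerIDExtractor(result query.Result) string {
	return ds.RawKey(result.Key).Name()
}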

View File

@ -1,14 +1,10 @@
package test
import (
"fmt"
"testing"
"time"
peer "github.com/libp2p/go-libp2p-peer"
pt "github.com/libp2p/go-libp2p-peer/test"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
)
var addressBookSuite = map[string]func(book pstore.AddrBook) func(*testing.T){
@ -39,64 +35,48 @@ func TestAddrBook(t *testing.T, factory AddrBookFactory) {
}
}
func generateAddrs(count int) []ma.Multiaddr {
var addrs = make([]ma.Multiaddr, count)
for i := 0; i < count; i++ {
addrs[i] = multiaddr(fmt.Sprintf("/ip4/1.1.1.%d/tcp/1111", i))
}
return addrs
}
func generatePeerIds(count int) []peer.ID {
var ids = make([]peer.ID, count)
for i := 0; i < count; i++ {
ids[i], _ = pt.RandPeerID()
}
return ids
}
func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
return func(t *testing.T) {
t.Run("add a single address", func(t *testing.T) {
id := generatePeerIds(1)[0]
addrs := generateAddrs(1)
id := GeneratePeerIDs(1)[0]
addrs := GenerateAddrs(1)
ab.AddAddr(id, addrs[0], time.Hour)
testHas(t, addrs, ab.Addrs(id))
AssertAddressesEqual(t, addrs, ab.Addrs(id))
})
t.Run("idempotent add single address", func(t *testing.T) {
id := generatePeerIds(1)[0]
addrs := generateAddrs(1)
id := GeneratePeerIDs(1)[0]
addrs := GenerateAddrs(1)
ab.AddAddr(id, addrs[0], time.Hour)
ab.AddAddr(id, addrs[0], time.Hour)
testHas(t, addrs, ab.Addrs(id))
AssertAddressesEqual(t, addrs, ab.Addrs(id))
})
t.Run("add multiple addresses", func(t *testing.T) {
id := generatePeerIds(1)[0]
addrs := generateAddrs(3)
id := GeneratePeerIDs(1)[0]
addrs := GenerateAddrs(3)
ab.AddAddrs(id, addrs, time.Hour)
testHas(t, addrs, ab.Addrs(id))
AssertAddressesEqual(t, addrs, ab.Addrs(id))
})
t.Run("idempotent add multiple addresses", func(t *testing.T) {
id := generatePeerIds(1)[0]
addrs := generateAddrs(3)
id := GeneratePeerIDs(1)[0]
addrs := GenerateAddrs(3)
ab.AddAddrs(id, addrs, time.Hour)
ab.AddAddrs(id, addrs, time.Hour)
testHas(t, addrs, ab.Addrs(id))
AssertAddressesEqual(t, addrs, ab.Addrs(id))
})
t.Run("adding an existing address with a later expiration extends its ttl", func(t *testing.T) {
id := generatePeerIds(1)[0]
addrs := generateAddrs(3)
id := GeneratePeerIDs(1)[0]
addrs := GenerateAddrs(3)
ab.AddAddrs(id, addrs, time.Second)
@ -105,12 +85,12 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
// after the initial TTL has expired, check that only the third address is present.
time.Sleep(1200 * time.Millisecond)
testHas(t, addrs[2:], ab.Addrs(id))
AssertAddressesEqual(t, addrs[2:], ab.Addrs(id))
})
t.Run("adding an existing address with an earlier expiration is noop", func(t *testing.T) {
id := generatePeerIds(1)[0]
addrs := generateAddrs(3)
id := GeneratePeerIDs(1)[0]
addrs := GenerateAddrs(3)
ab.AddAddrs(id, addrs, time.Hour)
@ -120,57 +100,64 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) {
// after the initial TTL has expired, check that all three addresses are still present (i.e. the TTL on
// the modified one was not shortened).
time.Sleep(2100 * time.Millisecond)
testHas(t, addrs, ab.Addrs(id))
AssertAddressesEqual(t, addrs, ab.Addrs(id))
})
}
}
func testClearWorks(ab pstore.AddrBook) func(t *testing.T) {
return func(t *testing.T) {
ids := generatePeerIds(2)
addrs := generateAddrs(5)
ids := GeneratePeerIDs(2)
addrs := GenerateAddrs(5)
ab.AddAddrs(ids[0], addrs[0:3], time.Hour)
ab.AddAddrs(ids[1], addrs[3:], time.Hour)
testHas(t, addrs[0:3], ab.Addrs(ids[0]))
testHas(t, addrs[3:], ab.Addrs(ids[1]))
AssertAddressesEqual(t, addrs[0:3], ab.Addrs(ids[0]))
AssertAddressesEqual(t, addrs[3:], ab.Addrs(ids[1]))
ab.ClearAddrs(ids[0])
testHas(t, nil, ab.Addrs(ids[0]))
testHas(t, addrs[3:], ab.Addrs(ids[1]))
AssertAddressesEqual(t, nil, ab.Addrs(ids[0]))
AssertAddressesEqual(t, addrs[3:], ab.Addrs(ids[1]))
ab.ClearAddrs(ids[1])
testHas(t, nil, ab.Addrs(ids[0]))
testHas(t, nil, ab.Addrs(ids[1]))
AssertAddressesEqual(t, nil, ab.Addrs(ids[0]))
AssertAddressesEqual(t, nil, ab.Addrs(ids[1]))
}
}
func testSetNegativeTTLClears(m pstore.AddrBook) func(t *testing.T) {
return func(t *testing.T) {
id := generatePeerIds(1)[0]
addr := generateAddrs(1)[0]
id := GeneratePeerIDs(1)[0]
addrs := GenerateAddrs(100)
m.SetAddr(id, addr, time.Hour)
testHas(t, []ma.Multiaddr{addr}, m.Addrs(id))
m.SetAddrs(id, addrs, time.Hour)
AssertAddressesEqual(t, addrs, m.Addrs(id))
m.SetAddr(id, addr, -1)
testHas(t, nil, m.Addrs(id))
// remove two addresses.
m.SetAddr(id, addrs[50], -1)
m.SetAddr(id, addrs[75], -1)
// calculate the survivors
survivors := append(addrs[0:50], addrs[51:]...)
survivors = append(survivors[0:74], survivors[75:]...)
AssertAddressesEqual(t, survivors, m.Addrs(id))
}
}
func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) {
return func(t *testing.T) {
t.Run("update ttl of peer with no addrs", func(t *testing.T) {
id := generatePeerIds(1)[0]
id := GeneratePeerIDs(1)[0]
// Shouldn't panic.
m.UpdateAddrs(id, time.Hour, time.Minute)
})
t.Run("update ttls successfully", func(t *testing.T) {
ids := generatePeerIds(2)
addrs1, addrs2 := generateAddrs(2), generateAddrs(2)
ids := GeneratePeerIDs(2)
addrs1, addrs2 := GenerateAddrs(2), GenerateAddrs(2)
// set two keys with different ttls for each peer.
m.SetAddr(ids[0], addrs1[0], time.Hour)
@ -179,8 +166,8 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) {
m.SetAddr(ids[1], addrs2[1], time.Minute)
// Sanity check.
testHas(t, addrs1, m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1, m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
// Will only affect addrs1[0].
// Badger does not support subsecond TTLs.
@ -188,26 +175,26 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) {
m.UpdateAddrs(ids[0], time.Hour, 1*time.Second)
// No immediate effect.
testHas(t, addrs1, m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1, m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
// After a wait, addrs[0] is gone.
time.Sleep(1500 * time.Millisecond)
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
// Will only affect addrs2[0].
m.UpdateAddrs(ids[1], time.Hour, 1*time.Second)
// No immediate effect.
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
time.Sleep(1500 * time.Millisecond)
// First addrs is gone in both.
testHas(t, addrs1[1:], m.Addrs(ids[0]))
testHas(t, addrs2[1:], m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:], m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2[1:], m.Addrs(ids[1]))
})
}
@ -215,7 +202,7 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) {
func testNilAddrsDontBreak(m pstore.AddrBook) func(t *testing.T) {
return func(t *testing.T) {
id := generatePeerIds(1)[0]
id := GeneratePeerIDs(1)[0]
m.SetAddr(id, nil, time.Hour)
m.AddAddr(id, nil, time.Hour)
@ -224,53 +211,53 @@ func testNilAddrsDontBreak(m pstore.AddrBook) func(t *testing.T) {
func testAddressesExpire(m pstore.AddrBook) func(t *testing.T) {
return func(t *testing.T) {
ids := generatePeerIds(2)
addrs1 := generateAddrs(3)
addrs2 := generateAddrs(2)
ids := GeneratePeerIDs(2)
addrs1 := GenerateAddrs(3)
addrs2 := GenerateAddrs(2)
m.AddAddrs(ids[0], addrs1, time.Hour)
m.AddAddrs(ids[1], addrs2, time.Hour)
testHas(t, addrs1, m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1, m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
m.AddAddrs(ids[0], addrs1, 2*time.Hour)
m.AddAddrs(ids[1], addrs2, 2*time.Hour)
testHas(t, addrs1, m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1, m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
m.SetAddr(ids[0], addrs1[0], 100*time.Microsecond)
<-time.After(100 * time.Millisecond)
testHas(t, addrs1[1:3], m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:3], m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
m.SetAddr(ids[0], addrs1[2], 100*time.Microsecond)
<-time.After(100 * time.Millisecond)
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
testHas(t, addrs2, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2, m.Addrs(ids[1]))
m.SetAddr(ids[1], addrs2[0], 100*time.Microsecond)
<-time.After(100 * time.Millisecond)
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
testHas(t, addrs2[1:], m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
AssertAddressesEqual(t, addrs2[1:], m.Addrs(ids[1]))
m.SetAddr(ids[1], addrs2[1], 100*time.Microsecond)
<-time.After(100 * time.Millisecond)
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
testHas(t, nil, m.Addrs(ids[1]))
AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0]))
AssertAddressesEqual(t, nil, m.Addrs(ids[1]))
m.SetAddr(ids[0], addrs1[1], 100*time.Microsecond)
<-time.After(100 * time.Millisecond)
testHas(t, nil, m.Addrs(ids[0]))
testHas(t, nil, m.Addrs(ids[1]))
AssertAddressesEqual(t, nil, m.Addrs(ids[0]))
AssertAddressesEqual(t, nil, m.Addrs(ids[1]))
}
}
func testClearWithIterator(m pstore.AddrBook) func(t *testing.T) {
return func(t *testing.T) {
ids := generatePeerIds(2)
addrs := generateAddrs(100)
ids := GeneratePeerIDs(2)
addrs := GenerateAddrs(100)
// Add the peers with 50 addresses each.
m.AddAddrs(ids[0], addrs[:50], pstore.PermanentAddrTTL)
@ -307,8 +294,8 @@ func testPeersWithAddrs(m pstore.AddrBook) func(t *testing.T) {
})
t.Run("non-empty addrbook", func(t *testing.T) {
ids := generatePeerIds(2)
addrs := generateAddrs(10)
ids := GeneratePeerIDs(2)
addrs := GenerateAddrs(10)
m.AddAddrs(ids[0], addrs[:5], pstore.PermanentAddrTTL)
m.AddAddrs(ids[1], addrs[5:], pstore.PermanentAddrTTL)
@ -319,25 +306,3 @@ func testPeersWithAddrs(m pstore.AddrBook) func(t *testing.T) {
})
}
}
func testHas(t *testing.T, exp, act []ma.Multiaddr) {
t.Helper()
if len(exp) != len(act) {
t.Fatalf("lengths not the same. expected %d, got %d\n", len(exp), len(act))
}
for _, a := range exp {
found := false
for _, b := range act {
if a.Equal(b) {
found = true
break
}
}
if !found {
t.Fatalf("expected address %s not found", a)
}
}
}

View File

@ -36,7 +36,7 @@ func BenchmarkPeerstore(b *testing.B, factory PeerstoreFactory, variant string)
// Start all test peer producing goroutines, where each produces peers with as many
// multiaddrs as the n field in the param struct.
for _, p := range params {
go addressProducer(ctx, b, p.ch, p.n)
go AddressProducer(ctx, b, p.ch, p.n)
}
// So tests are always run in the same order.

View File

@ -10,7 +10,7 @@ import (
ma "github.com/multiformats/go-multiaddr"
)
func multiaddr(m string) ma.Multiaddr {
func Multiaddr(m string) ma.Multiaddr {
maddr, err := ma.NewMultiaddr(m)
if err != nil {
panic(err)
@ -23,7 +23,7 @@ type peerpair struct {
Addr []ma.Multiaddr
}
func randomPeer(b *testing.B, addrCount int) *peerpair {
func RandomPeer(b *testing.B, addrCount int) *peerpair {
var (
pid peer.ID
err error
@ -44,11 +44,11 @@ func randomPeer(b *testing.B, addrCount int) *peerpair {
return &peerpair{pid, addrs}
}
func addressProducer(ctx context.Context, b *testing.B, addrs chan *peerpair, addrsPerPeer int) {
func AddressProducer(ctx context.Context, b *testing.B, addrs chan *peerpair, addrsPerPeer int) {
b.Helper()
defer close(addrs)
for {
p := randomPeer(b, addrsPerPeer)
p := RandomPeer(b, addrsPerPeer)
select {
case addrs <- p:
case <-ctx.Done():
@ -56,3 +56,41 @@ func addressProducer(ctx context.Context, b *testing.B, addrs chan *peerpair, ad
}
}
}
func GenerateAddrs(count int) []ma.Multiaddr {
var addrs = make([]ma.Multiaddr, count)
for i := 0; i < count; i++ {
addrs[i] = Multiaddr(fmt.Sprintf("/ip4/1.1.1.%d/tcp/1111", i))
}
return addrs
}
func GeneratePeerIDs(count int) []peer.ID {
var ids = make([]peer.ID, count)
for i := 0; i < count; i++ {
ids[i], _ = pt.RandPeerID()
}
return ids
}
func AssertAddressesEqual(t *testing.T, exp, act []ma.Multiaddr) {
t.Helper()
if len(exp) != len(act) {
t.Fatalf("lengths not the same. expected %d, got %d\n", len(exp), len(act))
}
for _, a := range exp {
found := false
for _, b := range act {
if a.Equal(b) {
found = true
break
}
}
if !found {
t.Fatalf("expected address %s not found", a)
}
}
}