pstoreds: migrate from strict TTL to GC-based expiry.

Each entry in the address book KV store now represents a full peer
with all its addresses. Expiries and TTLs are kept inline. Records are
serialised with Protobuf.

Housekeeping is performed on retrieve, and via a periodic GC routine.
The architecture mimics a write-through cache, although not strictly.
This commit is contained in:
Raúl Kripalani 2018-11-12 22:28:12 +00:00
parent 3a5fba8af2
commit c423e9e997
7 changed files with 1485 additions and 388 deletions

11
pb/Makefile Normal file
View File

@ -0,0 +1,11 @@
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --proto_path=$(GOPATH)/src:. --gogofast_out=. $<
clean:
rm -f *.pb.go
rm -f *.go

936
pb/pstore.pb.go Normal file
View File

@ -0,0 +1,936 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: pstore.proto
package pstore_pb
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
import _ "github.com/gogo/protobuf/gogoproto"
import _ "github.com/golang/protobuf/ptypes/duration"
import _ "github.com/golang/protobuf/ptypes/timestamp"
import time "time"
import github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
import io "io"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
var _ = time.Kitchen
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
// AddrBookRecord represents a record for a peer in the address book.
type AddrBookRecord struct {
// The peer ID.
Id []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// The timestamp where purging needs to happen.
NextVisit *time.Time `protobuf:"bytes,2,opt,name=nextVisit,stdtime" json:"nextVisit,omitempty"`
// The multiaddresses.
Addrs map[string]*AddrBookRecord_AddrEntry `protobuf:"bytes,3,rep,name=addrs" json:"addrs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *AddrBookRecord) Reset() { *m = AddrBookRecord{} }
func (m *AddrBookRecord) String() string { return proto.CompactTextString(m) }
func (*AddrBookRecord) ProtoMessage() {}
func (*AddrBookRecord) Descriptor() ([]byte, []int) {
return fileDescriptor_pstore_842456c80d89ffef, []int{0}
}
func (m *AddrBookRecord) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *AddrBookRecord) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_AddrBookRecord.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (dst *AddrBookRecord) XXX_Merge(src proto.Message) {
xxx_messageInfo_AddrBookRecord.Merge(dst, src)
}
func (m *AddrBookRecord) XXX_Size() int {
return m.Size()
}
func (m *AddrBookRecord) XXX_DiscardUnknown() {
xxx_messageInfo_AddrBookRecord.DiscardUnknown(m)
}
var xxx_messageInfo_AddrBookRecord proto.InternalMessageInfo
func (m *AddrBookRecord) GetId() []byte {
if m != nil {
return m.Id
}
return nil
}
func (m *AddrBookRecord) GetNextVisit() *time.Time {
if m != nil {
return m.NextVisit
}
return nil
}
func (m *AddrBookRecord) GetAddrs() map[string]*AddrBookRecord_AddrEntry {
if m != nil {
return m.Addrs
}
return nil
}
// AddrEntry represents a single multiaddress.
type AddrBookRecord_AddrEntry struct {
// The point in time when this address expires.
Expiry *time.Time `protobuf:"bytes,2,opt,name=expiry,stdtime" json:"expiry,omitempty"`
// The original TTL of this address.
Ttl *time.Duration `protobuf:"bytes,3,opt,name=ttl,stdduration" json:"ttl,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *AddrBookRecord_AddrEntry) Reset() { *m = AddrBookRecord_AddrEntry{} }
func (m *AddrBookRecord_AddrEntry) String() string { return proto.CompactTextString(m) }
func (*AddrBookRecord_AddrEntry) ProtoMessage() {}
func (*AddrBookRecord_AddrEntry) Descriptor() ([]byte, []int) {
return fileDescriptor_pstore_842456c80d89ffef, []int{0, 1}
}
func (m *AddrBookRecord_AddrEntry) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *AddrBookRecord_AddrEntry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_AddrBookRecord_AddrEntry.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (dst *AddrBookRecord_AddrEntry) XXX_Merge(src proto.Message) {
xxx_messageInfo_AddrBookRecord_AddrEntry.Merge(dst, src)
}
func (m *AddrBookRecord_AddrEntry) XXX_Size() int {
return m.Size()
}
func (m *AddrBookRecord_AddrEntry) XXX_DiscardUnknown() {
xxx_messageInfo_AddrBookRecord_AddrEntry.DiscardUnknown(m)
}
var xxx_messageInfo_AddrBookRecord_AddrEntry proto.InternalMessageInfo
func (m *AddrBookRecord_AddrEntry) GetExpiry() *time.Time {
if m != nil {
return m.Expiry
}
return nil
}
func (m *AddrBookRecord_AddrEntry) GetTtl() *time.Duration {
if m != nil {
return m.Ttl
}
return nil
}
func init() {
proto.RegisterType((*AddrBookRecord)(nil), "pstore.pb.AddrBookRecord")
proto.RegisterMapType((map[string]*AddrBookRecord_AddrEntry)(nil), "pstore.pb.AddrBookRecord.AddrsEntry")
proto.RegisterType((*AddrBookRecord_AddrEntry)(nil), "pstore.pb.AddrBookRecord.AddrEntry")
}
func (m *AddrBookRecord) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *AddrBookRecord) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Id) > 0 {
dAtA[i] = 0xa
i++
i = encodeVarintPstore(dAtA, i, uint64(len(m.Id)))
i += copy(dAtA[i:], m.Id)
}
if m.NextVisit != nil {
dAtA[i] = 0x12
i++
i = encodeVarintPstore(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(*m.NextVisit)))
n1, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.NextVisit, dAtA[i:])
if err != nil {
return 0, err
}
i += n1
}
if len(m.Addrs) > 0 {
for k, _ := range m.Addrs {
dAtA[i] = 0x1a
i++
v := m.Addrs[k]
msgSize := 0
if v != nil {
msgSize = v.Size()
msgSize += 1 + sovPstore(uint64(msgSize))
}
mapSize := 1 + len(k) + sovPstore(uint64(len(k))) + msgSize
i = encodeVarintPstore(dAtA, i, uint64(mapSize))
dAtA[i] = 0xa
i++
i = encodeVarintPstore(dAtA, i, uint64(len(k)))
i += copy(dAtA[i:], k)
if v != nil {
dAtA[i] = 0x12
i++
i = encodeVarintPstore(dAtA, i, uint64(v.Size()))
n2, err := v.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n2
}
}
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
return i, nil
}
func (m *AddrBookRecord_AddrEntry) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *AddrBookRecord_AddrEntry) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.Expiry != nil {
dAtA[i] = 0x12
i++
i = encodeVarintPstore(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(*m.Expiry)))
n3, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.Expiry, dAtA[i:])
if err != nil {
return 0, err
}
i += n3
}
if m.Ttl != nil {
dAtA[i] = 0x1a
i++
i = encodeVarintPstore(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdDuration(*m.Ttl)))
n4, err := github_com_gogo_protobuf_types.StdDurationMarshalTo(*m.Ttl, dAtA[i:])
if err != nil {
return 0, err
}
i += n4
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
return i, nil
}
func encodeVarintPstore(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return offset + 1
}
func NewPopulatedAddrBookRecord(r randyPstore, easy bool) *AddrBookRecord {
this := &AddrBookRecord{}
v1 := r.Intn(100)
this.Id = make([]byte, v1)
for i := 0; i < v1; i++ {
this.Id[i] = byte(r.Intn(256))
}
if r.Intn(10) != 0 {
this.NextVisit = github_com_gogo_protobuf_types.NewPopulatedStdTime(r, easy)
}
if r.Intn(10) != 0 {
v2 := r.Intn(10)
this.Addrs = make(map[string]*AddrBookRecord_AddrEntry)
for i := 0; i < v2; i++ {
this.Addrs[randStringPstore(r)] = NewPopulatedAddrBookRecord_AddrEntry(r, easy)
}
}
if !easy && r.Intn(10) != 0 {
this.XXX_unrecognized = randUnrecognizedPstore(r, 4)
}
return this
}
func NewPopulatedAddrBookRecord_AddrEntry(r randyPstore, easy bool) *AddrBookRecord_AddrEntry {
this := &AddrBookRecord_AddrEntry{}
if r.Intn(10) != 0 {
this.Expiry = github_com_gogo_protobuf_types.NewPopulatedStdTime(r, easy)
}
if r.Intn(10) != 0 {
this.Ttl = github_com_gogo_protobuf_types.NewPopulatedStdDuration(r, easy)
}
if !easy && r.Intn(10) != 0 {
this.XXX_unrecognized = randUnrecognizedPstore(r, 4)
}
return this
}
type randyPstore interface {
Float32() float32
Float64() float64
Int63() int64
Int31() int32
Uint32() uint32
Intn(n int) int
}
func randUTF8RunePstore(r randyPstore) rune {
ru := r.Intn(62)
if ru < 10 {
return rune(ru + 48)
} else if ru < 36 {
return rune(ru + 55)
}
return rune(ru + 61)
}
func randStringPstore(r randyPstore) string {
v3 := r.Intn(100)
tmps := make([]rune, v3)
for i := 0; i < v3; i++ {
tmps[i] = randUTF8RunePstore(r)
}
return string(tmps)
}
func randUnrecognizedPstore(r randyPstore, maxFieldNumber int) (dAtA []byte) {
l := r.Intn(5)
for i := 0; i < l; i++ {
wire := r.Intn(4)
if wire == 3 {
wire = 5
}
fieldNumber := maxFieldNumber + r.Intn(100)
dAtA = randFieldPstore(dAtA, r, fieldNumber, wire)
}
return dAtA
}
func randFieldPstore(dAtA []byte, r randyPstore, fieldNumber int, wire int) []byte {
key := uint32(fieldNumber)<<3 | uint32(wire)
switch wire {
case 0:
dAtA = encodeVarintPopulatePstore(dAtA, uint64(key))
v4 := r.Int63()
if r.Intn(2) == 0 {
v4 *= -1
}
dAtA = encodeVarintPopulatePstore(dAtA, uint64(v4))
case 1:
dAtA = encodeVarintPopulatePstore(dAtA, uint64(key))
dAtA = append(dAtA, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)))
case 2:
dAtA = encodeVarintPopulatePstore(dAtA, uint64(key))
ll := r.Intn(100)
dAtA = encodeVarintPopulatePstore(dAtA, uint64(ll))
for j := 0; j < ll; j++ {
dAtA = append(dAtA, byte(r.Intn(256)))
}
default:
dAtA = encodeVarintPopulatePstore(dAtA, uint64(key))
dAtA = append(dAtA, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)))
}
return dAtA
}
func encodeVarintPopulatePstore(dAtA []byte, v uint64) []byte {
for v >= 1<<7 {
dAtA = append(dAtA, uint8(uint64(v)&0x7f|0x80))
v >>= 7
}
dAtA = append(dAtA, uint8(v))
return dAtA
}
func (m *AddrBookRecord) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Id)
if l > 0 {
n += 1 + l + sovPstore(uint64(l))
}
if m.NextVisit != nil {
l = github_com_gogo_protobuf_types.SizeOfStdTime(*m.NextVisit)
n += 1 + l + sovPstore(uint64(l))
}
if len(m.Addrs) > 0 {
for k, v := range m.Addrs {
_ = k
_ = v
l = 0
if v != nil {
l = v.Size()
l += 1 + sovPstore(uint64(l))
}
mapEntrySize := 1 + len(k) + sovPstore(uint64(len(k))) + l
n += mapEntrySize + 1 + sovPstore(uint64(mapEntrySize))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *AddrBookRecord_AddrEntry) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Expiry != nil {
l = github_com_gogo_protobuf_types.SizeOfStdTime(*m.Expiry)
n += 1 + l + sovPstore(uint64(l))
}
if m.Ttl != nil {
l = github_com_gogo_protobuf_types.SizeOfStdDuration(*m.Ttl)
n += 1 + l + sovPstore(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovPstore(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozPstore(x uint64) (n int) {
return sovPstore(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *AddrBookRecord) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: AddrBookRecord: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: AddrBookRecord: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthPstore
}
postIndex := iNdEx + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Id = append(m.Id[:0], dAtA[iNdEx:postIndex]...)
if m.Id == nil {
m.Id = []byte{}
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field NextVisit", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPstore
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.NextVisit == nil {
m.NextVisit = new(time.Time)
}
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(m.NextVisit, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Addrs", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPstore
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Addrs == nil {
m.Addrs = make(map[string]*AddrBookRecord_AddrEntry)
}
var mapkey string
var mapvalue *AddrBookRecord_AddrEntry
for iNdEx < postIndex {
entryPreIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
if fieldNum == 1 {
var stringLenmapkey uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLenmapkey |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLenmapkey := int(stringLenmapkey)
if intStringLenmapkey < 0 {
return ErrInvalidLengthPstore
}
postStringIndexmapkey := iNdEx + intStringLenmapkey
if postStringIndexmapkey > l {
return io.ErrUnexpectedEOF
}
mapkey = string(dAtA[iNdEx:postStringIndexmapkey])
iNdEx = postStringIndexmapkey
} else if fieldNum == 2 {
var mapmsglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
mapmsglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if mapmsglen < 0 {
return ErrInvalidLengthPstore
}
postmsgIndex := iNdEx + mapmsglen
if mapmsglen < 0 {
return ErrInvalidLengthPstore
}
if postmsgIndex > l {
return io.ErrUnexpectedEOF
}
mapvalue = &AddrBookRecord_AddrEntry{}
if err := mapvalue.Unmarshal(dAtA[iNdEx:postmsgIndex]); err != nil {
return err
}
iNdEx = postmsgIndex
} else {
iNdEx = entryPreIndex
skippy, err := skipPstore(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPstore
}
if (iNdEx + skippy) > postIndex {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
m.Addrs[mapkey] = mapvalue
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPstore(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPstore
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *AddrBookRecord_AddrEntry) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: AddrEntry: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: AddrEntry: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Expiry", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPstore
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Expiry == nil {
m.Expiry = new(time.Time)
}
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(m.Expiry, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPstore
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPstore
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Ttl == nil {
m.Ttl = new(time.Duration)
}
if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(m.Ttl, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPstore(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPstore
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipPstore(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPstore
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPstore
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPstore
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
iNdEx += length
if length < 0 {
return 0, ErrInvalidLengthPstore
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPstore
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipPstore(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
ErrInvalidLengthPstore = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowPstore = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("pstore.proto", fileDescriptor_pstore_842456c80d89ffef) }
var fileDescriptor_pstore_842456c80d89ffef = []byte{
// 330 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xbf, 0x4e, 0xc3, 0x30,
0x10, 0xc6, 0xe5, 0xa4, 0xad, 0x14, 0xb7, 0xaa, 0x90, 0xc5, 0x10, 0x32, 0xb8, 0x15, 0x30, 0x74,
0xc1, 0x15, 0x65, 0x29, 0x1d, 0x90, 0x88, 0xe0, 0x05, 0x22, 0xc4, 0xc6, 0x90, 0xd4, 0x26, 0x58,
0xfd, 0x73, 0x91, 0xe3, 0xa0, 0xf6, 0x2d, 0x18, 0x79, 0x1c, 0x46, 0x46, 0xde, 0x00, 0xc8, 0x3b,
0x20, 0x31, 0xa2, 0x38, 0x69, 0x23, 0x40, 0x42, 0x6c, 0xf7, 0xdd, 0x7d, 0xbf, 0xf3, 0x77, 0x32,
0xee, 0x24, 0xa9, 0x06, 0x25, 0x58, 0xa2, 0x40, 0x03, 0x71, 0x36, 0x2a, 0xf2, 0x7a, 0x31, 0x40,
0x3c, 0x17, 0x43, 0x33, 0x88, 0xb2, 0xdb, 0xa1, 0x96, 0x0b, 0x91, 0xea, 0x70, 0x91, 0x94, 0x5e,
0x8f, 0xfe, 0x34, 0xf0, 0x4c, 0x85, 0x5a, 0xc2, 0xb2, 0x9a, 0x1f, 0xc5, 0x52, 0xdf, 0x65, 0x11,
0x9b, 0xc2, 0x62, 0x18, 0x43, 0x0c, 0xb5, 0xb1, 0x50, 0x46, 0x98, 0xaa, 0xb4, 0xef, 0x7f, 0x58,
0xb8, 0x7b, 0xce, 0xb9, 0xf2, 0x01, 0x66, 0x81, 0x98, 0x82, 0xe2, 0xa4, 0x8b, 0x2d, 0xc9, 0x5d,
0xd4, 0x47, 0x83, 0x4e, 0x60, 0x49, 0x4e, 0xce, 0xb0, 0xb3, 0x14, 0x2b, 0x7d, 0x2d, 0x53, 0xa9,
0x5d, 0xab, 0x8f, 0x06, 0xed, 0x91, 0xc7, 0xca, 0x14, 0x6c, 0xb3, 0x9c, 0x5d, 0x6d, 0x62, 0xfa,
0x8d, 0x87, 0xd7, 0x1e, 0x0a, 0x6a, 0x84, 0x4c, 0x70, 0x33, 0xe4, 0x5c, 0xa5, 0xae, 0xdd, 0xb7,
0x07, 0xed, 0xd1, 0x21, 0xdb, 0x5e, 0xcb, 0xbe, 0xbf, 0x6c, 0x64, 0x7a, 0xb9, 0xd4, 0x6a, 0x1d,
0x94, 0x88, 0x77, 0x83, 0x71, 0xdd, 0x24, 0x3b, 0xd8, 0x9e, 0x89, 0xb5, 0x89, 0xe6, 0x04, 0x45,
0x49, 0x4e, 0x71, 0xf3, 0x3e, 0x9c, 0x67, 0xa2, 0xca, 0x75, 0xf0, 0xf7, 0xee, 0x6a, 0xb5, 0x21,
0x26, 0xd6, 0x18, 0x79, 0x2b, 0xec, 0x6c, 0xfb, 0x64, 0x8c, 0x5b, 0x62, 0x95, 0x48, 0xb5, 0xfe,
0xf7, 0x91, 0x95, 0x9f, 0x1c, 0x63, 0x5b, 0xeb, 0xb9, 0x6b, 0x1b, 0x6c, 0xef, 0x17, 0x76, 0x51,
0xfd, 0x90, 0xdf, 0x78, 0x2c, 0xa8, 0xc2, 0xeb, 0xef, 0x7e, 0xbe, 0x53, 0xf4, 0x94, 0x53, 0xf4,
0x9c, 0x53, 0xf4, 0x92, 0x53, 0xf4, 0x96, 0x53, 0x14, 0xb5, 0x0c, 0x73, 0xf2, 0x15, 0x00, 0x00,
0xff, 0xff, 0x0c, 0x8d, 0xe8, 0x16, 0x1f, 0x02, 0x00, 0x00,
}

32
pb/pstore.proto Normal file
View File

@ -0,0 +1,32 @@
syntax = "proto3";
package pstore.pb;
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.benchgen_all) = true;
option (gogoproto.populate_all) = true;
// AddrBookRecord represents a record for a peer in the address book.
message AddrBookRecord {
// The peer ID.
bytes id = 1;
// The timestamp where purging needs to happen.
google.protobuf.Timestamp nextVisit = 2 [(gogoproto.stdtime) = true];
// The multiaddresses.
map<string, AddrEntry> addrs = 3;
// AddrEntry represents a single multiaddress.
message AddrEntry {
// The point in time when this address expires.
google.protobuf.Timestamp expiry = 2 [(gogoproto.stdtime) = true];
// The original TTL of this address.
google.protobuf.Duration ttl = 3 [(gogoproto.stdduration) = true];
}
}

129
pb/pstorepb_test.go Normal file
View File

@ -0,0 +1,129 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: pstore.proto
package pstore_pb
import testing "testing"
import math_rand "math/rand"
import github_com_gogo_protobuf_proto "github.com/gogo/protobuf/proto"
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
import _ "github.com/gogo/protobuf/gogoproto"
import _ "github.com/golang/protobuf/ptypes/duration"
import _ "github.com/golang/protobuf/ptypes/timestamp"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
func BenchmarkAddrBookRecordProtoMarshal(b *testing.B) {
popr := math_rand.New(math_rand.NewSource(616))
total := 0
pops := make([]*AddrBookRecord, 10000)
for i := 0; i < 10000; i++ {
pops[i] = NewPopulatedAddrBookRecord(popr, false)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
dAtA, err := github_com_gogo_protobuf_proto.Marshal(pops[i%10000])
if err != nil {
panic(err)
}
total += len(dAtA)
}
b.SetBytes(int64(total / b.N))
}
func BenchmarkAddrBookRecordProtoUnmarshal(b *testing.B) {
popr := math_rand.New(math_rand.NewSource(616))
total := 0
datas := make([][]byte, 10000)
for i := 0; i < 10000; i++ {
dAtA, err := github_com_gogo_protobuf_proto.Marshal(NewPopulatedAddrBookRecord(popr, false))
if err != nil {
panic(err)
}
datas[i] = dAtA
}
msg := &AddrBookRecord{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
total += len(datas[i%10000])
if err := github_com_gogo_protobuf_proto.Unmarshal(datas[i%10000], msg); err != nil {
panic(err)
}
}
b.SetBytes(int64(total / b.N))
}
func BenchmarkAddrBookRecord_AddrEntryProtoMarshal(b *testing.B) {
popr := math_rand.New(math_rand.NewSource(616))
total := 0
pops := make([]*AddrBookRecord_AddrEntry, 10000)
for i := 0; i < 10000; i++ {
pops[i] = NewPopulatedAddrBookRecord_AddrEntry(popr, false)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
dAtA, err := github_com_gogo_protobuf_proto.Marshal(pops[i%10000])
if err != nil {
panic(err)
}
total += len(dAtA)
}
b.SetBytes(int64(total / b.N))
}
func BenchmarkAddrBookRecord_AddrEntryProtoUnmarshal(b *testing.B) {
popr := math_rand.New(math_rand.NewSource(616))
total := 0
datas := make([][]byte, 10000)
for i := 0; i < 10000; i++ {
dAtA, err := github_com_gogo_protobuf_proto.Marshal(NewPopulatedAddrBookRecord_AddrEntry(popr, false))
if err != nil {
panic(err)
}
datas[i] = dAtA
}
msg := &AddrBookRecord_AddrEntry{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
total += len(datas[i%10000])
if err := github_com_gogo_protobuf_proto.Unmarshal(datas[i%10000], msg); err != nil {
panic(err)
}
}
b.SetBytes(int64(total / b.N))
}
func BenchmarkAddrBookRecordSize(b *testing.B) {
popr := math_rand.New(math_rand.NewSource(616))
total := 0
pops := make([]*AddrBookRecord, 1000)
for i := 0; i < 1000; i++ {
pops[i] = NewPopulatedAddrBookRecord(popr, false)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
total += pops[i%1000].Size()
}
b.SetBytes(int64(total / b.N))
}
func BenchmarkAddrBookRecord_AddrEntrySize(b *testing.B) {
popr := math_rand.New(math_rand.NewSource(616))
total := 0
pops := make([]*AddrBookRecord_AddrEntry, 1000)
for i := 0; i < 1000; i++ {
pops[i] = NewPopulatedAddrBookRecord_AddrEntry(popr, false)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
total += pops[i%1000].Size()
}
b.SetBytes(int64(total / b.N))
}
//These tests are generated by github.com/gogo/protobuf/plugin/testgen

View File

@ -2,50 +2,25 @@ package pstoreds
import (
"context"
"encoding/binary"
"errors"
"fmt"
"sync"
"time"
lru "github.com/hashicorp/golang-lru"
base32 "github.com/whyrusleeping/base32"
ds "github.com/ipfs/go-datastore"
query "github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"
pool "github.com/libp2p/go-buffer-pool"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
pb "github.com/libp2p/go-libp2p-peerstore/pb"
pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem"
lru "github.com/hashicorp/golang-lru"
ma "github.com/multiformats/go-multiaddr"
b32 "github.com/whyrusleeping/base32"
)
var (
log = logging.Logger("peerstore/ds")
// The maximum representable value in time.Time is time.Unix(1<<63-62135596801, 999999999).
// But it's too brittle and implementation-dependent, so we prefer to use 1<<62, which is in the
// year 146138514283. We're safe.
maxTime = time.Unix(1<<62, 0)
ErrTTLDatastore = errors.New("datastore must provide TTL support")
)
// Peer addresses are stored under the following db key pattern:
// /peers/addr/<b32 peer id no padding>/<hash of maddr>
var abBase = ds.NewKey("/peers/addrs")
var _ pstore.AddrBook = (*dsAddrBook)(nil)
// dsAddrBook is an address book backed by a Datastore with both an
// in-memory TTL manager and an in-memory address stream manager.
type dsAddrBook struct {
cache cache
ds ds.TxnDatastore
subsManager *pstoremem.AddrSubManager
writeRetries int
}
type ttlWriteMode int
const (
@ -53,347 +28,375 @@ const (
ttlExtend
)
type cacheEntry struct {
expiration time.Time
addrs []ma.Multiaddr
var (
log = logging.Logger("peerstore/ds")
// Peer addresses are stored under the following db key pattern:
// /peers/addr/<b32 peer id no padding>/<hash of maddr>
addrBookBase = ds.NewKey("/peers/addrs")
)
// addrsRecord decorates the AddrBookRecord with locks and metadata.
type addrsRecord struct {
sync.RWMutex
*pb.AddrBookRecord
delete bool
}
type addrRecord struct {
ttl time.Duration
addr ma.Multiaddr
// FlushInTxn flushes the record to the datastore by calling ds.Put, unless the record is
// marked for deletion, in which case the deletion is executed.
func (r *addrsRecord) FlushInTxn(txn ds.Txn) (err error) {
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id)))
if r.delete {
return txn.Delete(key)
}
data := pool.Get(r.Size())
defer pool.Put(data)
i, err := r.MarshalTo(data)
if err != nil {
return err
}
return txn.Put(key, data[:i])
}
func (ar *addrRecord) MarshalBinary() ([]byte, error) {
ttlB := make([]byte, 8)
binary.LittleEndian.PutUint64(ttlB, uint64(ar.ttl))
return append(ttlB, ar.addr.Bytes()...), nil
// Flush creates a ds.Txn, and calls FlushInTxn with it.
func (r *addrsRecord) Flush(ds ds.TxnDatastore) (err error) {
txn, err := ds.NewTransaction(false)
if err != nil {
return err
}
defer txn.Discard()
if err = r.FlushInTxn(txn); err != nil {
return err
}
return txn.Commit()
}
func (ar *addrRecord) UnmarshalBinary(b []byte) error {
ar.ttl = time.Duration(binary.LittleEndian.Uint64(b))
// this had been serialized by us, no need to check for errors
ar.addr, _ = ma.NewMultiaddrBytes(b[8:])
return nil
// Refresh is called on in-memory entries to perform housekeeping. Refresh does the following:
// * removes all expired addresses.
// * recalculates the date in which the record needs to be revisited (earliest expiration of survivors).
// * marks the record for deletion if no addresses are left.
//
// A `true` value of `force` tells us to proceed with housekeeping even if the `NextVisit` date has not arrived.
//
// Refresh is called in several occasions:
// * with force=false, when accessing and loading an entry, or when performing GC.
// * with force=true, after an entry has been modified (e.g. addresses have been added or removed,
// TTLs updated, etc.)
func (r *addrsRecord) Refresh(force bool) (chgd bool) {
if len(r.Addrs) == 0 {
r.delete = true
r.NextVisit = nil
return true
}
now := time.Now()
if !force && r.NextVisit != nil && !r.NextVisit.IsZero() && now.Before(*r.NextVisit) {
// no expired entries to purge, and no forced housekeeping.
return false
}
// nv stores the next visit for surviving addresses following the purge
var nv time.Time
for addr, entry := range r.Addrs {
if entry.Expiry == nil {
continue
}
if nv.IsZero() {
nv = *entry.Expiry
}
if now.After(*entry.Expiry) {
// this entry is expired; remove it.
delete(r.Addrs, addr)
chgd = true
} else if nv.After(*entry.Expiry) {
// keep track of the earliest expiry across survivors.
nv = *entry.Expiry
}
}
if len(r.Addrs) == 0 {
r.delete = true
r.NextVisit = nil
return true
}
chgd = chgd || r.NextVisit == nil || nv != *r.NextVisit
r.NextVisit = &nv
return chgd
}
// dsAddrBook is an address book backed by a Datastore with a GC-like procedure
// to purge expired entries. It uses an in-memory address stream manager.
type dsAddrBook struct {
ctx context.Context
gcInterval time.Duration
gcMaxPurgePerCycle int
cache cache
ds ds.TxnDatastore
subsManager *pstoremem.AddrSubManager
flushJobCh chan *addrsRecord
cancelFn func()
closedCh chan struct{}
}
var _ pstore.AddrBook = (*dsAddrBook)(nil)
// NewAddrBook initializes a new address book given a
// Datastore instance, a context for managing the TTL manager,
// and the interval at which the TTL manager should sweep the Datastore.
func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts Options) (*dsAddrBook, error) {
if _, ok := store.(ds.TTLDatastore); !ok {
return nil, ErrTTLDatastore
}
var (
cache cache = &noopCache{}
err error
)
func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts Options) (ab *dsAddrBook, err error) {
var cache cache = new(noopCache)
if opts.CacheSize > 0 {
if cache, err = lru.NewARC(int(opts.CacheSize)); err != nil {
return nil, err
}
}
ctx, cancelFn := context.WithCancel(ctx)
mgr := &dsAddrBook{
cache: cache,
ds: store,
subsManager: pstoremem.NewAddrSubManager(),
writeRetries: int(opts.WriteRetries),
ctx: ctx,
cancelFn: cancelFn,
gcInterval: opts.GCInterval,
cache: cache,
ds: store,
subsManager: pstoremem.NewAddrSubManager(),
flushJobCh: make(chan *addrsRecord, 32),
closedCh: make(chan struct{}),
}
go mgr.background()
return mgr, nil
}
func keysAndAddrs(p peer.ID, addrs []ma.Multiaddr) ([]ds.Key, []ma.Multiaddr, error) {
var (
keys = make([]ds.Key, len(addrs))
clean = make([]ma.Multiaddr, len(addrs))
parentKey = abBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p)))
i = 0
)
func (ab *dsAddrBook) Close() {
ab.cancelFn()
<-ab.closedCh
}
for _, addr := range addrs {
if addr == nil {
continue
func (ab *dsAddrBook) tryFlush(pr *addrsRecord) {
select {
case ab.flushJobCh <- pr:
default:
id, _ := peer.IDFromBytes(pr.Id)
log.Warningf("flush queue is full; could not flush peer %v", id)
}
}
func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrsRecord, err error) {
if e, ok := ab.cache.Get(id); ok {
pr = e.(*addrsRecord)
if pr.Refresh(false) && update {
ab.tryFlush(pr)
}
hash, err := mh.Sum((addr).Bytes(), mh.MURMUR3, -1)
if err != nil {
return nil, nil, err
}
keys[i] = parentKey.ChildString(base32.RawStdEncoding.EncodeToString(hash))
clean[i] = addr
i++
return pr, nil
}
return keys[:i], clean[:i], nil
}
// AddAddr will add a new address if it's not already in the AddrBook.
func (mgr *dsAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
}
// AddAddrs will add many new addresses if they're not already in the AddrBook.
func (mgr *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
if ttl <= 0 {
return
}
mgr.setAddrs(p, addrs, ttl, ttlExtend)
}
// SetAddr will add or update the TTL of an address in the AddrBook.
func (mgr *dsAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
addrs := []ma.Multiaddr{addr}
mgr.SetAddrs(p, addrs, ttl)
}
// SetAddrs will add or update the TTLs of addresses in the AddrBook.
func (mgr *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
if ttl <= 0 {
mgr.deleteAddrs(p, addrs)
return
}
mgr.setAddrs(p, addrs, ttl, ttlOverride)
}
func (mgr *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) error {
// Keys and cleaned up addresses.
keys, addrs, err := keysAndAddrs(p, addrs)
if err != nil {
return err
}
mgr.cache.Remove(p)
// Attempt transactional KV deletion.
for i := 0; i < mgr.writeRetries; i++ {
if err = mgr.dbDelete(keys); err == nil {
break
}
log.Errorf("failed to delete addresses for peer %s: %s\n", p.Pretty(), err)
}
if err != nil {
log.Errorf("failed to avoid write conflict for peer %s after %d retries: %v\n", p.Pretty(), mgr.writeRetries, err)
return err
}
return nil
}
func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) error {
// Keys and cleaned up addresses.
keys, addrs, err := keysAndAddrs(p, addrs)
if err != nil {
return err
}
mgr.cache.Remove(p)
// Attempt transactional KV insertion.
var existed []bool
for i := 0; i < mgr.writeRetries; i++ {
if existed, err = mgr.dbInsert(keys, addrs, ttl, mode); err == nil {
break
}
log.Errorf("failed to write addresses for peer %s: %s\n", p.Pretty(), err)
}
if err != nil {
log.Errorf("failed to avoid write conflict for peer %s after %d retries: %v\n", p.Pretty(), mgr.writeRetries, err)
return err
}
// Update was successful, so broadcast event only for new addresses.
for i, _ := range keys {
if !existed[i] {
mgr.subsManager.BroadcastAddr(p, addrs[i])
}
}
return nil
}
// dbInsert performs a transactional insert of the provided keys and values.
func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) ([]bool, error) {
var (
err error
existed = make([]bool, len(keys))
exp = time.Now().Add(ttl)
)
txn, err := mgr.ds.NewTransaction(false)
txn, err := ab.ds.NewTransaction(true)
if err != nil {
return nil, err
}
defer txn.Discard()
ttltxn := txn.(ds.TTLDatastore)
for i, key := range keys {
// Check if the key existed previously.
if existed[i], err = ttltxn.Has(key); err != nil {
log.Errorf("transaction failed and aborted while checking key existence: %s, cause: %v", key.String(), err)
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(id)))
data, err := txn.Get(key)
if err == nil {
pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
if err = pr.Unmarshal(data); err != nil {
return nil, err
}
if pr.Refresh(false) {
ab.tryFlush(pr)
}
if cache {
ab.cache.Add(id, pr)
}
return pr, nil
}
// The key embeds a hash of the value, so if it existed, we can safely skip the insert and
// just update the TTL.
if existed[i] {
switch mode {
case ttlOverride:
err = ttltxn.SetTTL(key, ttl)
case ttlExtend:
var curr time.Time
if curr, err = ttltxn.GetExpiration(key); err == nil && exp.After(curr) {
err = ttltxn.SetTTL(key, ttl)
if err == ds.ErrNotFound {
pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{
Id: []byte(id),
Addrs: make(map[string]*pb.AddrBookRecord_AddrEntry),
}}
if cache {
ab.cache.Add(id, pr)
}
return pr, nil
}
log.Error(err)
return nil, err
}
// background is the peerstore process that takes care of:
// * purging expired addresses and peers with no addresses from the datastore at regular intervals.
// * asynchronously flushing cached entries with expired addresses to the datastore.
func (ab *dsAddrBook) background() {
timer := time.NewTicker(ab.gcInterval)
for {
select {
case fj := <-ab.flushJobCh:
id, _ := peer.IDFromBytes(fj.Id)
if cached, ok := ab.cache.Peek(id); ok {
// Only continue flushing if the record we have in memory is the same as for which the flush
// job was requested. If it's not in memory, it has been evicted and we don't know if we hold
// the latest state or not. Similarly, if it's cached but the pointer is different, it means
// it was evicted and has been reloaded, so we're also uncertain if we hold the latest state.
if pr := cached.(*addrsRecord); pr == fj {
pr.RLock()
pr.Flush(ab.ds)
pr.RUnlock()
}
}
if err != nil {
// mode will be printed as an int
log.Errorf("failed while updating the ttl for key: %s, mode: %v, cause: %v", key.String(), mode, err)
return nil, err
case <-timer.C:
ab.purgeCycle()
case <-ab.ctx.Done():
timer.Stop()
close(ab.closedCh)
return
}
}
}
func (ab *dsAddrBook) purgeCycle() {
var id peer.ID
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
q := query.Query{Prefix: addrBookBase.String()}
txn, err := ab.ds.NewTransaction(false)
if err != nil {
log.Warningf("failed while purging entries: %v\n", err)
return
}
defer txn.Discard()
results, err := txn.Query(q)
if err != nil {
log.Warningf("failed while purging entries: %v\n", err)
return
}
defer results.Close()
for result := range results.Next() {
id, _ = peer.IDFromBytes(record.Id)
// if the record is in cache, let's refresh that one and flush it if necessary.
if e, ok := ab.cache.Peek(id); ok {
cached := e.(*addrsRecord)
cached.Lock()
if cached.Refresh(false) {
cached.FlushInTxn(txn)
}
cached.Unlock()
continue
}
r := &addrRecord{
ttl: ttl,
addr: addrs[i],
if err := record.Unmarshal(result.Value); err != nil {
log.Warningf("failed while purging entries: %v\n", err)
continue
}
value, _ := r.MarshalBinary()
if err = ttltxn.PutWithTTL(key, value, ttl); err != nil {
log.Errorf("transaction failed and aborted while setting key: %s, cause: %v", key.String(), err)
return nil, err
if record.Refresh(false) {
record.FlushInTxn(txn)
}
record.Reset()
}
if err = txn.Commit(); err != nil {
log.Errorf("failed to commit transaction when setting keys, cause: %v", err)
return nil, err
log.Warningf("failed to commit GC transaction: %v\n", err)
}
}
return existed, nil
// AddAddr will add a new address if it's not already in the AddrBook.
func (ab *dsAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
ab.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
}
// AddAddrs will add many new addresses if they're not already in the AddrBook.
func (ab *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
if ttl <= 0 {
return
}
addrs = cleanAddrs(addrs)
ab.setAddrs(p, addrs, ttl, ttlExtend)
}
// SetAddr will add or update the TTL of an address in the AddrBook.
func (ab *dsAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
ab.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
}
// SetAddrs will add or update the TTLs of addresses in the AddrBook.
func (ab *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
addrs = cleanAddrs(addrs)
if ttl <= 0 {
ab.deleteAddrs(p, addrs)
return
}
ab.setAddrs(p, addrs, ttl, ttlOverride)
}
// UpdateAddrs will update any addresses for a given peer and TTL combination to
// have a new TTL.
func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
mgr.cache.Remove(p)
var err error
for i := 0; i < mgr.writeRetries; i++ {
if err = mgr.dbUpdateTTL(p, oldTTL, newTTL); err == nil {
break
}
log.Errorf("failed to update ttlsfor peer %s: %s\n", p.Pretty(), err)
}
func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
pr, err := ab.loadRecord(p, true, false)
if err != nil {
log.Errorf("failed to avoid write conflict when updating ttls for peer %s after %d retries: %v\n",
p.Pretty(), mgr.writeRetries, err)
log.Errorf("failed to update ttls for peer %s: %s\n", p.Pretty(), err)
}
}
func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time.Duration) error {
var (
prefix = abBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p)))
q = query.Query{Prefix: prefix.String(), KeysOnly: false}
results query.Results
err error
)
pr.Lock()
defer pr.Unlock()
txn, err := mgr.ds.NewTransaction(false)
if err != nil {
return err
}
defer txn.Discard()
if results, err = txn.Query(q); err != nil {
return err
}
defer results.Close()
ttltxn := txn.(ds.TTLDatastore)
r := &addrRecord{}
for result := range results.Next() {
r.UnmarshalBinary(result.Value)
if r.ttl != oldTTL {
chgd, newExp := false, time.Now().Add(newTTL)
for _, entry := range pr.Addrs {
if entry.Ttl == nil || *entry.Ttl != oldTTL {
continue
}
r.ttl = newTTL
value, _ := r.MarshalBinary()
if err = ttltxn.PutWithTTL(ds.RawKey(result.Key), value, newTTL); err != nil {
return err
}
entry.Ttl, entry.Expiry = &newTTL, &newExp
chgd = true
}
if err := txn.Commit(); err != nil {
log.Errorf("failed to commit transaction when updating ttls, cause: %v", err)
return err
if chgd {
pr.Refresh(true)
pr.Flush(ab.ds)
}
return nil
}
// Addrs returns all of the non-expired addresses for a given peer.
func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
var (
prefix = abBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p)))
q = query.Query{Prefix: prefix.String(), KeysOnly: false, ReturnExpirations: true}
results query.Results
err error
)
// Check the cache and return the entry only if it hasn't expired; if expired, remove.
if e, ok := mgr.cache.Get(p); ok {
entry := e.(cacheEntry)
if entry.expiration.After(time.Now()) {
addrs := make([]ma.Multiaddr, len(entry.addrs))
copy(addrs, entry.addrs)
return addrs
} else {
mgr.cache.Remove(p)
}
}
txn, err := mgr.ds.NewTransaction(true)
func (ab *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
pr, err := ab.loadRecord(p, true, true)
if err != nil {
log.Warning("failed to load peerstore entry for peer %v while querying addrs, err: %v", p, err)
return nil
}
defer txn.Discard()
if results, err = txn.Query(q); err != nil {
log.Error(err)
return nil
}
defer results.Close()
pr.RLock()
defer pr.RUnlock()
var addrs []ma.Multiaddr
var r addrRecord
// used to set the expiration for the entire cache entry
earliestExp := maxTime
for result := range results.Next() {
if err = r.UnmarshalBinary(result.Value); err == nil {
addrs = append(addrs, r.addr)
}
if exp := result.Expiration; !exp.IsZero() && exp.Before(earliestExp) {
earliestExp = exp
addrs := make([]ma.Multiaddr, 0, len(pr.Addrs))
for k, _ := range pr.Addrs {
if a, err := ma.NewMultiaddr(k); err == nil {
addrs = append(addrs, a)
}
}
// Store a copy in the cache.
addrsCpy := make([]ma.Multiaddr, len(addrs))
copy(addrsCpy, addrs)
entry := cacheEntry{addrs: addrsCpy, expiration: earliestExp}
mgr.cache.Add(p, entry)
return addrs
}
// Peers returns all of the peer IDs for which the AddrBook has addresses.
func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice {
ids, err := uniquePeerIds(mgr.ds, abBase, func(result query.Result) string {
return ds.RawKey(result.Key).Parent().Name()
func (ab *dsAddrBook) PeersWithAddrs() peer.IDSlice {
ids, err := uniquePeerIds(ab.ds, addrBookBase, func(result query.Result) string {
return ds.RawKey(result.Key).Name()
})
if err != nil {
log.Errorf("error while retrieving peers with addresses: %v", err)
@ -403,107 +406,91 @@ func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice {
// AddrStream returns a channel on which all new addresses discovered for a
// given peer ID will be published.
func (mgr *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
initial := mgr.Addrs(p)
return mgr.subsManager.AddrStream(ctx, p, initial)
func (ab *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
initial := ab.Addrs(p)
return ab.subsManager.AddrStream(ctx, p, initial)
}
// ClearAddrs will delete all known addresses for a peer ID.
func (mgr *dsAddrBook) ClearAddrs(p peer.ID) {
var (
err error
prefix = abBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p)))
deleteFn func() error
)
if e, ok := mgr.cache.Peek(p); ok {
mgr.cache.Remove(p)
keys, _, _ := keysAndAddrs(p, e.(cacheEntry).addrs)
deleteFn = func() error {
return mgr.dbDelete(keys)
}
} else {
deleteFn = func() error {
return mgr.dbDeleteIter(prefix)
}
}
// Attempt transactional KV deletion.
for i := 0; i < mgr.writeRetries; i++ {
if err = deleteFn(); err == nil {
break
}
log.Errorf("failed to clear addresses for peer %s: %s\n", p.Pretty(), err)
}
func (ab *dsAddrBook) ClearAddrs(p peer.ID) {
ab.cache.Remove(p)
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(p)))
txn, err := ab.ds.NewTransaction(false)
if err != nil {
log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), mgr.writeRetries)
}
}
// dbDelete transactionally deletes the provided keys.
func (mgr *dsAddrBook) dbDelete(keys []ds.Key) error {
var err error
txn, err := mgr.ds.NewTransaction(false)
if err != nil {
return err
log.Errorf("failed to clear addresses for peer %s: %v\n", p.Pretty(), err)
}
defer txn.Discard()
for _, key := range keys {
if err = txn.Delete(key); err != nil {
log.Errorf("failed to delete key: %s, cause: %v", key.String(), err)
return err
}
if err := txn.Delete(key); err != nil {
log.Errorf("failed to clear addresses for peer %s: %v\n", p.Pretty(), err)
}
if err = txn.Commit(); err != nil {
log.Errorf("failed to commit transaction when deleting keys, cause: %v", err)
return err
}
return nil
}
// dbDeleteIter removes all entries whose keys are prefixed with the argument.
// it returns a slice of the removed keys in case it's needed
func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) error {
q := query.Query{Prefix: prefix.String(), KeysOnly: true}
txn, err := mgr.ds.NewTransaction(false)
func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) (err error) {
pr, err := ab.loadRecord(p, true, false)
if err != nil {
return err
}
defer txn.Discard()
results, err := txn.Query(q)
if err != nil {
log.Errorf("failed to fetch all keys prefixed with: %s, cause: %v", prefix.String(), err)
return err
return fmt.Errorf("failed to load peerstore entry for peer %v while deleting addrs, err: %v", p, err)
}
var keys = make([]ds.Key, 0, 4) // cap: 4 to reduce allocs
var key ds.Key
for result := range results.Next() {
key = ds.RawKey(result.Key)
keys = append(keys, key)
pr.Lock()
defer pr.Unlock()
if err = txn.Delete(key); err != nil {
log.Errorf("failed to delete key: %s, cause: %v", key.String(), err)
return err
now := time.Now()
broadcast := make([]bool, len(addrs))
newExp := now.Add(ttl)
for i, addr := range addrs {
e, ok := pr.Addrs[addr.String()]
if ok && mode == ttlExtend && e.Expiry.After(newExp) {
continue
}
pr.Addrs[addr.String()] = &pb.AddrBookRecord_AddrEntry{Expiry: &newExp, Ttl: &ttl}
broadcast[i] = !ok
}
// Update was successful, so broadcast event only for new addresses.
for i, v := range broadcast {
if v {
ab.subsManager.BroadcastAddr(p, addrs[i])
}
}
if err = results.Close(); err != nil {
log.Errorf("failed to close cursor, cause: %v", err)
return err
}
if err = txn.Commit(); err != nil {
log.Errorf("failed to commit transaction when deleting keys, cause: %v", err)
return err
}
return nil
pr.Refresh(true)
return pr.Flush(ab.ds)
}
func (ab *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) (err error) {
pr, err := ab.loadRecord(p, false, false)
if err != nil {
return fmt.Errorf("failed to load peerstore entry for peer %v while deleting addrs, err: %v", p, err)
}
if pr.Addrs == nil {
return nil
}
pr.Lock()
defer pr.Unlock()
for _, addr := range addrs {
delete(pr.Addrs, addr.String())
}
pr.Refresh(true)
return pr.Flush(ab.ds)
}
func cleanAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
clean := make([]ma.Multiaddr, 0, len(addrs))
for _, addr := range addrs {
if addr == nil {
continue
}
clean = append(clean, addr)
}
return clean
}

View File

@ -38,7 +38,7 @@ func TestDsAddrBook(t *testing.T) {
t.Parallel()
opts := DefaultOpts()
opts.TTLInterval = 100 * time.Microsecond
opts.GCInterval = 100 * time.Microsecond
opts.CacheSize = 1024
pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts))
@ -48,7 +48,7 @@ func TestDsAddrBook(t *testing.T) {
t.Parallel()
opts := DefaultOpts()
opts.TTLInterval = 100 * time.Microsecond
opts.GCInterval = 100 * time.Microsecond
opts.CacheSize = 0
pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts))
@ -134,7 +134,10 @@ func addressBookFactory(tb testing.TB, storeFactory datastoreFactory, opts Optio
tb.Fatal(err)
}
return ab, closeFunc
return ab, func() {
ab.Close()
closeFunc()
}
}
}

View File

@ -18,9 +18,8 @@ type Options struct {
// The size of the in-memory cache. A value of 0 or lower disables the cache.
CacheSize uint
// Sweep interval to expire entries, only used when TTL is *not* natively managed
// by the underlying datastore.
TTLInterval time.Duration
// Sweep interval to purge expired addresses from the datastore.
GCInterval time.Duration
// Number of times to retry transactional writes.
WriteRetries uint
@ -33,7 +32,7 @@ type Options struct {
func DefaultOpts() Options {
return Options{
CacheSize: 1024,
TTLInterval: time.Second,
GCInterval: 5 * time.Minute,
WriteRetries: 5,
}
}