diff --git a/codecov.yml b/codecov.yml index 5f88a9e..96dcfd7 100644 --- a/codecov.yml +++ b/codecov.yml @@ -1,3 +1,5 @@ coverage: range: "50...100" comment: off +ignore: + - "pb/*.pb.go" \ No newline at end of file diff --git a/interface.go b/interface.go index 25d2c08..4d16668 100644 --- a/interface.go +++ b/interface.go @@ -3,6 +3,7 @@ package peerstore import ( "context" "errors" + "io" "math" "time" @@ -49,6 +50,8 @@ const ( // Peerstore provides a threadsafe store of Peer related // information. type Peerstore interface { + io.Closer + AddrBook KeyBook PeerMetadata diff --git a/pb/Makefile b/pb/Makefile new file mode 100644 index 0000000..51e71f8 --- /dev/null +++ b/pb/Makefile @@ -0,0 +1,12 @@ +PB = $(wildcard *.proto) +GO = $(PB:.proto=.pb.go) + +all: $(GO) + +%.pb.go: %.proto + protoc --proto_path=$(GOPATH)/src:. --gogofaster_out=. $< + +clean: + rm -f *.pb.go + +.PHONY: clean \ No newline at end of file diff --git a/pb/custom.go b/pb/custom.go new file mode 100644 index 0000000..711963c --- /dev/null +++ b/pb/custom.go @@ -0,0 +1,110 @@ +package pstore_pb + +import ( + "encoding/json" + + proto "github.com/gogo/protobuf/proto" + peer "github.com/libp2p/go-libp2p-peer" + pt "github.com/libp2p/go-libp2p-peer/test" + ma "github.com/multiformats/go-multiaddr" +) + +// customGogoType aggregates the interfaces that custom Gogo types need to implement. +// it is only used for type assertions. +type customGogoType interface { + proto.Marshaler + proto.Unmarshaler + json.Marshaler + json.Unmarshaler + proto.Sizer +} + +// ProtoAddr is a custom type used by gogo to serde raw peer IDs into the peer.ID type, and back. +type ProtoPeerID struct { + peer.ID +} + +var _ customGogoType = (*ProtoPeerID)(nil) + +func (id ProtoPeerID) Marshal() ([]byte, error) { + return []byte(id.ID), nil +} + +func (id ProtoPeerID) MarshalTo(data []byte) (n int, err error) { + return copy(data, []byte(id.ID)), nil +} + +func (id ProtoPeerID) MarshalJSON() ([]byte, error) { + m, _ := id.Marshal() + return json.Marshal(m) +} + +func (id *ProtoPeerID) Unmarshal(data []byte) (err error) { + id.ID = peer.ID(string(data)) + return nil +} + +func (id *ProtoPeerID) UnmarshalJSON(data []byte) error { + var v []byte + err := json.Unmarshal(data, v) + if err != nil { + return err + } + return id.Unmarshal(v) +} + +func (id ProtoPeerID) Size() int { + return len([]byte(id.ID)) +} + +// ProtoAddr is a custom type used by gogo to serde raw multiaddresses into the ma.Multiaddr type, and back. +type ProtoAddr struct { + ma.Multiaddr +} + +var _ customGogoType = (*ProtoAddr)(nil) + +func (a ProtoAddr) Marshal() ([]byte, error) { + return a.Bytes(), nil +} + +func (a ProtoAddr) MarshalTo(data []byte) (n int, err error) { + return copy(data, a.Bytes()), nil +} + +func (a ProtoAddr) MarshalJSON() ([]byte, error) { + m, _ := a.Marshal() + return json.Marshal(m) +} + +func (a *ProtoAddr) Unmarshal(data []byte) (err error) { + a.Multiaddr, err = ma.NewMultiaddrBytes(data) + return err +} + +func (a *ProtoAddr) UnmarshalJSON(data []byte) error { + v := new([]byte) + err := json.Unmarshal(data, v) + if err != nil { + return err + } + return a.Unmarshal(*v) +} + +func (a ProtoAddr) Size() int { + return len(a.Bytes()) +} + +// NewPopulatedProtoAddr generates a populated instance of the custom gogo type ProtoAddr. +// It is required by gogo-generated tests. +func NewPopulatedProtoAddr(r randyPstore) *ProtoAddr { + a, _ := ma.NewMultiaddr("/ip4/123.123.123.123/tcp/7001") + return &ProtoAddr{Multiaddr: a} +} + +// NewPopulatedProtoPeerID generates a populated instance of the custom gogo type ProtoPeerID. +// It is required by gogo-generated tests. +func NewPopulatedProtoPeerID(r randyPstore) *ProtoPeerID { + id, _ := pt.RandPeerID() + return &ProtoPeerID{ID: id} +} diff --git a/pb/pstore.pb.go b/pb/pstore.pb.go new file mode 100644 index 0000000..c80958c --- /dev/null +++ b/pb/pstore.pb.go @@ -0,0 +1,754 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: pstore.proto + +package pstore_pb + +import ( + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +// AddrBookRecord represents a record for a peer in the address book. +type AddrBookRecord struct { + // The peer ID. + Id *ProtoPeerID `protobuf:"bytes,1,opt,name=id,proto3,customtype=ProtoPeerID" json:"id,omitempty"` + // The multiaddresses. This is a sorted list where element 0 expires the soonest. + Addrs []*AddrBookRecord_AddrEntry `protobuf:"bytes,2,rep,name=addrs,proto3" json:"addrs,omitempty"` +} + +func (m *AddrBookRecord) Reset() { *m = AddrBookRecord{} } +func (m *AddrBookRecord) String() string { return proto.CompactTextString(m) } +func (*AddrBookRecord) ProtoMessage() {} +func (*AddrBookRecord) Descriptor() ([]byte, []int) { + return fileDescriptor_f96873690e08a98f, []int{0} +} +func (m *AddrBookRecord) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *AddrBookRecord) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_AddrBookRecord.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *AddrBookRecord) XXX_Merge(src proto.Message) { + xxx_messageInfo_AddrBookRecord.Merge(m, src) +} +func (m *AddrBookRecord) XXX_Size() int { + return m.Size() +} +func (m *AddrBookRecord) XXX_DiscardUnknown() { + xxx_messageInfo_AddrBookRecord.DiscardUnknown(m) +} + +var xxx_messageInfo_AddrBookRecord proto.InternalMessageInfo + +func (m *AddrBookRecord) GetAddrs() []*AddrBookRecord_AddrEntry { + if m != nil { + return m.Addrs + } + return nil +} + +// AddrEntry represents a single multiaddress. +type AddrBookRecord_AddrEntry struct { + Addr *ProtoAddr `protobuf:"bytes,1,opt,name=addr,proto3,customtype=ProtoAddr" json:"addr,omitempty"` + // The point in time when this address expires. + Expiry int64 `protobuf:"varint,2,opt,name=expiry,proto3" json:"expiry,omitempty"` + // The original TTL of this address. + Ttl int64 `protobuf:"varint,3,opt,name=ttl,proto3" json:"ttl,omitempty"` +} + +func (m *AddrBookRecord_AddrEntry) Reset() { *m = AddrBookRecord_AddrEntry{} } +func (m *AddrBookRecord_AddrEntry) String() string { return proto.CompactTextString(m) } +func (*AddrBookRecord_AddrEntry) ProtoMessage() {} +func (*AddrBookRecord_AddrEntry) Descriptor() ([]byte, []int) { + return fileDescriptor_f96873690e08a98f, []int{0, 0} +} +func (m *AddrBookRecord_AddrEntry) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *AddrBookRecord_AddrEntry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_AddrBookRecord_AddrEntry.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *AddrBookRecord_AddrEntry) XXX_Merge(src proto.Message) { + xxx_messageInfo_AddrBookRecord_AddrEntry.Merge(m, src) +} +func (m *AddrBookRecord_AddrEntry) XXX_Size() int { + return m.Size() +} +func (m *AddrBookRecord_AddrEntry) XXX_DiscardUnknown() { + xxx_messageInfo_AddrBookRecord_AddrEntry.DiscardUnknown(m) +} + +var xxx_messageInfo_AddrBookRecord_AddrEntry proto.InternalMessageInfo + +func (m *AddrBookRecord_AddrEntry) GetExpiry() int64 { + if m != nil { + return m.Expiry + } + return 0 +} + +func (m *AddrBookRecord_AddrEntry) GetTtl() int64 { + if m != nil { + return m.Ttl + } + return 0 +} + +func init() { + proto.RegisterType((*AddrBookRecord)(nil), "pstore.pb.AddrBookRecord") + proto.RegisterType((*AddrBookRecord_AddrEntry)(nil), "pstore.pb.AddrBookRecord.AddrEntry") +} + +func init() { proto.RegisterFile("pstore.proto", fileDescriptor_f96873690e08a98f) } + +var fileDescriptor_f96873690e08a98f = []byte{ + // 255 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x28, 0x2e, 0xc9, + 0x2f, 0x4a, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0xf1, 0x92, 0xa4, 0x74, 0xd3, + 0x33, 0x4b, 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0xd3, 0xf3, 0xd3, 0xf3, 0xf5, 0xc1, + 0x2a, 0x92, 0x4a, 0xd3, 0xc0, 0x3c, 0x30, 0x07, 0xcc, 0x82, 0xe8, 0x54, 0x3a, 0xc6, 0xc8, 0xc5, + 0xe7, 0x98, 0x92, 0x52, 0xe4, 0x94, 0x9f, 0x9f, 0x1d, 0x94, 0x9a, 0x9c, 0x5f, 0x94, 0x22, 0x24, + 0xcf, 0xc5, 0x94, 0x99, 0x22, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0xe3, 0xc4, 0x7f, 0xeb, 0x9e, 0x3c, + 0x77, 0x00, 0x48, 0x65, 0x40, 0x6a, 0x6a, 0x91, 0xa7, 0x4b, 0x10, 0x53, 0x66, 0x8a, 0x90, 0x25, + 0x17, 0x6b, 0x62, 0x4a, 0x4a, 0x51, 0xb1, 0x04, 0x93, 0x02, 0xb3, 0x06, 0xb7, 0x91, 0xb2, 0x1e, + 0xdc, 0x76, 0x3d, 0x54, 0xa3, 0xc0, 0x5c, 0xd7, 0xbc, 0x92, 0xa2, 0xca, 0x20, 0x88, 0x0e, 0xa9, + 0x08, 0x2e, 0x4e, 0xb8, 0x98, 0x90, 0x22, 0x17, 0x0b, 0x48, 0x14, 0x6a, 0x15, 0xef, 0xad, 0x7b, + 0xf2, 0x9c, 0x60, 0xab, 0x40, 0x2a, 0x82, 0xc0, 0x52, 0x42, 0x62, 0x5c, 0x6c, 0xa9, 0x15, 0x05, + 0x99, 0x45, 0x95, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x50, 0x9e, 0x90, 0x00, 0x17, 0x73, + 0x49, 0x49, 0x8e, 0x04, 0x33, 0x58, 0x10, 0xc4, 0x74, 0x52, 0xf8, 0xf1, 0x50, 0x8e, 0xf1, 0xc0, + 0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0x71, + 0xc2, 0x63, 0x39, 0x86, 0x0b, 0x8f, 0xe5, 0x18, 0x6e, 0x3c, 0x96, 0x63, 0x48, 0x62, 0x03, 0xfb, + 0xd8, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xb1, 0x1a, 0x16, 0x43, 0x3b, 0x01, 0x00, 0x00, +} + +func (m *AddrBookRecord) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AddrBookRecord) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Id != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintPstore(dAtA, i, uint64(m.Id.Size())) + n1, err := m.Id.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + if len(m.Addrs) > 0 { + for _, msg := range m.Addrs { + dAtA[i] = 0x12 + i++ + i = encodeVarintPstore(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *AddrBookRecord_AddrEntry) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AddrBookRecord_AddrEntry) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Addr != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintPstore(dAtA, i, uint64(m.Addr.Size())) + n2, err := m.Addr.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + } + if m.Expiry != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintPstore(dAtA, i, uint64(m.Expiry)) + } + if m.Ttl != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintPstore(dAtA, i, uint64(m.Ttl)) + } + return i, nil +} + +func encodeVarintPstore(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func NewPopulatedAddrBookRecord(r randyPstore, easy bool) *AddrBookRecord { + this := &AddrBookRecord{} + this.Id = NewPopulatedProtoPeerID(r) + if r.Intn(10) != 0 { + v1 := r.Intn(5) + this.Addrs = make([]*AddrBookRecord_AddrEntry, v1) + for i := 0; i < v1; i++ { + this.Addrs[i] = NewPopulatedAddrBookRecord_AddrEntry(r, easy) + } + } + if !easy && r.Intn(10) != 0 { + } + return this +} + +func NewPopulatedAddrBookRecord_AddrEntry(r randyPstore, easy bool) *AddrBookRecord_AddrEntry { + this := &AddrBookRecord_AddrEntry{} + this.Addr = NewPopulatedProtoAddr(r) + this.Expiry = int64(r.Int63()) + if r.Intn(2) == 0 { + this.Expiry *= -1 + } + this.Ttl = int64(r.Int63()) + if r.Intn(2) == 0 { + this.Ttl *= -1 + } + if !easy && r.Intn(10) != 0 { + } + return this +} + +type randyPstore interface { + Float32() float32 + Float64() float64 + Int63() int64 + Int31() int32 + Uint32() uint32 + Intn(n int) int +} + +func randUTF8RunePstore(r randyPstore) rune { + ru := r.Intn(62) + if ru < 10 { + return rune(ru + 48) + } else if ru < 36 { + return rune(ru + 55) + } + return rune(ru + 61) +} +func randStringPstore(r randyPstore) string { + v2 := r.Intn(100) + tmps := make([]rune, v2) + for i := 0; i < v2; i++ { + tmps[i] = randUTF8RunePstore(r) + } + return string(tmps) +} +func randUnrecognizedPstore(r randyPstore, maxFieldNumber int) (dAtA []byte) { + l := r.Intn(5) + for i := 0; i < l; i++ { + wire := r.Intn(4) + if wire == 3 { + wire = 5 + } + fieldNumber := maxFieldNumber + r.Intn(100) + dAtA = randFieldPstore(dAtA, r, fieldNumber, wire) + } + return dAtA +} +func randFieldPstore(dAtA []byte, r randyPstore, fieldNumber int, wire int) []byte { + key := uint32(fieldNumber)<<3 | uint32(wire) + switch wire { + case 0: + dAtA = encodeVarintPopulatePstore(dAtA, uint64(key)) + v3 := r.Int63() + if r.Intn(2) == 0 { + v3 *= -1 + } + dAtA = encodeVarintPopulatePstore(dAtA, uint64(v3)) + case 1: + dAtA = encodeVarintPopulatePstore(dAtA, uint64(key)) + dAtA = append(dAtA, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256))) + case 2: + dAtA = encodeVarintPopulatePstore(dAtA, uint64(key)) + ll := r.Intn(100) + dAtA = encodeVarintPopulatePstore(dAtA, uint64(ll)) + for j := 0; j < ll; j++ { + dAtA = append(dAtA, byte(r.Intn(256))) + } + default: + dAtA = encodeVarintPopulatePstore(dAtA, uint64(key)) + dAtA = append(dAtA, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256))) + } + return dAtA +} +func encodeVarintPopulatePstore(dAtA []byte, v uint64) []byte { + for v >= 1<<7 { + dAtA = append(dAtA, uint8(uint64(v)&0x7f|0x80)) + v >>= 7 + } + dAtA = append(dAtA, uint8(v)) + return dAtA +} +func (m *AddrBookRecord) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Id != nil { + l = m.Id.Size() + n += 1 + l + sovPstore(uint64(l)) + } + if len(m.Addrs) > 0 { + for _, e := range m.Addrs { + l = e.Size() + n += 1 + l + sovPstore(uint64(l)) + } + } + return n +} + +func (m *AddrBookRecord_AddrEntry) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Addr != nil { + l = m.Addr.Size() + n += 1 + l + sovPstore(uint64(l)) + } + if m.Expiry != 0 { + n += 1 + sovPstore(uint64(m.Expiry)) + } + if m.Ttl != 0 { + n += 1 + sovPstore(uint64(m.Ttl)) + } + return n +} + +func sovPstore(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozPstore(x uint64) (n int) { + return sovPstore(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *AddrBookRecord) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPstore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AddrBookRecord: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AddrBookRecord: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPstore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthPstore + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthPstore + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var v ProtoPeerID + m.Id = &v + if err := m.Id.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Addrs", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPstore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthPstore + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthPstore + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Addrs = append(m.Addrs, &AddrBookRecord_AddrEntry{}) + if err := m.Addrs[len(m.Addrs)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipPstore(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthPstore + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthPstore + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *AddrBookRecord_AddrEntry) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPstore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AddrEntry: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AddrEntry: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Addr", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPstore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthPstore + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthPstore + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var v ProtoAddr + m.Addr = &v + if err := m.Addr.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Expiry", wireType) + } + m.Expiry = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPstore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Expiry |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType) + } + m.Ttl = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPstore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Ttl |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipPstore(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthPstore + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthPstore + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipPstore(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPstore + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPstore + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPstore + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthPstore + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthPstore + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPstore + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipPstore(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthPstore + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthPstore = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowPstore = fmt.Errorf("proto: integer overflow") +) diff --git a/pb/pstore.proto b/pb/pstore.proto new file mode 100644 index 0000000..3894534 --- /dev/null +++ b/pb/pstore.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; +package pstore.pb; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; + +option (gogoproto.benchgen_all) = true; +option (gogoproto.populate_all) = true; + +// AddrBookRecord represents a record for a peer in the address book. +message AddrBookRecord { + // The peer ID. + bytes id = 1 [(gogoproto.customtype) = "ProtoPeerID"]; + + // The multiaddresses. This is a sorted list where element 0 expires the soonest. + repeated AddrEntry addrs = 2; + + // AddrEntry represents a single multiaddress. + message AddrEntry { + bytes addr = 1 [(gogoproto.customtype) = "ProtoAddr"]; + + // The point in time when this address expires. + int64 expiry = 2; + + // The original TTL of this address. + int64 ttl = 3; + } +} diff --git a/pb/pstorepb_test.go b/pb/pstorepb_test.go new file mode 100644 index 0000000..3569d04 --- /dev/null +++ b/pb/pstorepb_test.go @@ -0,0 +1,129 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: pstore.proto + +package pstore_pb + +import ( + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + github_com_gogo_protobuf_proto "github.com/gogo/protobuf/proto" + proto "github.com/gogo/protobuf/proto" + math "math" + math_rand "math/rand" + testing "testing" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +func BenchmarkAddrBookRecordProtoMarshal(b *testing.B) { + popr := math_rand.New(math_rand.NewSource(616)) + total := 0 + pops := make([]*AddrBookRecord, 10000) + for i := 0; i < 10000; i++ { + pops[i] = NewPopulatedAddrBookRecord(popr, false) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + dAtA, err := github_com_gogo_protobuf_proto.Marshal(pops[i%10000]) + if err != nil { + panic(err) + } + total += len(dAtA) + } + b.SetBytes(int64(total / b.N)) +} + +func BenchmarkAddrBookRecordProtoUnmarshal(b *testing.B) { + popr := math_rand.New(math_rand.NewSource(616)) + total := 0 + datas := make([][]byte, 10000) + for i := 0; i < 10000; i++ { + dAtA, err := github_com_gogo_protobuf_proto.Marshal(NewPopulatedAddrBookRecord(popr, false)) + if err != nil { + panic(err) + } + datas[i] = dAtA + } + msg := &AddrBookRecord{} + b.ResetTimer() + for i := 0; i < b.N; i++ { + total += len(datas[i%10000]) + if err := github_com_gogo_protobuf_proto.Unmarshal(datas[i%10000], msg); err != nil { + panic(err) + } + } + b.SetBytes(int64(total / b.N)) +} + +func BenchmarkAddrBookRecord_AddrEntryProtoMarshal(b *testing.B) { + popr := math_rand.New(math_rand.NewSource(616)) + total := 0 + pops := make([]*AddrBookRecord_AddrEntry, 10000) + for i := 0; i < 10000; i++ { + pops[i] = NewPopulatedAddrBookRecord_AddrEntry(popr, false) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + dAtA, err := github_com_gogo_protobuf_proto.Marshal(pops[i%10000]) + if err != nil { + panic(err) + } + total += len(dAtA) + } + b.SetBytes(int64(total / b.N)) +} + +func BenchmarkAddrBookRecord_AddrEntryProtoUnmarshal(b *testing.B) { + popr := math_rand.New(math_rand.NewSource(616)) + total := 0 + datas := make([][]byte, 10000) + for i := 0; i < 10000; i++ { + dAtA, err := github_com_gogo_protobuf_proto.Marshal(NewPopulatedAddrBookRecord_AddrEntry(popr, false)) + if err != nil { + panic(err) + } + datas[i] = dAtA + } + msg := &AddrBookRecord_AddrEntry{} + b.ResetTimer() + for i := 0; i < b.N; i++ { + total += len(datas[i%10000]) + if err := github_com_gogo_protobuf_proto.Unmarshal(datas[i%10000], msg); err != nil { + panic(err) + } + } + b.SetBytes(int64(total / b.N)) +} + +func BenchmarkAddrBookRecordSize(b *testing.B) { + popr := math_rand.New(math_rand.NewSource(616)) + total := 0 + pops := make([]*AddrBookRecord, 1000) + for i := 0; i < 1000; i++ { + pops[i] = NewPopulatedAddrBookRecord(popr, false) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + total += pops[i%1000].Size() + } + b.SetBytes(int64(total / b.N)) +} + +func BenchmarkAddrBookRecord_AddrEntrySize(b *testing.B) { + popr := math_rand.New(math_rand.NewSource(616)) + total := 0 + pops := make([]*AddrBookRecord_AddrEntry, 1000) + for i := 0; i < 1000; i++ { + pops[i] = NewPopulatedAddrBookRecord_AddrEntry(popr, false) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + total += pops[i%1000].Size() + } + b.SetBytes(int64(total / b.N)) +} + +//These tests are generated by github.com/gogo/protobuf/plugin/testgen diff --git a/peerstore.go b/peerstore.go index e460b1c..40411fb 100644 --- a/peerstore.go +++ b/peerstore.go @@ -2,6 +2,7 @@ package peerstore import ( "fmt" + "io" "sync" peer "github.com/libp2p/go-libp2p-peer" @@ -31,6 +32,26 @@ func NewPeerstore(kb KeyBook, ab AddrBook, md PeerMetadata) Peerstore { } } +func (ps *peerstore) Close() (err error) { + var errs []error + weakClose := func(name string, c interface{}) { + if cl, ok := c.(io.Closer); ok { + if err = cl.Close(); err != nil { + errs = append(errs, fmt.Errorf("%s error: %s", name, err)) + } + } + } + + weakClose("keybook", ps.KeyBook) + weakClose("addressbook", ps.AddrBook) + weakClose("peermetadata", ps.PeerMetadata) + + if len(errs) > 0 { + return fmt.Errorf("failed while closing peerstore; err(s): %q", errs) + } + return nil +} + func (ps *peerstore) Peers() peer.IDSlice { set := map[peer.ID]struct{}{} for _, p := range ps.PeersWithKeys() { diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index 9e92336..9690bd5 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -2,50 +2,25 @@ package pstoreds import ( "context" - "encoding/binary" - "errors" + "fmt" + "sort" + "sync" "time" - lru "github.com/hashicorp/golang-lru" - base32 "github.com/whyrusleeping/base32" - ds "github.com/ipfs/go-datastore" query "github.com/ipfs/go-datastore/query" logging "github.com/ipfs/go-log" - ma "github.com/multiformats/go-multiaddr" - mh "github.com/multiformats/go-multihash" - peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" + pb "github.com/libp2p/go-libp2p-peerstore/pb" pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem" + + lru "github.com/hashicorp/golang-lru" + ma "github.com/multiformats/go-multiaddr" + b32 "github.com/whyrusleeping/base32" ) -var ( - log = logging.Logger("peerstore/ds") - // The maximum representable value in time.Time is time.Unix(1<<63-62135596801, 999999999). - // But it's too brittle and implementation-dependent, so we prefer to use 1<<62, which is in the - // year 146138514283. We're safe. - maxTime = time.Unix(1<<62, 0) - - ErrTTLDatastore = errors.New("datastore must provide TTL support") -) - -// Peer addresses are stored under the following db key pattern: -// /peers/addr// -var abBase = ds.NewKey("/peers/addrs") - -var _ pstore.AddrBook = (*dsAddrBook)(nil) - -// dsAddrBook is an address book backed by a Datastore with both an -// in-memory TTL manager and an in-memory address stream manager. -type dsAddrBook struct { - cache cache - ds ds.TxnDatastore - subsManager *pstoremem.AddrSubManager - writeRetries int -} - type ttlWriteMode int const ( @@ -53,347 +28,283 @@ const ( ttlExtend ) -type cacheEntry struct { - expiration time.Time - addrs []ma.Multiaddr +var ( + log = logging.Logger("peerstore/ds") + + // Peer addresses are stored db key pattern: + // /peers/addrs/ + addrBookBase = ds.NewKey("/peers/addrs") +) + +// addrsRecord decorates the AddrBookRecord with locks and metadata. +type addrsRecord struct { + sync.RWMutex + *pb.AddrBookRecord + dirty bool } -type addrRecord struct { - ttl time.Duration - addr ma.Multiaddr -} +// flush writes the record to the datastore by calling ds.Put, unless the record is +// marked for deletion, in which case we call ds.Delete. To be called within a lock. +func (r *addrsRecord) flush(write ds.Write) (err error) { + key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id.ID))) + if len(r.Addrs) == 0 { + if err = write.Delete(key); err == nil { + r.dirty = false + } + return err + } -func (ar *addrRecord) MarshalBinary() ([]byte, error) { - ttlB := make([]byte, 8) - binary.LittleEndian.PutUint64(ttlB, uint64(ar.ttl)) - return append(ttlB, ar.addr.Bytes()...), nil -} - -func (ar *addrRecord) UnmarshalBinary(b []byte) error { - ar.ttl = time.Duration(binary.LittleEndian.Uint64(b)) - // this had been serialized by us, no need to check for errors - ar.addr, _ = ma.NewMultiaddrBytes(b[8:]) + data, err := r.Marshal() + if err != nil { + return err + } + if err = write.Put(key, data); err != nil { + return err + } + // write succeeded; record is no longer dirty. + r.dirty = false return nil } -// NewAddrBook initializes a new address book given a -// Datastore instance, a context for managing the TTL manager, -// and the interval at which the TTL manager should sweep the Datastore. -func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts Options) (*dsAddrBook, error) { - if _, ok := store.(ds.TTLDatastore); !ok { - return nil, ErrTTLDatastore +// clean is called on records to perform housekeeping. The return value indicates if the record was changed +// as a result of this call. +// +// clean does the following: +// * sorts addresses by expiration (soonest expiring first). +// * removes expired addresses. +// +// It short-circuits optimistically when there's nothing to do. +// +// clean is called from several points: +// * when accessing an entry. +// * when performing periodic GC. +// * after an entry has been modified (e.g. addresses have been added or removed, TTLs updated, etc.) +// +// If the return value is true, the caller should perform a flush immediately to sync the record with the store. +func (r *addrsRecord) clean() (chgd bool) { + now := time.Now().Unix() + if !r.dirty && len(r.Addrs) > 0 && r.Addrs[0].Expiry > now { + // record is not dirty, and we have no expired entries to purge. + return false } - var ( - cache cache = &noopCache{} - err error - ) + if len(r.Addrs) == 0 { + // this is a ghost record; let's signal it has to be written. + // flush() will take care of doing the deletion. + return true + } - if opts.CacheSize > 0 { - if cache, err = lru.NewARC(int(opts.CacheSize)); err != nil { - return nil, err + if r.dirty && len(r.Addrs) > 1 { + // the record has been modified, so it may need resorting. + // we keep addresses sorted by expiration, where 0 is the soonest expiring. + sort.Slice(r.Addrs, func(i, j int) bool { + return r.Addrs[i].Expiry < r.Addrs[j].Expiry + }) + } + + // since addresses are sorted by expiration, we find the first + // survivor and split the slice on its index. + pivot := -1 + for i, addr := range r.Addrs { + if addr.Expiry > now { + break } + pivot = i } - mgr := &dsAddrBook{ - cache: cache, - ds: store, - subsManager: pstoremem.NewAddrSubManager(), - writeRetries: int(opts.WriteRetries), - } - return mgr, nil + r.Addrs = r.Addrs[pivot+1:] + return r.dirty || pivot >= 0 } -func keysAndAddrs(p peer.ID, addrs []ma.Multiaddr) ([]ds.Key, []ma.Multiaddr, error) { - var ( - keys = make([]ds.Key, len(addrs)) - clean = make([]ma.Multiaddr, len(addrs)) - parentKey = abBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))) - i = 0 - ) +// dsAddrBook is an address book backed by a Datastore with a GC procedure to purge expired entries. It uses an +// in-memory address stream manager. See the NewAddrBook for more information. +type dsAddrBook struct { + ctx context.Context + opts Options - for _, addr := range addrs { - if addr == nil { - continue - } + cache cache + ds ds.Batching + gc *dsAddrBookGc + subsManager *pstoremem.AddrSubManager - hash, err := mh.Sum((addr).Bytes(), mh.MURMUR3, -1) - if err != nil { - return nil, nil, err - } - keys[i] = parentKey.ChildString(base32.RawStdEncoding.EncodeToString(hash)) - clean[i] = addr - i++ + // controls children goroutine lifetime. + childrenDone sync.WaitGroup + cancelFn func() +} + +var _ pstore.AddrBook = (*dsAddrBook)(nil) + +// NewAddrBook initializes a new datastore-backed address book. It serves as a drop-in replacement for pstoremem +// (memory-backed peerstore), and works with any datastore implementing the ds.Batching interface. +// +// Addresses and peer records are serialized into protobuf, storing one datastore entry per peer, along with metadata +// to control address expiration. To alleviate disk access and serde overhead, we internally use a read/write-through +// ARC cache, the size of which is adjustable via Options.CacheSize. +// +// The user has a choice of two GC algorithms: +// +// - lookahead GC: minimises the amount of full store traversals by maintaining a time-indexed list of entries that +// need to be visited within the period specified in Options.GCLookaheadInterval. This is useful in scenarios with +// considerable TTL variance, coupled with datastores whose native iterators return entries in lexicographical key +// order. Enable this mode by passing a value Options.GCLookaheadInterval > 0. Lookahead windows are jumpy, not +// sliding. Purges operate exclusively over the lookahead window with periodicity Options.GCPurgeInterval. +// +// - full-purge GC (default): performs a full visit of the store with periodicity Options.GCPurgeInterval. Useful when +// the range of possible TTL values is small and the values themselves are also extreme, e.g. 10 minutes or +// permanent, popular values used in other libp2p modules. In this cited case, optimizing with lookahead windows +// makes little sense. +func NewAddrBook(ctx context.Context, store ds.Batching, opts Options) (ab *dsAddrBook, err error) { + ctx, cancelFn := context.WithCancel(ctx) + ab = &dsAddrBook{ + ctx: ctx, + ds: store, + opts: opts, + cancelFn: cancelFn, + subsManager: pstoremem.NewAddrSubManager(), } - return keys[:i], clean[:i], nil + if opts.CacheSize > 0 { + if ab.cache, err = lru.NewARC(int(opts.CacheSize)); err != nil { + return nil, err + } + } else { + ab.cache = new(noopCache) + } + + if ab.gc, err = newAddressBookGc(ctx, ab); err != nil { + return nil, err + } + + return ab, nil +} + +func (ab *dsAddrBook) Close() error { + ab.cancelFn() + ab.childrenDone.Wait() + return nil +} + +// loadRecord is a read-through fetch. It fetches a record from cache, falling back to the +// datastore upon a miss, and returning a newly initialized record if the peer doesn't exist. +// +// loadRecord calls clean() on an existing record before returning it. If the record changes +// as a result and the update argument is true, the resulting state is saved in the datastore. +// +// If the cache argument is true, the record is inserted in the cache when loaded from the datastore. +func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrsRecord, err error) { + if e, ok := ab.cache.Get(id); ok { + pr = e.(*addrsRecord) + pr.Lock() + defer pr.Unlock() + + if pr.clean() && update { + err = pr.flush(ab.ds) + } + return pr, err + } + + pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} + key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(id))) + data, err := ab.ds.Get(key) + + switch err { + case ds.ErrNotFound: + err = nil + pr.Id = &pb.ProtoPeerID{ID: id} + case nil: + if err = pr.Unmarshal(data); err != nil { + return nil, err + } + // this record is new and local for now (not in cache), so we don't need to lock. + if pr.clean() && update { + err = pr.flush(ab.ds) + } + default: + return nil, err + } + + if cache { + ab.cache.Add(id, pr) + } + return pr, err } // AddAddr will add a new address if it's not already in the AddrBook. -func (mgr *dsAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { - mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl) +func (ab *dsAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { + ab.AddAddrs(p, []ma.Multiaddr{addr}, ttl) } // AddAddrs will add many new addresses if they're not already in the AddrBook. -func (mgr *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { +func (ab *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { if ttl <= 0 { return } - mgr.setAddrs(p, addrs, ttl, ttlExtend) + addrs = cleanAddrs(addrs) + ab.setAddrs(p, addrs, ttl, ttlExtend) } // SetAddr will add or update the TTL of an address in the AddrBook. -func (mgr *dsAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { - addrs := []ma.Multiaddr{addr} - mgr.SetAddrs(p, addrs, ttl) +func (ab *dsAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { + ab.SetAddrs(p, []ma.Multiaddr{addr}, ttl) } // SetAddrs will add or update the TTLs of addresses in the AddrBook. -func (mgr *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { +func (ab *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { + addrs = cleanAddrs(addrs) if ttl <= 0 { - mgr.deleteAddrs(p, addrs) + ab.deleteAddrs(p, addrs) return } - mgr.setAddrs(p, addrs, ttl, ttlOverride) -} - -func (mgr *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) error { - // Keys and cleaned up addresses. - keys, addrs, err := keysAndAddrs(p, addrs) - if err != nil { - return err - } - - mgr.cache.Remove(p) - // Attempt transactional KV deletion. - for i := 0; i < mgr.writeRetries; i++ { - if err = mgr.dbDelete(keys); err == nil { - break - } - log.Errorf("failed to delete addresses for peer %s: %s\n", p.Pretty(), err) - } - - if err != nil { - log.Errorf("failed to avoid write conflict for peer %s after %d retries: %v\n", p.Pretty(), mgr.writeRetries, err) - return err - } - - return nil -} - -func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) error { - // Keys and cleaned up addresses. - keys, addrs, err := keysAndAddrs(p, addrs) - if err != nil { - return err - } - - mgr.cache.Remove(p) - // Attempt transactional KV insertion. - var existed []bool - for i := 0; i < mgr.writeRetries; i++ { - if existed, err = mgr.dbInsert(keys, addrs, ttl, mode); err == nil { - break - } - log.Errorf("failed to write addresses for peer %s: %s\n", p.Pretty(), err) - } - - if err != nil { - log.Errorf("failed to avoid write conflict for peer %s after %d retries: %v\n", p.Pretty(), mgr.writeRetries, err) - return err - } - - // Update was successful, so broadcast event only for new addresses. - for i, _ := range keys { - if !existed[i] { - mgr.subsManager.BroadcastAddr(p, addrs[i]) - } - } - return nil -} - -// dbInsert performs a transactional insert of the provided keys and values. -func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) ([]bool, error) { - var ( - err error - existed = make([]bool, len(keys)) - exp = time.Now().Add(ttl) - ) - - txn, err := mgr.ds.NewTransaction(false) - if err != nil { - return nil, err - } - defer txn.Discard() - - ttltxn := txn.(ds.TTLDatastore) - for i, key := range keys { - // Check if the key existed previously. - if existed[i], err = ttltxn.Has(key); err != nil { - log.Errorf("transaction failed and aborted while checking key existence: %s, cause: %v", key.String(), err) - return nil, err - } - - // The key embeds a hash of the value, so if it existed, we can safely skip the insert and - // just update the TTL. - if existed[i] { - switch mode { - case ttlOverride: - err = ttltxn.SetTTL(key, ttl) - case ttlExtend: - var curr time.Time - if curr, err = ttltxn.GetExpiration(key); err == nil && exp.After(curr) { - err = ttltxn.SetTTL(key, ttl) - } - } - if err != nil { - // mode will be printed as an int - log.Errorf("failed while updating the ttl for key: %s, mode: %v, cause: %v", key.String(), mode, err) - return nil, err - } - continue - } - - r := &addrRecord{ - ttl: ttl, - addr: addrs[i], - } - value, _ := r.MarshalBinary() - if err = ttltxn.PutWithTTL(key, value, ttl); err != nil { - log.Errorf("transaction failed and aborted while setting key: %s, cause: %v", key.String(), err) - return nil, err - } - } - - if err = txn.Commit(); err != nil { - log.Errorf("failed to commit transaction when setting keys, cause: %v", err) - return nil, err - } - - return existed, nil + ab.setAddrs(p, addrs, ttl, ttlOverride) } // UpdateAddrs will update any addresses for a given peer and TTL combination to // have a new TTL. -func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { - mgr.cache.Remove(p) - - var err error - for i := 0; i < mgr.writeRetries; i++ { - if err = mgr.dbUpdateTTL(p, oldTTL, newTTL); err == nil { - break - } - log.Errorf("failed to update ttlsfor peer %s: %s\n", p.Pretty(), err) - } - +func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) { + pr, err := ab.loadRecord(p, true, false) if err != nil { - log.Errorf("failed to avoid write conflict when updating ttls for peer %s after %d retries: %v\n", - p.Pretty(), mgr.writeRetries, err) + log.Errorf("failed to update ttls for peer %s: %s\n", p.Pretty(), err) } -} -func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time.Duration) error { - var ( - prefix = abBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))) - q = query.Query{Prefix: prefix.String(), KeysOnly: false} - results query.Results - err error - ) + pr.Lock() + defer pr.Unlock() - txn, err := mgr.ds.NewTransaction(false) - if err != nil { - return err - } - defer txn.Discard() - - if results, err = txn.Query(q); err != nil { - return err - } - defer results.Close() - - ttltxn := txn.(ds.TTLDatastore) - r := &addrRecord{} - for result := range results.Next() { - r.UnmarshalBinary(result.Value) - if r.ttl != oldTTL { + newExp := time.Now().Add(newTTL).Unix() + for _, entry := range pr.Addrs { + if entry.Ttl != int64(oldTTL) { continue } - - r.ttl = newTTL - value, _ := r.MarshalBinary() - if err = ttltxn.PutWithTTL(ds.RawKey(result.Key), value, newTTL); err != nil { - return err - } + entry.Ttl, entry.Expiry = int64(newTTL), newExp + pr.dirty = true } - if err := txn.Commit(); err != nil { - log.Errorf("failed to commit transaction when updating ttls, cause: %v", err) - return err + if pr.clean() { + pr.flush(ab.ds) } - - return nil } // Addrs returns all of the non-expired addresses for a given peer. -func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { - var ( - prefix = abBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))) - q = query.Query{Prefix: prefix.String(), KeysOnly: false, ReturnExpirations: true} - results query.Results - err error - ) - - // Check the cache and return the entry only if it hasn't expired; if expired, remove. - if e, ok := mgr.cache.Get(p); ok { - entry := e.(cacheEntry) - if entry.expiration.After(time.Now()) { - addrs := make([]ma.Multiaddr, len(entry.addrs)) - copy(addrs, entry.addrs) - return addrs - } else { - mgr.cache.Remove(p) - } - } - - txn, err := mgr.ds.NewTransaction(true) +func (ab *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { + pr, err := ab.loadRecord(p, true, true) if err != nil { + log.Warning("failed to load peerstore entry for peer %v while querying addrs, err: %v", p, err) return nil } - defer txn.Discard() - if results, err = txn.Query(q); err != nil { - log.Error(err) - return nil + pr.RLock() + defer pr.RUnlock() + + addrs := make([]ma.Multiaddr, 0, len(pr.Addrs)) + for _, a := range pr.Addrs { + addrs = append(addrs, a.Addr) } - defer results.Close() - - var addrs []ma.Multiaddr - var r addrRecord - // used to set the expiration for the entire cache entry - earliestExp := maxTime - for result := range results.Next() { - if err = r.UnmarshalBinary(result.Value); err == nil { - addrs = append(addrs, r.addr) - } - - if exp := result.Expiration; !exp.IsZero() && exp.Before(earliestExp) { - earliestExp = exp - } - } - - // Store a copy in the cache. - addrsCpy := make([]ma.Multiaddr, len(addrs)) - copy(addrsCpy, addrs) - entry := cacheEntry{addrs: addrsCpy, expiration: earliestExp} - mgr.cache.Add(p, entry) - return addrs } // Peers returns all of the peer IDs for which the AddrBook has addresses. -func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice { - ids, err := uniquePeerIds(mgr.ds, abBase, func(result query.Result) string { - return ds.RawKey(result.Key).Parent().Name() +func (ab *dsAddrBook) PeersWithAddrs() peer.IDSlice { + ids, err := uniquePeerIds(ab.ds, addrBookBase, func(result query.Result) string { + return ds.RawKey(result.Key).Name() }) if err != nil { log.Errorf("error while retrieving peers with addresses: %v", err) @@ -403,107 +314,114 @@ func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice { // AddrStream returns a channel on which all new addresses discovered for a // given peer ID will be published. -func (mgr *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr { - initial := mgr.Addrs(p) - return mgr.subsManager.AddrStream(ctx, p, initial) +func (ab *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr { + initial := ab.Addrs(p) + return ab.subsManager.AddrStream(ctx, p, initial) } // ClearAddrs will delete all known addresses for a peer ID. -func (mgr *dsAddrBook) ClearAddrs(p peer.ID) { - var ( - err error - prefix = abBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))) - deleteFn func() error - ) +func (ab *dsAddrBook) ClearAddrs(p peer.ID) { + ab.cache.Remove(p) - if e, ok := mgr.cache.Peek(p); ok { - mgr.cache.Remove(p) - keys, _, _ := keysAndAddrs(p, e.(cacheEntry).addrs) - deleteFn = func() error { - return mgr.dbDelete(keys) - } - } else { - deleteFn = func() error { - return mgr.dbDeleteIter(prefix) - } - } - - // Attempt transactional KV deletion. - for i := 0; i < mgr.writeRetries; i++ { - if err = deleteFn(); err == nil { - break - } - log.Errorf("failed to clear addresses for peer %s: %s\n", p.Pretty(), err) - } - - if err != nil { - log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), mgr.writeRetries) + key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(p))) + if err := ab.ds.Delete(key); err != nil { + log.Errorf("failed to clear addresses for peer %s: %v", p.Pretty(), err) } } -// dbDelete transactionally deletes the provided keys. -func (mgr *dsAddrBook) dbDelete(keys []ds.Key) error { - var err error - - txn, err := mgr.ds.NewTransaction(false) +func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) (err error) { + pr, err := ab.loadRecord(p, true, false) if err != nil { - return err + return fmt.Errorf("failed to load peerstore entry for peer %v while setting addrs, err: %v", p, err) } - defer txn.Discard() - for _, key := range keys { - if err = txn.Delete(key); err != nil { - log.Errorf("failed to delete key: %s, cause: %v", key.String(), err) - return err + pr.Lock() + defer pr.Unlock() + + newExp := time.Now().Add(ttl).Unix() + existed := make([]bool, len(addrs)) // keeps track of which addrs we found. + +Outer: + for i, incoming := range addrs { + for _, have := range pr.Addrs { + if incoming.Equal(have.Addr) { + existed[i] = true + if mode == ttlExtend && have.Expiry > newExp { + // if we're only extending TTLs but the addr already has a longer one, we skip it. + continue Outer + } + have.Expiry = newExp + // we found the address, and addresses cannot be duplicate, + // so let's move on to the next. + continue Outer + } } } - if err = txn.Commit(); err != nil { - log.Errorf("failed to commit transaction when deleting keys, cause: %v", err) - return err + // add addresses we didn't hold. + var added []*pb.AddrBookRecord_AddrEntry + for i, e := range existed { + if e { + continue + } + addr := addrs[i] + entry := &pb.AddrBookRecord_AddrEntry{ + Addr: &pb.ProtoAddr{Multiaddr: addr}, + Ttl: int64(ttl), + Expiry: newExp, + } + added = append(added, entry) + // note: there's a minor chance that writing the record will fail, in which case we would've broadcast + // the addresses without persisting them. This is very unlikely and not much of an issue. + ab.subsManager.BroadcastAddr(p, addr) } - return nil + pr.Addrs = append(pr.Addrs, added...) + pr.dirty = true + pr.clean() + return pr.flush(ab.ds) } -// dbDeleteIter removes all entries whose keys are prefixed with the argument. -// it returns a slice of the removed keys in case it's needed -func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) error { - q := query.Query{Prefix: prefix.String(), KeysOnly: true} - - txn, err := mgr.ds.NewTransaction(false) +func (ab *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) (err error) { + pr, err := ab.loadRecord(p, false, false) if err != nil { - return err - } - defer txn.Discard() - - results, err := txn.Query(q) - if err != nil { - log.Errorf("failed to fetch all keys prefixed with: %s, cause: %v", prefix.String(), err) - return err + return fmt.Errorf("failed to load peerstore entry for peer %v while deleting addrs, err: %v", p, err) } - var keys = make([]ds.Key, 0, 4) // cap: 4 to reduce allocs - var key ds.Key - for result := range results.Next() { - key = ds.RawKey(result.Key) - keys = append(keys, key) + if pr.Addrs == nil { + return nil + } - if err = txn.Delete(key); err != nil { - log.Errorf("failed to delete key: %s, cause: %v", key.String(), err) - return err + pr.Lock() + defer pr.Unlock() + + // deletes addresses in place, and avoiding copies until we encounter the first deletion. + survived := 0 + for i, addr := range pr.Addrs { + for _, del := range addrs { + if addr.Addr.Equal(del) { + continue + } + if i != survived { + pr.Addrs[survived] = pr.Addrs[i] + } + survived++ } } + pr.Addrs = pr.Addrs[:survived] - if err = results.Close(); err != nil { - log.Errorf("failed to close cursor, cause: %v", err) - return err - } - - if err = txn.Commit(); err != nil { - log.Errorf("failed to commit transaction when deleting keys, cause: %v", err) - return err - } - - return nil + pr.dirty = true + pr.clean() + return pr.flush(ab.ds) +} + +func cleanAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + clean := make([]ma.Multiaddr, 0, len(addrs)) + for _, addr := range addrs { + if addr == nil { + continue + } + clean = append(clean, addr) + } + return clean } diff --git a/pstoreds/addr_book_gc.go b/pstoreds/addr_book_gc.go new file mode 100644 index 0000000..f40d553 --- /dev/null +++ b/pstoreds/addr_book_gc.go @@ -0,0 +1,388 @@ +package pstoreds + +import ( + "context" + "fmt" + "strconv" + "time" + + ds "github.com/ipfs/go-datastore" + query "github.com/ipfs/go-datastore/query" + + peer "github.com/libp2p/go-libp2p-peer" + pb "github.com/libp2p/go-libp2p-peerstore/pb" + + b32 "github.com/whyrusleeping/base32" +) + +var ( + // GC lookahead entries are stored in key pattern: + // /peers/gc/addrs// => nil + // in databases with lexicographical key order, this time-indexing allows us to visit + // only the timeslice we are interested in. + gcLookaheadBase = ds.NewKey("/peers/gc/addrs") + + // queries + purgeLookaheadQuery = query.Query{ + Prefix: gcLookaheadBase.String(), + Orders: []query.Order{query.OrderByKey{}}, + KeysOnly: true, + } + + purgeStoreQuery = query.Query{ + Prefix: addrBookBase.String(), + Orders: []query.Order{query.OrderByKey{}}, + KeysOnly: false, + } + + populateLookaheadQuery = query.Query{ + Prefix: addrBookBase.String(), + Orders: []query.Order{query.OrderByKey{}}, + KeysOnly: true, + } +) + +// dsAddrBookGc is responsible for garbage collection in a datastore-backed address book. +type dsAddrBookGc struct { + ctx context.Context + ab *dsAddrBook + running chan struct{} + lookaheadEnabled bool + purgeFunc func() + currWindowEnd int64 +} + +func newAddressBookGc(ctx context.Context, ab *dsAddrBook) (*dsAddrBookGc, error) { + if ab.opts.GCPurgeInterval < 0 { + return nil, fmt.Errorf("negative GC purge interval provided: %s", ab.opts.GCPurgeInterval) + } + if ab.opts.GCLookaheadInterval < 0 { + return nil, fmt.Errorf("negative GC lookahead interval provided: %s", ab.opts.GCLookaheadInterval) + } + if ab.opts.GCInitialDelay < 0 { + return nil, fmt.Errorf("negative GC initial delay provided: %s", ab.opts.GCInitialDelay) + } + if ab.opts.GCLookaheadInterval > 0 && ab.opts.GCLookaheadInterval < ab.opts.GCPurgeInterval { + return nil, fmt.Errorf("lookahead interval must be larger than purge interval, respectively: %s, %s", + ab.opts.GCLookaheadInterval, ab.opts.GCPurgeInterval) + } + + lookaheadEnabled := ab.opts.GCLookaheadInterval > 0 + gc := &dsAddrBookGc{ + ctx: ctx, + ab: ab, + running: make(chan struct{}, 1), + lookaheadEnabled: lookaheadEnabled, + } + + if lookaheadEnabled { + gc.purgeFunc = gc.purgeLookahead + } else { + gc.purgeFunc = gc.purgeStore + } + + // do not start GC timers if purge is disabled; this GC can only be triggered manually. + if ab.opts.GCPurgeInterval > 0 { + gc.ab.childrenDone.Add(1) + go gc.background() + } + + return gc, nil +} + +// gc prunes expired addresses from the datastore at regular intervals. It should be spawned as a goroutine. +func (gc *dsAddrBookGc) background() { + defer gc.ab.childrenDone.Done() + + select { + case <-time.After(gc.ab.opts.GCInitialDelay): + case <-gc.ab.ctx.Done(): + // yield if we have been cancelled/closed before the delay elapses. + return + } + + purgeTimer := time.NewTicker(gc.ab.opts.GCPurgeInterval) + defer purgeTimer.Stop() + + var lookaheadCh <-chan time.Time + if gc.lookaheadEnabled { + lookaheadTimer := time.NewTicker(gc.ab.opts.GCLookaheadInterval) + lookaheadCh = lookaheadTimer.C + gc.populateLookahead() // do a lookahead now + defer lookaheadTimer.Stop() + } + + for { + select { + case <-purgeTimer.C: + gc.purgeFunc() + + case <-lookaheadCh: + // will never trigger if lookahead is disabled (nil Duration). + gc.populateLookahead() + + case <-gc.ctx.Done(): + return + } + } +} + +// purgeCycle runs a single GC purge cycle. It operates within the lookahead window if lookahead is enabled; else it +// visits all entries in the datastore, deleting the addresses that have expired. +func (gc *dsAddrBookGc) purgeLookahead() { + select { + case gc.running <- struct{}{}: + defer func() { <-gc.running }() + default: + // yield if lookahead is running. + return + } + + var id peer.ID + record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs. + batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch) + if err != nil { + log.Warningf("failed while creating batch to purge GC entries: %v", err) + } + + // This function drops an unparseable GC entry; this is for safety. It is an escape hatch in case + // we modify the format of keys going forward. If a user runs a new version against an old DB, + // if we don't clean up unparseable entries we'll end up accumulating garbage. + dropInError := func(key ds.Key, err error, msg string) { + if err != nil { + log.Warningf("failed while %s record with GC key: %v, err: %v; deleting", msg, key, err) + } + if err = batch.Delete(key); err != nil { + log.Warningf("failed to delete corrupt GC lookahead entry: %v, err: %v", key, err) + } + } + + // This function drops a GC key if the entry is cleaned correctly. It may reschedule another visit + // if the next earliest expiry falls within the current window again. + dropOrReschedule := func(key ds.Key, ar *addrsRecord) { + if err := batch.Delete(key); err != nil { + log.Warningf("failed to delete lookahead entry: %v, err: %v", key, err) + } + + // re-add the record if it needs to be visited again in this window. + if len(ar.Addrs) != 0 && ar.Addrs[0].Expiry <= gc.currWindowEnd { + gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", ar.Addrs[0].Expiry, key.Name())) + if err := batch.Put(gcKey, []byte{}); err != nil { + log.Warningf("failed to add new GC key: %v, err: %v", gcKey, err) + } + } + } + + results, err := gc.ab.ds.Query(purgeLookaheadQuery) + if err != nil { + log.Warningf("failed while fetching entries to purge: %v", err) + return + } + defer results.Close() + + now := time.Now().Unix() + + // keys: /peers/gc/addrs// + // values: nil + for result := range results.Next() { + gcKey := ds.RawKey(result.Key) + ts, err := strconv.ParseInt(gcKey.Parent().Name(), 10, 64) + if err != nil { + dropInError(gcKey, err, "parsing timestamp") + log.Warningf("failed while parsing timestamp from key: %v, err: %v", result.Key, err) + continue + } else if ts > now { + // this is an ordered cursor; when we hit an entry with a timestamp beyond now, we can break. + break + } + + idb32, err := b32.RawStdEncoding.DecodeString(gcKey.Name()) + if err != nil { + dropInError(gcKey, err, "parsing peer ID") + log.Warningf("failed while parsing b32 peer ID from key: %v, err: %v", result.Key, err) + continue + } + + id, err = peer.IDFromBytes(idb32) + if err != nil { + dropInError(gcKey, err, "decoding peer ID") + log.Warningf("failed while decoding peer ID from key: %v, err: %v", result.Key, err) + continue + } + + // if the record is in cache, we clean it and flush it if necessary. + if e, ok := gc.ab.cache.Peek(id); ok { + cached := e.(*addrsRecord) + cached.Lock() + if cached.clean() { + if err = cached.flush(batch); err != nil { + log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err) + } + } + dropOrReschedule(gcKey, cached) + cached.Unlock() + continue + } + + record.Reset() + + // otherwise, fetch it from the store, clean it and flush it. + entryKey := addrBookBase.ChildString(gcKey.Name()) + val, err := gc.ab.ds.Get(entryKey) + if err != nil { + // captures all errors, including ErrNotFound. + dropInError(gcKey, err, "fetching entry") + continue + } + err = record.Unmarshal(val) + if err != nil { + dropInError(gcKey, err, "unmarshalling entry") + continue + } + if record.clean() { + err = record.flush(batch) + if err != nil { + log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id.Pretty(), err) + } + } + dropOrReschedule(gcKey, record) + } + + if err = batch.Commit(); err != nil { + log.Warningf("failed to commit GC purge batch: %v", err) + } +} + +func (gc *dsAddrBookGc) purgeStore() { + select { + case gc.running <- struct{}{}: + defer func() { <-gc.running }() + default: + // yield if lookahead is running. + return + } + + record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs. + batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch) + if err != nil { + log.Warningf("failed while creating batch to purge GC entries: %v", err) + } + + results, err := gc.ab.ds.Query(purgeStoreQuery) + if err != nil { + log.Warningf("failed while opening iterator: %v", err) + return + } + defer results.Close() + + // keys: /peers/addrs/ + for result := range results.Next() { + record.Reset() + if err = record.Unmarshal(result.Value); err != nil { + // TODO log + continue + } + + id := record.Id.ID + if !record.clean() { + continue + } + + if err := record.flush(batch); err != nil { + log.Warningf("failed to flush entry modified by GC for peer: &v, err: %v", id, err) + } + gc.ab.cache.Remove(id) + } + + if err = batch.Commit(); err != nil { + log.Warningf("failed to commit GC purge batch: %v", err) + } +} + +// populateLookahead populates the lookahead window by scanning the entire store and picking entries whose earliest +// expiration falls within the window period. +// +// Those entries are stored in the lookahead region in the store, indexed by the timestamp when they need to be +// visited, to facilitate temporal range scans. +func (gc *dsAddrBookGc) populateLookahead() { + if gc.ab.opts.GCLookaheadInterval == 0 { + return + } + + select { + case gc.running <- struct{}{}: + defer func() { <-gc.running }() + default: + // yield if something's running. + return + } + + until := time.Now().Add(gc.ab.opts.GCLookaheadInterval).Unix() + + var id peer.ID + record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} + results, err := gc.ab.ds.Query(populateLookaheadQuery) + if err != nil { + log.Warningf("failed while querying to populate lookahead GC window: %v", err) + return + } + defer results.Close() + + batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch) + if err != nil { + log.Warningf("failed while creating batch to populate lookahead GC window: %v", err) + return + } + + for result := range results.Next() { + idb32 := ds.RawKey(result.Key).Name() + k, err := b32.RawStdEncoding.DecodeString(idb32) + if err != nil { + log.Warningf("failed while decoding peer ID from key: %v, err: %v", result.Key, err) + continue + } + if id, err = peer.IDFromBytes(k); err != nil { + log.Warningf("failed while decoding peer ID from key: %v, err: %v", result.Key, err) + } + + // if the record is in cache, use the cached version. + if e, ok := gc.ab.cache.Peek(id); ok { + cached := e.(*addrsRecord) + cached.RLock() + if len(cached.Addrs) == 0 || cached.Addrs[0].Expiry > until { + cached.RUnlock() + continue + } + gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", cached.Addrs[0].Expiry, idb32)) + if err = batch.Put(gcKey, []byte{}); err != nil { + log.Warningf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err) + } + cached.RUnlock() + continue + } + + record.Reset() + + val, err := gc.ab.ds.Get(ds.RawKey(result.Key)) + if err != nil { + log.Warningf("failed which getting record from store for peer: %v, err: %v", id.Pretty(), err) + continue + } + if err := record.Unmarshal(val); err != nil { + log.Warningf("failed while unmarshalling record from store for peer: %v, err: %v", id.Pretty(), err) + continue + } + if len(record.Addrs) > 0 && record.Addrs[0].Expiry <= until { + gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", record.Addrs[0].Expiry, idb32)) + if err = batch.Put(gcKey, []byte{}); err != nil { + log.Warningf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err) + } + } + } + + if err = batch.Commit(); err != nil { + log.Warningf("failed to commit GC lookahead batch: %v", err) + } + + gc.currWindowEnd = until +} diff --git a/pstoreds/addr_book_gc_test.go b/pstoreds/addr_book_gc_test.go new file mode 100644 index 0000000..2c328ee --- /dev/null +++ b/pstoreds/addr_book_gc_test.go @@ -0,0 +1,254 @@ +package pstoreds + +import ( + "testing" + "time" + + query "github.com/ipfs/go-datastore/query" + pstore "github.com/libp2p/go-libp2p-peerstore" + test "github.com/libp2p/go-libp2p-peerstore/test" + ma "github.com/multiformats/go-multiaddr" +) + +var lookaheadQuery = query.Query{Prefix: gcLookaheadBase.String(), KeysOnly: true} + +type testProbe struct { + t *testing.T + ab pstore.AddrBook +} + +func (tp *testProbe) countLookaheadEntries() (i int) { + results, err := tp.ab.(*dsAddrBook).ds.Query(lookaheadQuery) + if err != nil { + tp.t.Fatal(err) + } + + defer results.Close() + for range results.Next() { + i++ + } + return i +} +func (tp *testProbe) clearCache() { + for _, k := range tp.ab.(*dsAddrBook).cache.Keys() { + tp.ab.(*dsAddrBook).cache.Remove(k) + } +} + +func TestGCLookahead(t *testing.T) { + opts := DefaultOpts() + + // effectively disable automatic GC for this test. + opts.GCInitialDelay = 90 * time.Hour + opts.GCLookaheadInterval = 10 * time.Second + opts.GCPurgeInterval = 1 * time.Second + + factory := addressBookFactory(t, badgerStore, opts) + ab, closeFn := factory() + gc := ab.(*dsAddrBook).gc + defer closeFn() + + tp := &testProbe{t, ab} + + ids := test.GeneratePeerIDs(10) + addrs := test.GenerateAddrs(100) + + // lookahead is 10 seconds, so these entries will be outside the lookahead window. + ab.AddAddrs(ids[0], addrs[:10], time.Hour) + ab.AddAddrs(ids[1], addrs[10:20], time.Hour) + ab.AddAddrs(ids[2], addrs[20:30], time.Hour) + + gc.populateLookahead() + if i := tp.countLookaheadEntries(); i != 0 { + t.Errorf("expected no GC lookahead entries, got: %v", i) + } + + // change addresses of a peer to have TTL 1 second, placing them in the lookahead window. + ab.UpdateAddrs(ids[1], time.Hour, time.Second) + + // Purge the cache, to exercise a different path in the lookahead cycle. + tp.clearCache() + + gc.populateLookahead() + if i := tp.countLookaheadEntries(); i != 1 { + t.Errorf("expected 1 GC lookahead entry, got: %v", i) + } + + // change addresses of another to have TTL 5 second, placing them in the lookahead window. + ab.UpdateAddrs(ids[2], time.Hour, 5*time.Second) + gc.populateLookahead() + if i := tp.countLookaheadEntries(); i != 2 { + t.Errorf("expected 2 GC lookahead entries, got: %v", i) + } +} + +func TestGCPurging(t *testing.T) { + opts := DefaultOpts() + + // effectively disable automatic GC for this test. + opts.GCInitialDelay = 90 * time.Hour + opts.GCLookaheadInterval = 20 * time.Second + opts.GCPurgeInterval = 1 * time.Second + + factory := addressBookFactory(t, badgerStore, opts) + ab, closeFn := factory() + gc := ab.(*dsAddrBook).gc + defer closeFn() + + tp := &testProbe{t, ab} + + ids := test.GeneratePeerIDs(10) + addrs := test.GenerateAddrs(100) + + // stagger addresses within the lookahead window, but stagger them. + ab.AddAddrs(ids[0], addrs[:10], 1*time.Second) + ab.AddAddrs(ids[1], addrs[30:40], 1*time.Second) + ab.AddAddrs(ids[2], addrs[60:70], 1*time.Second) + + ab.AddAddrs(ids[0], addrs[10:20], 4*time.Second) + ab.AddAddrs(ids[1], addrs[40:50], 4*time.Second) + + ab.AddAddrs(ids[0], addrs[20:30], 10*time.Second) + ab.AddAddrs(ids[1], addrs[50:60], 10*time.Second) + + // this is inside the window, but it will survive the purges we do in the test. + ab.AddAddrs(ids[3], addrs[70:80], 15*time.Second) + + gc.populateLookahead() + if i := tp.countLookaheadEntries(); i != 4 { + t.Errorf("expected 4 GC lookahead entries, got: %v", i) + } + + <-time.After(2 * time.Second) + gc.purgeLookahead() + if i := tp.countLookaheadEntries(); i != 3 { + t.Errorf("expected 3 GC lookahead entries, got: %v", i) + } + + // Purge the cache, to exercise a different path in the purge cycle. + tp.clearCache() + + <-time.After(5 * time.Second) + gc.purgeLookahead() + if i := tp.countLookaheadEntries(); i != 3 { + t.Errorf("expected 3 GC lookahead entries, got: %v", i) + } + + <-time.After(5 * time.Second) + gc.purgeLookahead() + if i := tp.countLookaheadEntries(); i != 1 { + t.Errorf("expected 1 GC lookahead entries, got: %v", i) + } + if i := len(ab.PeersWithAddrs()); i != 1 { + t.Errorf("expected 1 entries in database, got: %v", i) + } + if p := ab.PeersWithAddrs()[0]; p != ids[3] { + t.Errorf("expected remaining peer to be #3, got: %v, expected: %v", p, ids[3]) + } +} + +func TestGCDelay(t *testing.T) { + ids := test.GeneratePeerIDs(10) + addrs := test.GenerateAddrs(100) + + opts := DefaultOpts() + + opts.GCInitialDelay = 2 * time.Second + opts.GCLookaheadInterval = 1 * time.Minute + opts.GCPurgeInterval = 30 * time.Second + + factory := addressBookFactory(t, badgerStore, opts) + ab, closeFn := factory() + defer closeFn() + + tp := &testProbe{t, ab} + + ab.AddAddrs(ids[0], addrs, 1*time.Second) + + // immediately after we should be having no lookahead entries. + if i := tp.countLookaheadEntries(); i != 0 { + t.Errorf("expected no lookahead entries, got: %d", i) + } + + // after the initial delay has passed. + <-time.After(3 * time.Second) + if i := tp.countLookaheadEntries(); i != 1 { + t.Errorf("expected 1 lookahead entry, got: %d", i) + } +} + +func TestGCLookaheadDisabled(t *testing.T) { + ids := test.GeneratePeerIDs(10) + addrs := test.GenerateAddrs(100) + + opts := DefaultOpts() + + // effectively disable automatic GC for this test. + opts.GCInitialDelay = 90 * time.Hour + opts.GCLookaheadInterval = 0 // disable lookahead + opts.GCPurgeInterval = 9 * time.Hour + + factory := addressBookFactory(t, badgerStore, opts) + ab, closeFn := factory() + defer closeFn() + + tp := &testProbe{t, ab} + + // four peers: + // ids[0] has 10 addresses, all of which expire in 500ms. + // ids[1] has 20 addresses; 50% expire in 500ms and 50% in 10 hours. + // ids[2] has 10 addresses; all expire in 10 hours. + // ids[3] has 60 addresses; all expire in 10 hours. + ab.AddAddrs(ids[0], addrs[:10], 500*time.Millisecond) + ab.AddAddrs(ids[1], addrs[10:20], 500*time.Millisecond) + ab.AddAddrs(ids[1], addrs[20:30], 10*time.Hour) + ab.AddAddrs(ids[2], addrs[30:40], 10*time.Hour) + ab.AddAddrs(ids[3], addrs[40:], 10*time.Hour) + + time.Sleep(100 * time.Millisecond) + + if i := tp.countLookaheadEntries(); i != 0 { + t.Errorf("expected no GC lookahead entries, got: %v", i) + } + + time.Sleep(500 * time.Millisecond) + gc := ab.(*dsAddrBook).gc + gc.purgeFunc() + + var empty []ma.Multiaddr + test.AssertAddressesEqual(t, empty, ab.Addrs(ids[0])) + test.AssertAddressesEqual(t, addrs[20:30], ab.Addrs(ids[1])) + test.AssertAddressesEqual(t, addrs[30:40], ab.Addrs(ids[2])) + test.AssertAddressesEqual(t, addrs[40:], ab.Addrs(ids[3])) +} + +func BenchmarkLookaheadCycle(b *testing.B) { + ids := test.GeneratePeerIDs(100) + addrs := test.GenerateAddrs(100) + + opts := DefaultOpts() + + opts.GCInitialDelay = 2 * time.Hour + opts.GCLookaheadInterval = 2 * time.Hour + opts.GCPurgeInterval = 6 * time.Hour + + factory := addressBookFactory(b, badgerStore, opts) + ab, closeFn := factory() + defer closeFn() + + inside, outside := 1*time.Minute, 48*time.Hour + for i, id := range ids { + var ttl time.Duration + if i%2 == 0 { + ttl = inside + } else { + ttl = outside + } + ab.AddAddrs(id, addrs, ttl) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ab.(*dsAddrBook).gc.populateLookahead() + } +} diff --git a/pstoreds/cache.go b/pstoreds/cache.go index 2e20ae6..d3d43aa 100644 --- a/pstoreds/cache.go +++ b/pstoreds/cache.go @@ -8,12 +8,15 @@ type cache interface { Remove(key interface{}) Contains(key interface{}) bool Peek(key interface{}) (value interface{}, ok bool) + Keys() []interface{} } // noopCache is a dummy implementation that's used when the cache is disabled. type noopCache struct { } +var _ cache = (*noopCache)(nil) + func (*noopCache) Get(key interface{}) (value interface{}, ok bool) { return nil, false } @@ -31,3 +34,7 @@ func (*noopCache) Contains(key interface{}) bool { func (*noopCache) Peek(key interface{}) (value interface{}, ok bool) { return nil, false } + +func (*noopCache) Keys() (keys []interface{}) { + return keys +} diff --git a/pstoreds/cyclic_batch.go b/pstoreds/cyclic_batch.go new file mode 100644 index 0000000..1cf8579 --- /dev/null +++ b/pstoreds/cyclic_batch.go @@ -0,0 +1,76 @@ +package pstoreds + +import ( + "github.com/pkg/errors" + + ds "github.com/ipfs/go-datastore" +) + +// how many operations are queued in a cyclic batch before we flush it. +var defaultOpsPerCyclicBatch = 20 + +// cyclicBatch buffers ds write operations and automatically flushes them after defaultOpsPerCyclicBatch (20) have been +// queued. An explicit `Commit()` closes this cyclic batch, erroring all further operations. +// +// It is similar to go-ds autobatch, but it's driven by an actual Batch facility offered by the +// ds. +type cyclicBatch struct { + threshold int + ds.Batch + ds ds.Batching + pending int +} + +func newCyclicBatch(ds ds.Batching, threshold int) (ds.Batch, error) { + batch, err := ds.Batch() + if err != nil { + return nil, err + } + return &cyclicBatch{Batch: batch, ds: ds}, nil +} + +func (cb *cyclicBatch) cycle() (err error) { + if cb.Batch == nil { + return errors.New("cyclic batch is closed") + } + if cb.pending < cb.threshold { + // we haven't reached the threshold yet. + return nil + } + // commit and renew the batch. + if err = cb.Batch.Commit(); err != nil { + return errors.Wrap(err, "failed while committing cyclic batch") + } + if cb.Batch, err = cb.ds.Batch(); err != nil { + return errors.Wrap(err, "failed while renewing cyclic batch") + } + return nil +} + +func (cb *cyclicBatch) Put(key ds.Key, val []byte) error { + if err := cb.cycle(); err != nil { + return err + } + cb.pending++ + return cb.Batch.Put(key, val) +} + +func (cb *cyclicBatch) Delete(key ds.Key) error { + if err := cb.cycle(); err != nil { + return err + } + cb.pending++ + return cb.Batch.Delete(key) +} + +func (cb *cyclicBatch) Commit() error { + if cb.Batch == nil { + return errors.New("cyclic batch is closed") + } + if err := cb.Batch.Commit(); err != nil { + return err + } + cb.pending = 0 + cb.Batch = nil + return nil +} diff --git a/pstoreds/ds_test.go b/pstoreds/ds_test.go index c98d4a6..3ae7ef7 100644 --- a/pstoreds/ds_test.go +++ b/pstoreds/ds_test.go @@ -15,11 +15,10 @@ import ( pt "github.com/libp2p/go-libp2p-peerstore/test" ) -type datastoreFactory func(tb testing.TB) (ds.TxnDatastore, func()) +type datastoreFactory func(tb testing.TB) (ds.Batching, func()) var dstores = map[string]datastoreFactory{ "Badger": badgerStore, - // TODO: Enable once go-ds-leveldb supports TTL via a shim. // "Leveldb": leveldbStore, } @@ -37,7 +36,7 @@ func TestDsAddrBook(t *testing.T) { t.Parallel() opts := DefaultOpts() - opts.TTLInterval = 100 * time.Microsecond + opts.GCPurgeInterval = 1 * time.Second opts.CacheSize = 1024 pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts)) @@ -47,7 +46,7 @@ func TestDsAddrBook(t *testing.T) { t.Parallel() opts := DefaultOpts() - opts.TTLInterval = 100 * time.Microsecond + opts.GCPurgeInterval = 1 * time.Second opts.CacheSize = 0 pt.TestAddrBook(t, addressBookFactory(t, dsFactory, opts)) @@ -88,7 +87,7 @@ func BenchmarkDsPeerstore(b *testing.B) { } } -func badgerStore(tb testing.TB) (ds.TxnDatastore, func()) { +func badgerStore(tb testing.TB) (ds.Batching, func()) { dataPath, err := ioutil.TempDir(os.TempDir(), "badger") if err != nil { tb.Fatal(err) @@ -122,39 +121,41 @@ func leveldbStore(tb testing.TB) (ds.TxnDatastore, func()) { func peerstoreFactory(tb testing.TB, storeFactory datastoreFactory, opts Options) pt.PeerstoreFactory { return func() (pstore.Peerstore, func()) { - store, closeFunc := storeFactory(tb) - + store, storeCloseFn := storeFactory(tb) ps, err := NewPeerstore(context.Background(), store, opts) if err != nil { tb.Fatal(err) } - - return ps, closeFunc + closer := func() { + ps.Close() + storeCloseFn() + } + return ps, closer } } func addressBookFactory(tb testing.TB, storeFactory datastoreFactory, opts Options) pt.AddrBookFactory { return func() (pstore.AddrBook, func()) { store, closeFunc := storeFactory(tb) - ab, err := NewAddrBook(context.Background(), store, opts) if err != nil { tb.Fatal(err) } - - return ab, closeFunc + closer := func() { + ab.Close() + closeFunc() + } + return ab, closer } } func keyBookFactory(tb testing.TB, storeFactory datastoreFactory, opts Options) pt.KeyBookFactory { return func() (pstore.KeyBook, func()) { - store, closeFunc := storeFactory(tb) - + store, storeCloseFn := storeFactory(tb) kb, err := NewKeyBook(context.Background(), store, opts) if err != nil { tb.Fatal(err) } - - return kb, closeFunc + return kb, storeCloseFn } } diff --git a/pstoreds/keybook.go b/pstoreds/keybook.go index dc54b19..8dcc826 100644 --- a/pstoreds/keybook.go +++ b/pstoreds/keybook.go @@ -23,12 +23,12 @@ var ( ) type dsKeyBook struct { - ds ds.TxnDatastore + ds ds.Datastore } var _ pstore.KeyBook = (*dsKeyBook)(nil) -func NewKeyBook(_ context.Context, store ds.TxnDatastore, _ Options) (pstore.KeyBook, error) { +func NewKeyBook(_ context.Context, store ds.Datastore, _ Options) (pstore.KeyBook, error) { return &dsKeyBook{store}, nil } diff --git a/pstoreds/peerstore.go b/pstoreds/peerstore.go index b1a64f0..fc0263d 100644 --- a/pstoreds/peerstore.go +++ b/pstoreds/peerstore.go @@ -18,28 +18,36 @@ type Options struct { // The size of the in-memory cache. A value of 0 or lower disables the cache. CacheSize uint - // Sweep interval to expire entries, only used when TTL is *not* natively managed - // by the underlying datastore. - TTLInterval time.Duration + // Sweep interval to purge expired addresses from the datastore. If this is a zero value, GC will not run + // automatically, but it'll be available on demand via explicit calls. + GCPurgeInterval time.Duration - // Number of times to retry transactional writes. - WriteRetries uint + // Interval to renew the GC lookahead window. If this is a zero value, lookahead will be disabled and we'll + // traverse the entire datastore for every purge cycle. + GCLookaheadInterval time.Duration + + // Initial delay before GC processes start. Intended to give the system breathing room to fully boot + // before starting GC. + GCInitialDelay time.Duration } -// DefaultOpts returns the default options for a persistent peerstore: -// * Cache size: 1024 -// * TTL sweep interval: 1 second -// * WriteRetries: 5 +// DefaultOpts returns the default options for a persistent peerstore, with the full-purge GC algorithm: +// +// * Cache size: 1024. +// * GC purge interval: 2 hours. +// * GC lookahead interval: disabled. +// * GC initial delay: 60 seconds. func DefaultOpts() Options { return Options{ - CacheSize: 1024, - TTLInterval: time.Second, - WriteRetries: 5, + CacheSize: 1024, + GCPurgeInterval: 2 * time.Hour, + GCLookaheadInterval: 0, + GCInitialDelay: 60 * time.Second, } } // NewPeerstore creates a peerstore backed by the provided persistent datastore. -func NewPeerstore(ctx context.Context, store ds.TxnDatastore, opts Options) (pstore.Peerstore, error) { +func NewPeerstore(ctx context.Context, store ds.Batching, opts Options) (pstore.Peerstore, error) { addrBook, err := NewAddrBook(ctx, store, opts) if err != nil { return nil, err @@ -60,20 +68,14 @@ func NewPeerstore(ctx context.Context, store ds.TxnDatastore, opts Options) (pst } // uniquePeerIds extracts and returns unique peer IDs from database keys. -func uniquePeerIds(ds ds.TxnDatastore, prefix ds.Key, extractor func(result query.Result) string) (peer.IDSlice, error) { +func uniquePeerIds(ds ds.Datastore, prefix ds.Key, extractor func(result query.Result) string) (peer.IDSlice, error) { var ( q = query.Query{Prefix: prefix.String(), KeysOnly: true} results query.Results err error ) - txn, err := ds.NewTransaction(true) - if err != nil { - return nil, err - } - defer txn.Discard() - - if results, err = txn.Query(q); err != nil { + if results, err = ds.Query(q); err != nil { log.Error(err) return nil, err } @@ -90,12 +92,11 @@ func uniquePeerIds(ds ds.TxnDatastore, prefix ds.Key, extractor func(result quer return peer.IDSlice{}, nil } - ids := make(peer.IDSlice, len(idset)) - i := 0 + ids := make(peer.IDSlice, 0, len(idset)) for id := range idset { pid, _ := base32.RawStdEncoding.DecodeString(id) - ids[i], _ = peer.IDFromBytes(pid) - i++ + id, _ := peer.IDFromBytes(pid) + ids = append(ids, id) } return ids, nil } diff --git a/test/addr_book_suite.go b/test/addr_book_suite.go index cbea8b8..c01fe98 100644 --- a/test/addr_book_suite.go +++ b/test/addr_book_suite.go @@ -1,14 +1,10 @@ package test import ( - "fmt" "testing" "time" - peer "github.com/libp2p/go-libp2p-peer" - pt "github.com/libp2p/go-libp2p-peer/test" pstore "github.com/libp2p/go-libp2p-peerstore" - ma "github.com/multiformats/go-multiaddr" ) var addressBookSuite = map[string]func(book pstore.AddrBook) func(*testing.T){ @@ -39,64 +35,48 @@ func TestAddrBook(t *testing.T, factory AddrBookFactory) { } } -func generateAddrs(count int) []ma.Multiaddr { - var addrs = make([]ma.Multiaddr, count) - for i := 0; i < count; i++ { - addrs[i] = multiaddr(fmt.Sprintf("/ip4/1.1.1.%d/tcp/1111", i)) - } - return addrs -} - -func generatePeerIds(count int) []peer.ID { - var ids = make([]peer.ID, count) - for i := 0; i < count; i++ { - ids[i], _ = pt.RandPeerID() - } - return ids -} - func testAddAddress(ab pstore.AddrBook) func(*testing.T) { return func(t *testing.T) { t.Run("add a single address", func(t *testing.T) { - id := generatePeerIds(1)[0] - addrs := generateAddrs(1) + id := GeneratePeerIDs(1)[0] + addrs := GenerateAddrs(1) ab.AddAddr(id, addrs[0], time.Hour) - testHas(t, addrs, ab.Addrs(id)) + AssertAddressesEqual(t, addrs, ab.Addrs(id)) }) t.Run("idempotent add single address", func(t *testing.T) { - id := generatePeerIds(1)[0] - addrs := generateAddrs(1) + id := GeneratePeerIDs(1)[0] + addrs := GenerateAddrs(1) ab.AddAddr(id, addrs[0], time.Hour) ab.AddAddr(id, addrs[0], time.Hour) - testHas(t, addrs, ab.Addrs(id)) + AssertAddressesEqual(t, addrs, ab.Addrs(id)) }) t.Run("add multiple addresses", func(t *testing.T) { - id := generatePeerIds(1)[0] - addrs := generateAddrs(3) + id := GeneratePeerIDs(1)[0] + addrs := GenerateAddrs(3) ab.AddAddrs(id, addrs, time.Hour) - testHas(t, addrs, ab.Addrs(id)) + AssertAddressesEqual(t, addrs, ab.Addrs(id)) }) t.Run("idempotent add multiple addresses", func(t *testing.T) { - id := generatePeerIds(1)[0] - addrs := generateAddrs(3) + id := GeneratePeerIDs(1)[0] + addrs := GenerateAddrs(3) ab.AddAddrs(id, addrs, time.Hour) ab.AddAddrs(id, addrs, time.Hour) - testHas(t, addrs, ab.Addrs(id)) + AssertAddressesEqual(t, addrs, ab.Addrs(id)) }) t.Run("adding an existing address with a later expiration extends its ttl", func(t *testing.T) { - id := generatePeerIds(1)[0] - addrs := generateAddrs(3) + id := GeneratePeerIDs(1)[0] + addrs := GenerateAddrs(3) ab.AddAddrs(id, addrs, time.Second) @@ -105,12 +85,12 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) { // after the initial TTL has expired, check that only the third address is present. time.Sleep(1200 * time.Millisecond) - testHas(t, addrs[2:], ab.Addrs(id)) + AssertAddressesEqual(t, addrs[2:], ab.Addrs(id)) }) t.Run("adding an existing address with an earlier expiration is noop", func(t *testing.T) { - id := generatePeerIds(1)[0] - addrs := generateAddrs(3) + id := GeneratePeerIDs(1)[0] + addrs := GenerateAddrs(3) ab.AddAddrs(id, addrs, time.Hour) @@ -120,57 +100,64 @@ func testAddAddress(ab pstore.AddrBook) func(*testing.T) { // after the initial TTL has expired, check that all three addresses are still present (i.e. the TTL on // the modified one was not shortened). time.Sleep(2100 * time.Millisecond) - testHas(t, addrs, ab.Addrs(id)) + AssertAddressesEqual(t, addrs, ab.Addrs(id)) }) } } func testClearWorks(ab pstore.AddrBook) func(t *testing.T) { return func(t *testing.T) { - ids := generatePeerIds(2) - addrs := generateAddrs(5) + ids := GeneratePeerIDs(2) + addrs := GenerateAddrs(5) ab.AddAddrs(ids[0], addrs[0:3], time.Hour) ab.AddAddrs(ids[1], addrs[3:], time.Hour) - testHas(t, addrs[0:3], ab.Addrs(ids[0])) - testHas(t, addrs[3:], ab.Addrs(ids[1])) + AssertAddressesEqual(t, addrs[0:3], ab.Addrs(ids[0])) + AssertAddressesEqual(t, addrs[3:], ab.Addrs(ids[1])) ab.ClearAddrs(ids[0]) - testHas(t, nil, ab.Addrs(ids[0])) - testHas(t, addrs[3:], ab.Addrs(ids[1])) + AssertAddressesEqual(t, nil, ab.Addrs(ids[0])) + AssertAddressesEqual(t, addrs[3:], ab.Addrs(ids[1])) ab.ClearAddrs(ids[1]) - testHas(t, nil, ab.Addrs(ids[0])) - testHas(t, nil, ab.Addrs(ids[1])) + AssertAddressesEqual(t, nil, ab.Addrs(ids[0])) + AssertAddressesEqual(t, nil, ab.Addrs(ids[1])) } } func testSetNegativeTTLClears(m pstore.AddrBook) func(t *testing.T) { return func(t *testing.T) { - id := generatePeerIds(1)[0] - addr := generateAddrs(1)[0] + id := GeneratePeerIDs(1)[0] + addrs := GenerateAddrs(100) - m.SetAddr(id, addr, time.Hour) - testHas(t, []ma.Multiaddr{addr}, m.Addrs(id)) + m.SetAddrs(id, addrs, time.Hour) + AssertAddressesEqual(t, addrs, m.Addrs(id)) - m.SetAddr(id, addr, -1) - testHas(t, nil, m.Addrs(id)) + // remove two addresses. + m.SetAddr(id, addrs[50], -1) + m.SetAddr(id, addrs[75], -1) + + // calculate the survivors + survivors := append(addrs[0:50], addrs[51:]...) + survivors = append(survivors[0:74], survivors[75:]...) + + AssertAddressesEqual(t, survivors, m.Addrs(id)) } } func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) { return func(t *testing.T) { t.Run("update ttl of peer with no addrs", func(t *testing.T) { - id := generatePeerIds(1)[0] + id := GeneratePeerIDs(1)[0] // Shouldn't panic. m.UpdateAddrs(id, time.Hour, time.Minute) }) t.Run("update ttls successfully", func(t *testing.T) { - ids := generatePeerIds(2) - addrs1, addrs2 := generateAddrs(2), generateAddrs(2) + ids := GeneratePeerIDs(2) + addrs1, addrs2 := GenerateAddrs(2), GenerateAddrs(2) // set two keys with different ttls for each peer. m.SetAddr(ids[0], addrs1[0], time.Hour) @@ -179,8 +166,8 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) { m.SetAddr(ids[1], addrs2[1], time.Minute) // Sanity check. - testHas(t, addrs1, m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1, m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) // Will only affect addrs1[0]. // Badger does not support subsecond TTLs. @@ -188,26 +175,26 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) { m.UpdateAddrs(ids[0], time.Hour, 1*time.Second) // No immediate effect. - testHas(t, addrs1, m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1, m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) // After a wait, addrs[0] is gone. time.Sleep(1500 * time.Millisecond) - testHas(t, addrs1[1:2], m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) // Will only affect addrs2[0]. m.UpdateAddrs(ids[1], time.Hour, 1*time.Second) // No immediate effect. - testHas(t, addrs1[1:2], m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) time.Sleep(1500 * time.Millisecond) // First addrs is gone in both. - testHas(t, addrs1[1:], m.Addrs(ids[0])) - testHas(t, addrs2[1:], m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:], m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2[1:], m.Addrs(ids[1])) }) } @@ -215,7 +202,7 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) { func testNilAddrsDontBreak(m pstore.AddrBook) func(t *testing.T) { return func(t *testing.T) { - id := generatePeerIds(1)[0] + id := GeneratePeerIDs(1)[0] m.SetAddr(id, nil, time.Hour) m.AddAddr(id, nil, time.Hour) @@ -224,53 +211,53 @@ func testNilAddrsDontBreak(m pstore.AddrBook) func(t *testing.T) { func testAddressesExpire(m pstore.AddrBook) func(t *testing.T) { return func(t *testing.T) { - ids := generatePeerIds(2) - addrs1 := generateAddrs(3) - addrs2 := generateAddrs(2) + ids := GeneratePeerIDs(2) + addrs1 := GenerateAddrs(3) + addrs2 := GenerateAddrs(2) m.AddAddrs(ids[0], addrs1, time.Hour) m.AddAddrs(ids[1], addrs2, time.Hour) - testHas(t, addrs1, m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1, m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) m.AddAddrs(ids[0], addrs1, 2*time.Hour) m.AddAddrs(ids[1], addrs2, 2*time.Hour) - testHas(t, addrs1, m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1, m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) m.SetAddr(ids[0], addrs1[0], 100*time.Microsecond) <-time.After(100 * time.Millisecond) - testHas(t, addrs1[1:3], m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:3], m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) m.SetAddr(ids[0], addrs1[2], 100*time.Microsecond) <-time.After(100 * time.Millisecond) - testHas(t, addrs1[1:2], m.Addrs(ids[0])) - testHas(t, addrs2, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2, m.Addrs(ids[1])) m.SetAddr(ids[1], addrs2[0], 100*time.Microsecond) <-time.After(100 * time.Millisecond) - testHas(t, addrs1[1:2], m.Addrs(ids[0])) - testHas(t, addrs2[1:], m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0])) + AssertAddressesEqual(t, addrs2[1:], m.Addrs(ids[1])) m.SetAddr(ids[1], addrs2[1], 100*time.Microsecond) <-time.After(100 * time.Millisecond) - testHas(t, addrs1[1:2], m.Addrs(ids[0])) - testHas(t, nil, m.Addrs(ids[1])) + AssertAddressesEqual(t, addrs1[1:2], m.Addrs(ids[0])) + AssertAddressesEqual(t, nil, m.Addrs(ids[1])) m.SetAddr(ids[0], addrs1[1], 100*time.Microsecond) <-time.After(100 * time.Millisecond) - testHas(t, nil, m.Addrs(ids[0])) - testHas(t, nil, m.Addrs(ids[1])) + AssertAddressesEqual(t, nil, m.Addrs(ids[0])) + AssertAddressesEqual(t, nil, m.Addrs(ids[1])) } } func testClearWithIterator(m pstore.AddrBook) func(t *testing.T) { return func(t *testing.T) { - ids := generatePeerIds(2) - addrs := generateAddrs(100) + ids := GeneratePeerIDs(2) + addrs := GenerateAddrs(100) // Add the peers with 50 addresses each. m.AddAddrs(ids[0], addrs[:50], pstore.PermanentAddrTTL) @@ -307,8 +294,8 @@ func testPeersWithAddrs(m pstore.AddrBook) func(t *testing.T) { }) t.Run("non-empty addrbook", func(t *testing.T) { - ids := generatePeerIds(2) - addrs := generateAddrs(10) + ids := GeneratePeerIDs(2) + addrs := GenerateAddrs(10) m.AddAddrs(ids[0], addrs[:5], pstore.PermanentAddrTTL) m.AddAddrs(ids[1], addrs[5:], pstore.PermanentAddrTTL) @@ -319,25 +306,3 @@ func testPeersWithAddrs(m pstore.AddrBook) func(t *testing.T) { }) } } - -func testHas(t *testing.T, exp, act []ma.Multiaddr) { - t.Helper() - if len(exp) != len(act) { - t.Fatalf("lengths not the same. expected %d, got %d\n", len(exp), len(act)) - } - - for _, a := range exp { - found := false - - for _, b := range act { - if a.Equal(b) { - found = true - break - } - } - - if !found { - t.Fatalf("expected address %s not found", a) - } - } -} diff --git a/test/benchmarks_suite.go b/test/benchmarks_suite.go index f283b9b..cf04eb9 100644 --- a/test/benchmarks_suite.go +++ b/test/benchmarks_suite.go @@ -36,7 +36,7 @@ func BenchmarkPeerstore(b *testing.B, factory PeerstoreFactory, variant string) // Start all test peer producing goroutines, where each produces peers with as many // multiaddrs as the n field in the param struct. for _, p := range params { - go addressProducer(ctx, b, p.ch, p.n) + go AddressProducer(ctx, b, p.ch, p.n) } // So tests are always run in the same order. diff --git a/test/utils.go b/test/utils.go index 6649820..d7af3ab 100644 --- a/test/utils.go +++ b/test/utils.go @@ -10,7 +10,7 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -func multiaddr(m string) ma.Multiaddr { +func Multiaddr(m string) ma.Multiaddr { maddr, err := ma.NewMultiaddr(m) if err != nil { panic(err) @@ -23,7 +23,7 @@ type peerpair struct { Addr []ma.Multiaddr } -func randomPeer(b *testing.B, addrCount int) *peerpair { +func RandomPeer(b *testing.B, addrCount int) *peerpair { var ( pid peer.ID err error @@ -44,11 +44,11 @@ func randomPeer(b *testing.B, addrCount int) *peerpair { return &peerpair{pid, addrs} } -func addressProducer(ctx context.Context, b *testing.B, addrs chan *peerpair, addrsPerPeer int) { +func AddressProducer(ctx context.Context, b *testing.B, addrs chan *peerpair, addrsPerPeer int) { b.Helper() defer close(addrs) for { - p := randomPeer(b, addrsPerPeer) + p := RandomPeer(b, addrsPerPeer) select { case addrs <- p: case <-ctx.Done(): @@ -56,3 +56,41 @@ func addressProducer(ctx context.Context, b *testing.B, addrs chan *peerpair, ad } } } + +func GenerateAddrs(count int) []ma.Multiaddr { + var addrs = make([]ma.Multiaddr, count) + for i := 0; i < count; i++ { + addrs[i] = Multiaddr(fmt.Sprintf("/ip4/1.1.1.%d/tcp/1111", i)) + } + return addrs +} + +func GeneratePeerIDs(count int) []peer.ID { + var ids = make([]peer.ID, count) + for i := 0; i < count; i++ { + ids[i], _ = pt.RandPeerID() + } + return ids +} + +func AssertAddressesEqual(t *testing.T, exp, act []ma.Multiaddr) { + t.Helper() + if len(exp) != len(act) { + t.Fatalf("lengths not the same. expected %d, got %d\n", len(exp), len(act)) + } + + for _, a := range exp { + found := false + + for _, b := range act { + if a.Equal(b) { + found = true + break + } + } + + if !found { + t.Fatalf("expected address %s not found", a) + } + } +}