From 4c43736fe94841a841d43be57881c1fcf817acf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 15 Nov 2018 00:27:28 +0000 Subject: [PATCH] pstoreds: tighten up gc-based peerstore. Introduces custom types in protobuf to serde directly into multiaddrs and peer IDs. Simplify purge by ordering addrs by expiry. In general, getting this readier for merge. --- pb/custom.go | 89 ++++++++ pb/pstore.pb.go | 457 +++++++++++++----------------------------- pb/pstore.proto | 19 +- pb/pstorepb_test.go | 2 - pstoreds/addr_book.go | 285 +++++++++++++++----------- 5 files changed, 397 insertions(+), 455 deletions(-) create mode 100644 pb/custom.go diff --git a/pb/custom.go b/pb/custom.go new file mode 100644 index 0000000..e51b2ca --- /dev/null +++ b/pb/custom.go @@ -0,0 +1,89 @@ +package pstore_pb + +import ( + "encoding/json" + + peer "github.com/libp2p/go-libp2p-peer" + pt "github.com/libp2p/go-libp2p-peer/test" + ma "github.com/multiformats/go-multiaddr" +) + +type ProtoPeerID struct { + peer.ID +} + +func NewPopulatedProtoPeerID(r randyPstore) *ProtoPeerID { + id, _ := pt.RandPeerID() + return &ProtoPeerID{ID: id} +} + +func (id ProtoPeerID) Marshal() ([]byte, error) { + return []byte(id.ID), nil +} + +func (id ProtoPeerID) MarshalTo(data []byte) (n int, err error) { + return copy(data, []byte(id.ID)), nil +} + +func (id ProtoPeerID) MarshalJSON() ([]byte, error) { + m, _ := id.Marshal() + return json.Marshal(m) +} + +func (id *ProtoPeerID) Unmarshal(data []byte) (err error) { + id.ID = peer.ID(string(data)) + return nil +} + +func (id *ProtoPeerID) UnmarshalJSON(data []byte) error { + v := new([]byte) + err := json.Unmarshal(data, v) + if err != nil { + return err + } + return id.Unmarshal(*v) +} + +func (id ProtoPeerID) Size() int { + return len([]byte(id.ID)) +} + +type ProtoAddr struct { + ma.Multiaddr +} + +func NewPopulatedProtoAddr(r randyPstore) *ProtoAddr { + a, _ := ma.NewMultiaddr("/ip4/123.123.123.123/tcp/7001") + return &ProtoAddr{Multiaddr: a} +} + +func (a ProtoAddr) Marshal() ([]byte, error) { + return a.Bytes(), nil +} + +func (a ProtoAddr) MarshalTo(data []byte) (n int, err error) { + return copy(data, a.Bytes()), nil +} + +func (a ProtoAddr) MarshalJSON() ([]byte, error) { + m, _ := a.Marshal() + return json.Marshal(m) +} + +func (a *ProtoAddr) Unmarshal(data []byte) (err error) { + a.Multiaddr, err = ma.NewMultiaddrBytes(data) + return err +} + +func (a *ProtoAddr) UnmarshalJSON(data []byte) error { + v := new([]byte) + err := json.Unmarshal(data, v) + if err != nil { + return err + } + return a.Unmarshal(*v) +} + +func (a ProtoAddr) Size() int { + return len(a.Bytes()) +} diff --git a/pb/pstore.pb.go b/pb/pstore.pb.go index cf4b04b..ba5a51b 100644 --- a/pb/pstore.pb.go +++ b/pb/pstore.pb.go @@ -7,12 +7,6 @@ import proto "github.com/gogo/protobuf/proto" import fmt "fmt" import math "math" import _ "github.com/gogo/protobuf/gogoproto" -import _ "github.com/golang/protobuf/ptypes/duration" -import _ "github.com/golang/protobuf/ptypes/timestamp" - -import time "time" - -import github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" import io "io" @@ -20,7 +14,6 @@ import io "io" var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf -var _ = time.Kitchen // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. @@ -31,21 +24,19 @@ const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package // AddrBookRecord represents a record for a peer in the address book. type AddrBookRecord struct { // The peer ID. - Id []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - // The timestamp where purging needs to happen. - NextVisit *time.Time `protobuf:"bytes,2,opt,name=nextVisit,stdtime" json:"nextVisit,omitempty"` - // The multiaddresses. - Addrs map[string]*AddrBookRecord_AddrEntry `protobuf:"bytes,3,rep,name=addrs" json:"addrs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Id *ProtoPeerID `protobuf:"bytes,1,opt,name=id,proto3,customtype=ProtoPeerID" json:"id,omitempty"` + // The multiaddresses. This is a sorted list where element 0 expires the soonest. + Addrs []*AddrBookRecord_AddrEntry `protobuf:"bytes,2,rep,name=addrs" json:"addrs,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *AddrBookRecord) Reset() { *m = AddrBookRecord{} } func (m *AddrBookRecord) String() string { return proto.CompactTextString(m) } func (*AddrBookRecord) ProtoMessage() {} func (*AddrBookRecord) Descriptor() ([]byte, []int) { - return fileDescriptor_pstore_842456c80d89ffef, []int{0} + return fileDescriptor_pstore_fcc3073dbc5464a9, []int{0} } func (m *AddrBookRecord) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -74,21 +65,7 @@ func (m *AddrBookRecord) XXX_DiscardUnknown() { var xxx_messageInfo_AddrBookRecord proto.InternalMessageInfo -func (m *AddrBookRecord) GetId() []byte { - if m != nil { - return m.Id - } - return nil -} - -func (m *AddrBookRecord) GetNextVisit() *time.Time { - if m != nil { - return m.NextVisit - } - return nil -} - -func (m *AddrBookRecord) GetAddrs() map[string]*AddrBookRecord_AddrEntry { +func (m *AddrBookRecord) GetAddrs() []*AddrBookRecord_AddrEntry { if m != nil { return m.Addrs } @@ -97,20 +74,21 @@ func (m *AddrBookRecord) GetAddrs() map[string]*AddrBookRecord_AddrEntry { // AddrEntry represents a single multiaddress. type AddrBookRecord_AddrEntry struct { + Addr *ProtoAddr `protobuf:"bytes,1,opt,name=addr,proto3,customtype=ProtoAddr" json:"addr,omitempty"` // The point in time when this address expires. - Expiry *time.Time `protobuf:"bytes,2,opt,name=expiry,stdtime" json:"expiry,omitempty"` + Expiry int64 `protobuf:"varint,2,opt,name=expiry,proto3" json:"expiry,omitempty"` // The original TTL of this address. - Ttl *time.Duration `protobuf:"bytes,3,opt,name=ttl,stdduration" json:"ttl,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Ttl int64 `protobuf:"varint,3,opt,name=ttl,proto3" json:"ttl,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *AddrBookRecord_AddrEntry) Reset() { *m = AddrBookRecord_AddrEntry{} } func (m *AddrBookRecord_AddrEntry) String() string { return proto.CompactTextString(m) } func (*AddrBookRecord_AddrEntry) ProtoMessage() {} func (*AddrBookRecord_AddrEntry) Descriptor() ([]byte, []int) { - return fileDescriptor_pstore_842456c80d89ffef, []int{0, 1} + return fileDescriptor_pstore_fcc3073dbc5464a9, []int{0, 0} } func (m *AddrBookRecord_AddrEntry) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -139,23 +117,22 @@ func (m *AddrBookRecord_AddrEntry) XXX_DiscardUnknown() { var xxx_messageInfo_AddrBookRecord_AddrEntry proto.InternalMessageInfo -func (m *AddrBookRecord_AddrEntry) GetExpiry() *time.Time { +func (m *AddrBookRecord_AddrEntry) GetExpiry() int64 { if m != nil { return m.Expiry } - return nil + return 0 } -func (m *AddrBookRecord_AddrEntry) GetTtl() *time.Duration { +func (m *AddrBookRecord_AddrEntry) GetTtl() int64 { if m != nil { return m.Ttl } - return nil + return 0 } func init() { proto.RegisterType((*AddrBookRecord)(nil), "pstore.pb.AddrBookRecord") - proto.RegisterMapType((map[string]*AddrBookRecord_AddrEntry)(nil), "pstore.pb.AddrBookRecord.AddrsEntry") proto.RegisterType((*AddrBookRecord_AddrEntry)(nil), "pstore.pb.AddrBookRecord.AddrEntry") } func (m *AddrBookRecord) Marshal() (dAtA []byte, err error) { @@ -173,48 +150,26 @@ func (m *AddrBookRecord) MarshalTo(dAtA []byte) (int, error) { _ = i var l int _ = l - if len(m.Id) > 0 { + if m.Id != nil { dAtA[i] = 0xa i++ - i = encodeVarintPstore(dAtA, i, uint64(len(m.Id))) - i += copy(dAtA[i:], m.Id) - } - if m.NextVisit != nil { - dAtA[i] = 0x12 - i++ - i = encodeVarintPstore(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(*m.NextVisit))) - n1, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.NextVisit, dAtA[i:]) + i = encodeVarintPstore(dAtA, i, uint64(m.Id.Size())) + n1, err := m.Id.MarshalTo(dAtA[i:]) if err != nil { return 0, err } i += n1 } if len(m.Addrs) > 0 { - for k, _ := range m.Addrs { - dAtA[i] = 0x1a + for _, msg := range m.Addrs { + dAtA[i] = 0x12 i++ - v := m.Addrs[k] - msgSize := 0 - if v != nil { - msgSize = v.Size() - msgSize += 1 + sovPstore(uint64(msgSize)) - } - mapSize := 1 + len(k) + sovPstore(uint64(len(k))) + msgSize - i = encodeVarintPstore(dAtA, i, uint64(mapSize)) - dAtA[i] = 0xa - i++ - i = encodeVarintPstore(dAtA, i, uint64(len(k))) - i += copy(dAtA[i:], k) - if v != nil { - dAtA[i] = 0x12 - i++ - i = encodeVarintPstore(dAtA, i, uint64(v.Size())) - n2, err := v.MarshalTo(dAtA[i:]) - if err != nil { - return 0, err - } - i += n2 + i = encodeVarintPstore(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err } + i += n } } if m.XXX_unrecognized != nil { @@ -238,25 +193,25 @@ func (m *AddrBookRecord_AddrEntry) MarshalTo(dAtA []byte) (int, error) { _ = i var l int _ = l - if m.Expiry != nil { - dAtA[i] = 0x12 + if m.Addr != nil { + dAtA[i] = 0xa i++ - i = encodeVarintPstore(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(*m.Expiry))) - n3, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.Expiry, dAtA[i:]) + i = encodeVarintPstore(dAtA, i, uint64(m.Addr.Size())) + n2, err := m.Addr.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n3 + i += n2 } - if m.Ttl != nil { - dAtA[i] = 0x1a + if m.Expiry != 0 { + dAtA[i] = 0x10 i++ - i = encodeVarintPstore(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdDuration(*m.Ttl))) - n4, err := github_com_gogo_protobuf_types.StdDurationMarshalTo(*m.Ttl, dAtA[i:]) - if err != nil { - return 0, err - } - i += n4 + i = encodeVarintPstore(dAtA, i, uint64(m.Expiry)) + } + if m.Ttl != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintPstore(dAtA, i, uint64(m.Ttl)) } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) @@ -275,34 +230,30 @@ func encodeVarintPstore(dAtA []byte, offset int, v uint64) int { } func NewPopulatedAddrBookRecord(r randyPstore, easy bool) *AddrBookRecord { this := &AddrBookRecord{} - v1 := r.Intn(100) - this.Id = make([]byte, v1) - for i := 0; i < v1; i++ { - this.Id[i] = byte(r.Intn(256)) - } + this.Id = NewPopulatedProtoPeerID(r) if r.Intn(10) != 0 { - this.NextVisit = github_com_gogo_protobuf_types.NewPopulatedStdTime(r, easy) - } - if r.Intn(10) != 0 { - v2 := r.Intn(10) - this.Addrs = make(map[string]*AddrBookRecord_AddrEntry) - for i := 0; i < v2; i++ { - this.Addrs[randStringPstore(r)] = NewPopulatedAddrBookRecord_AddrEntry(r, easy) + v1 := r.Intn(5) + this.Addrs = make([]*AddrBookRecord_AddrEntry, v1) + for i := 0; i < v1; i++ { + this.Addrs[i] = NewPopulatedAddrBookRecord_AddrEntry(r, easy) } } if !easy && r.Intn(10) != 0 { - this.XXX_unrecognized = randUnrecognizedPstore(r, 4) + this.XXX_unrecognized = randUnrecognizedPstore(r, 3) } return this } func NewPopulatedAddrBookRecord_AddrEntry(r randyPstore, easy bool) *AddrBookRecord_AddrEntry { this := &AddrBookRecord_AddrEntry{} - if r.Intn(10) != 0 { - this.Expiry = github_com_gogo_protobuf_types.NewPopulatedStdTime(r, easy) + this.Addr = NewPopulatedProtoAddr(r) + this.Expiry = int64(r.Int63()) + if r.Intn(2) == 0 { + this.Expiry *= -1 } - if r.Intn(10) != 0 { - this.Ttl = github_com_gogo_protobuf_types.NewPopulatedStdDuration(r, easy) + this.Ttl = int64(r.Int63()) + if r.Intn(2) == 0 { + this.Ttl *= -1 } if !easy && r.Intn(10) != 0 { this.XXX_unrecognized = randUnrecognizedPstore(r, 4) @@ -329,9 +280,9 @@ func randUTF8RunePstore(r randyPstore) rune { return rune(ru + 61) } func randStringPstore(r randyPstore) string { - v3 := r.Intn(100) - tmps := make([]rune, v3) - for i := 0; i < v3; i++ { + v2 := r.Intn(100) + tmps := make([]rune, v2) + for i := 0; i < v2; i++ { tmps[i] = randUTF8RunePstore(r) } return string(tmps) @@ -353,11 +304,11 @@ func randFieldPstore(dAtA []byte, r randyPstore, fieldNumber int, wire int) []by switch wire { case 0: dAtA = encodeVarintPopulatePstore(dAtA, uint64(key)) - v4 := r.Int63() + v3 := r.Int63() if r.Intn(2) == 0 { - v4 *= -1 + v3 *= -1 } - dAtA = encodeVarintPopulatePstore(dAtA, uint64(v4)) + dAtA = encodeVarintPopulatePstore(dAtA, uint64(v3)) case 1: dAtA = encodeVarintPopulatePstore(dAtA, uint64(key)) dAtA = append(dAtA, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256))) @@ -388,25 +339,14 @@ func (m *AddrBookRecord) Size() (n int) { } var l int _ = l - l = len(m.Id) - if l > 0 { - n += 1 + l + sovPstore(uint64(l)) - } - if m.NextVisit != nil { - l = github_com_gogo_protobuf_types.SizeOfStdTime(*m.NextVisit) + if m.Id != nil { + l = m.Id.Size() n += 1 + l + sovPstore(uint64(l)) } if len(m.Addrs) > 0 { - for k, v := range m.Addrs { - _ = k - _ = v - l = 0 - if v != nil { - l = v.Size() - l += 1 + sovPstore(uint64(l)) - } - mapEntrySize := 1 + len(k) + sovPstore(uint64(len(k))) + l - n += mapEntrySize + 1 + sovPstore(uint64(mapEntrySize)) + for _, e := range m.Addrs { + l = e.Size() + n += 1 + l + sovPstore(uint64(l)) } } if m.XXX_unrecognized != nil { @@ -421,13 +361,15 @@ func (m *AddrBookRecord_AddrEntry) Size() (n int) { } var l int _ = l - if m.Expiry != nil { - l = github_com_gogo_protobuf_types.SizeOfStdTime(*m.Expiry) + if m.Addr != nil { + l = m.Addr.Size() n += 1 + l + sovPstore(uint64(l)) } - if m.Ttl != nil { - l = github_com_gogo_protobuf_types.SizeOfStdDuration(*m.Ttl) - n += 1 + l + sovPstore(uint64(l)) + if m.Expiry != 0 { + n += 1 + sovPstore(uint64(m.Expiry)) + } + if m.Ttl != 0 { + n += 1 + sovPstore(uint64(m.Ttl)) } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) @@ -503,45 +445,13 @@ func (m *AddrBookRecord) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Id = append(m.Id[:0], dAtA[iNdEx:postIndex]...) - if m.Id == nil { - m.Id = []byte{} - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field NextVisit", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPstore - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthPstore - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.NextVisit == nil { - m.NextVisit = new(time.Time) - } - if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(m.NextVisit, dAtA[iNdEx:postIndex]); err != nil { + var v ProtoPeerID + m.Id = &v + if err := m.Id.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex - case 3: + case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Addrs", wireType) } @@ -567,102 +477,10 @@ func (m *AddrBookRecord) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.Addrs == nil { - m.Addrs = make(map[string]*AddrBookRecord_AddrEntry) + m.Addrs = append(m.Addrs, &AddrBookRecord_AddrEntry{}) + if err := m.Addrs[len(m.Addrs)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err } - var mapkey string - var mapvalue *AddrBookRecord_AddrEntry - for iNdEx < postIndex { - entryPreIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPstore - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - if fieldNum == 1 { - var stringLenmapkey uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPstore - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLenmapkey |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - intStringLenmapkey := int(stringLenmapkey) - if intStringLenmapkey < 0 { - return ErrInvalidLengthPstore - } - postStringIndexmapkey := iNdEx + intStringLenmapkey - if postStringIndexmapkey > l { - return io.ErrUnexpectedEOF - } - mapkey = string(dAtA[iNdEx:postStringIndexmapkey]) - iNdEx = postStringIndexmapkey - } else if fieldNum == 2 { - var mapmsglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPstore - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - mapmsglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if mapmsglen < 0 { - return ErrInvalidLengthPstore - } - postmsgIndex := iNdEx + mapmsglen - if mapmsglen < 0 { - return ErrInvalidLengthPstore - } - if postmsgIndex > l { - return io.ErrUnexpectedEOF - } - mapvalue = &AddrBookRecord_AddrEntry{} - if err := mapvalue.Unmarshal(dAtA[iNdEx:postmsgIndex]); err != nil { - return err - } - iNdEx = postmsgIndex - } else { - iNdEx = entryPreIndex - skippy, err := skipPstore(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthPstore - } - if (iNdEx + skippy) > postIndex { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - m.Addrs[mapkey] = mapvalue iNdEx = postIndex default: iNdEx = preIndex @@ -715,11 +533,43 @@ func (m *AddrBookRecord_AddrEntry) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: AddrEntry: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { - case 2: + case 1: if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Addr", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPstore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthPstore + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + var v ProtoAddr + m.Addr = &v + if err := m.Addr.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Expiry", wireType) } - var msglen int + m.Expiry = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowPstore @@ -729,30 +579,16 @@ func (m *AddrBookRecord_AddrEntry) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + m.Expiry |= (int64(b) & 0x7F) << shift if b < 0x80 { break } } - if msglen < 0 { - return ErrInvalidLengthPstore - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Expiry == nil { - m.Expiry = new(time.Time) - } - if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(m.Expiry, dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex case 3: - if wireType != 2 { + if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Ttl", wireType) } - var msglen int + m.Ttl = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowPstore @@ -762,25 +598,11 @@ func (m *AddrBookRecord_AddrEntry) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - msglen |= (int(b) & 0x7F) << shift + m.Ttl |= (int64(b) & 0x7F) << shift if b < 0x80 { break } } - if msglen < 0 { - return ErrInvalidLengthPstore - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Ttl == nil { - m.Ttl = new(time.Duration) - } - if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(m.Ttl, dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipPstore(dAtA[iNdEx:]) @@ -908,29 +730,24 @@ var ( ErrIntOverflowPstore = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("pstore.proto", fileDescriptor_pstore_842456c80d89ffef) } +func init() { proto.RegisterFile("pstore.proto", fileDescriptor_pstore_fcc3073dbc5464a9) } -var fileDescriptor_pstore_842456c80d89ffef = []byte{ - // 330 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xbf, 0x4e, 0xc3, 0x30, - 0x10, 0xc6, 0xe5, 0xa4, 0xad, 0x14, 0xb7, 0xaa, 0x90, 0xc5, 0x10, 0x32, 0xb8, 0x15, 0x30, 0x74, - 0xc1, 0x15, 0x65, 0x29, 0x1d, 0x90, 0x88, 0xe0, 0x05, 0x22, 0xc4, 0xc6, 0x90, 0xd4, 0x26, 0x58, - 0xfd, 0x73, 0x91, 0xe3, 0xa0, 0xf6, 0x2d, 0x18, 0x79, 0x1c, 0x46, 0x46, 0xde, 0x00, 0xc8, 0x3b, - 0x20, 0x31, 0xa2, 0x38, 0x69, 0x23, 0x40, 0x42, 0x6c, 0xf7, 0xdd, 0x7d, 0xbf, 0xf3, 0x77, 0x32, - 0xee, 0x24, 0xa9, 0x06, 0x25, 0x58, 0xa2, 0x40, 0x03, 0x71, 0x36, 0x2a, 0xf2, 0x7a, 0x31, 0x40, - 0x3c, 0x17, 0x43, 0x33, 0x88, 0xb2, 0xdb, 0xa1, 0x96, 0x0b, 0x91, 0xea, 0x70, 0x91, 0x94, 0x5e, - 0x8f, 0xfe, 0x34, 0xf0, 0x4c, 0x85, 0x5a, 0xc2, 0xb2, 0x9a, 0x1f, 0xc5, 0x52, 0xdf, 0x65, 0x11, - 0x9b, 0xc2, 0x62, 0x18, 0x43, 0x0c, 0xb5, 0xb1, 0x50, 0x46, 0x98, 0xaa, 0xb4, 0xef, 0x7f, 0x58, - 0xb8, 0x7b, 0xce, 0xb9, 0xf2, 0x01, 0x66, 0x81, 0x98, 0x82, 0xe2, 0xa4, 0x8b, 0x2d, 0xc9, 0x5d, - 0xd4, 0x47, 0x83, 0x4e, 0x60, 0x49, 0x4e, 0xce, 0xb0, 0xb3, 0x14, 0x2b, 0x7d, 0x2d, 0x53, 0xa9, - 0x5d, 0xab, 0x8f, 0x06, 0xed, 0x91, 0xc7, 0xca, 0x14, 0x6c, 0xb3, 0x9c, 0x5d, 0x6d, 0x62, 0xfa, - 0x8d, 0x87, 0xd7, 0x1e, 0x0a, 0x6a, 0x84, 0x4c, 0x70, 0x33, 0xe4, 0x5c, 0xa5, 0xae, 0xdd, 0xb7, - 0x07, 0xed, 0xd1, 0x21, 0xdb, 0x5e, 0xcb, 0xbe, 0xbf, 0x6c, 0x64, 0x7a, 0xb9, 0xd4, 0x6a, 0x1d, - 0x94, 0x88, 0x77, 0x83, 0x71, 0xdd, 0x24, 0x3b, 0xd8, 0x9e, 0x89, 0xb5, 0x89, 0xe6, 0x04, 0x45, - 0x49, 0x4e, 0x71, 0xf3, 0x3e, 0x9c, 0x67, 0xa2, 0xca, 0x75, 0xf0, 0xf7, 0xee, 0x6a, 0xb5, 0x21, - 0x26, 0xd6, 0x18, 0x79, 0x2b, 0xec, 0x6c, 0xfb, 0x64, 0x8c, 0x5b, 0x62, 0x95, 0x48, 0xb5, 0xfe, - 0xf7, 0x91, 0x95, 0x9f, 0x1c, 0x63, 0x5b, 0xeb, 0xb9, 0x6b, 0x1b, 0x6c, 0xef, 0x17, 0x76, 0x51, - 0xfd, 0x90, 0xdf, 0x78, 0x2c, 0xa8, 0xc2, 0xeb, 0xef, 0x7e, 0xbe, 0x53, 0xf4, 0x94, 0x53, 0xf4, - 0x9c, 0x53, 0xf4, 0x92, 0x53, 0xf4, 0x96, 0x53, 0x14, 0xb5, 0x0c, 0x73, 0xf2, 0x15, 0x00, 0x00, - 0xff, 0xff, 0x0c, 0x8d, 0xe8, 0x16, 0x1f, 0x02, 0x00, 0x00, +var fileDescriptor_pstore_fcc3073dbc5464a9 = []byte{ + // 243 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x28, 0x2e, 0xc9, + 0x2f, 0x4a, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0xf1, 0x92, 0xa4, 0x74, 0xd3, + 0x33, 0x4b, 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0xd3, 0xf3, 0xd3, 0xf3, 0xf5, 0xc1, + 0x2a, 0x92, 0x4a, 0xd3, 0xc0, 0x3c, 0x30, 0x07, 0xcc, 0x82, 0xe8, 0x54, 0x3a, 0xc6, 0xc8, 0xc5, + 0xe7, 0x98, 0x92, 0x52, 0xe4, 0x94, 0x9f, 0x9f, 0x1d, 0x94, 0x9a, 0x9c, 0x5f, 0x94, 0x22, 0x24, + 0xcf, 0xc5, 0x94, 0x99, 0x22, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0xe3, 0xc4, 0x7f, 0xeb, 0x9e, 0x3c, + 0x77, 0x00, 0x48, 0x65, 0x40, 0x6a, 0x6a, 0x91, 0xa7, 0x4b, 0x10, 0x53, 0x66, 0x8a, 0x90, 0x25, + 0x17, 0x6b, 0x62, 0x4a, 0x4a, 0x51, 0xb1, 0x04, 0x93, 0x02, 0xb3, 0x06, 0xb7, 0x91, 0xb2, 0x1e, + 0xdc, 0x76, 0x3d, 0x54, 0xa3, 0xc0, 0x5c, 0xd7, 0xbc, 0x92, 0xa2, 0xca, 0x20, 0x88, 0x0e, 0xa9, + 0x08, 0x2e, 0x4e, 0xb8, 0x98, 0x90, 0x22, 0x17, 0x0b, 0x48, 0x14, 0x6a, 0x15, 0xef, 0xad, 0x7b, + 0xf2, 0x9c, 0x60, 0xab, 0x40, 0x2a, 0x82, 0xc0, 0x52, 0x42, 0x62, 0x5c, 0x6c, 0xa9, 0x15, 0x05, + 0x99, 0x45, 0x95, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x50, 0x9e, 0x90, 0x00, 0x17, 0x73, + 0x49, 0x49, 0x8e, 0x04, 0x33, 0x58, 0x10, 0xc4, 0x74, 0x12, 0xf9, 0xf1, 0x50, 0x8e, 0xf1, 0xc0, + 0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0x31, + 0x89, 0x0d, 0xec, 0x4b, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x2b, 0x91, 0xbf, 0xc2, 0x2f, + 0x01, 0x00, 0x00, } diff --git a/pb/pstore.proto b/pb/pstore.proto index 2a76846..3894534 100644 --- a/pb/pstore.proto +++ b/pb/pstore.proto @@ -1,32 +1,27 @@ syntax = "proto3"; package pstore.pb; -import "google/protobuf/timestamp.proto"; -import "google/protobuf/duration.proto"; import "github.com/gogo/protobuf/gogoproto/gogo.proto"; -option (gogoproto.marshaler_all) = true; -option (gogoproto.unmarshaler_all) = true; option (gogoproto.benchgen_all) = true; option (gogoproto.populate_all) = true; // AddrBookRecord represents a record for a peer in the address book. message AddrBookRecord { // The peer ID. - bytes id = 1; + bytes id = 1 [(gogoproto.customtype) = "ProtoPeerID"]; - // The timestamp where purging needs to happen. - google.protobuf.Timestamp nextVisit = 2 [(gogoproto.stdtime) = true]; - - // The multiaddresses. - map addrs = 3; + // The multiaddresses. This is a sorted list where element 0 expires the soonest. + repeated AddrEntry addrs = 2; // AddrEntry represents a single multiaddress. message AddrEntry { + bytes addr = 1 [(gogoproto.customtype) = "ProtoAddr"]; + // The point in time when this address expires. - google.protobuf.Timestamp expiry = 2 [(gogoproto.stdtime) = true]; + int64 expiry = 2; // The original TTL of this address. - google.protobuf.Duration ttl = 3 [(gogoproto.stdduration) = true]; + int64 ttl = 3; } } diff --git a/pb/pstorepb_test.go b/pb/pstorepb_test.go index 6d8cbf3..9b7ae03 100644 --- a/pb/pstorepb_test.go +++ b/pb/pstorepb_test.go @@ -10,8 +10,6 @@ import proto "github.com/gogo/protobuf/proto" import fmt "fmt" import math "math" import _ "github.com/gogo/protobuf/gogoproto" -import _ "github.com/golang/protobuf/ptypes/duration" -import _ "github.com/golang/protobuf/ptypes/timestamp" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal diff --git a/pstoreds/addr_book.go b/pstoreds/addr_book.go index 7eb08b7..0dfff50 100644 --- a/pstoreds/addr_book.go +++ b/pstoreds/addr_book.go @@ -3,6 +3,7 @@ package pstoreds import ( "context" "fmt" + "sort" "sync" "time" @@ -39,24 +40,30 @@ var ( type addrsRecord struct { sync.RWMutex *pb.AddrBookRecord - - delete bool + dirty bool } -// FlushInTxn flushes the record to the datastore by calling ds.Put, unless the record is -// marked for deletion, in which case the deletion is executed. +// FlushInTxn writes the record to the datastore by calling ds.Put, unless the record is +// marked for deletion, in which case the deletion is executed via ds.Delete. func (r *addrsRecord) FlushInTxn(txn ds.Txn) (err error) { - key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id))) - if r.delete { + key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id.ID))) + if len(r.Addrs) == 0 { return txn.Delete(key) } data := pool.Get(r.Size()) defer pool.Put(data) + + // i is the number of bytes that were effectively written. i, err := r.MarshalTo(data) if err != nil { return err } - return txn.Put(key, data[:i]) + if err := txn.Put(key, data[:i]); err != nil { + return err + } + // write succeeded; record is no longer dirty. + r.dirty = false + return nil } // Flush creates a ds.Txn, and calls FlushInTxn with it. @@ -73,58 +80,56 @@ func (r *addrsRecord) Flush(ds ds.TxnDatastore) (err error) { return txn.Commit() } -// Refresh is called on in-memory entries to perform housekeeping. Refresh does the following: -// * removes all expired addresses. -// * recalculates the date in which the record needs to be revisited (earliest expiration of survivors). -// * marks the record for deletion if no addresses are left. +// Refresh is called on records to perform housekeeping. The return value signals if the record was changed +// as a result of the refresh. // -// A `true` value of `force` tells us to proceed with housekeeping even if the `NextVisit` date has not arrived. +// Refresh does the following: +// * sorts the addresses by expiration (soonest expiring first). +// * removes the addresses that have expired. // -// Refresh is called in several occasions: -// * with force=false, when accessing and loading an entry, or when performing GC. -// * with force=true, after an entry has been modified (e.g. addresses have been added or removed, +// It short-circuits optimistically when we know there's nothing to do. +// +// Refresh is called from several points: +// * when accessing and loading an entry. +// * when performing periodic GC. +// * after an entry has been modified (e.g. addresses have been added or removed, // TTLs updated, etc.) -func (r *addrsRecord) Refresh(force bool) (chgd bool) { - if len(r.Addrs) == 0 { - r.delete = true - r.NextVisit = nil - return true - } - - now := time.Now() - if !force && r.NextVisit != nil && !r.NextVisit.IsZero() && now.Before(*r.NextVisit) { - // no expired entries to purge, and no forced housekeeping. +// +// If the return value is true, the caller can perform a flush immediately, or can schedule an async +// flush, depending on the context. +func (r *addrsRecord) Refresh() (chgd bool) { + now := time.Now().Unix() + if !r.dirty && len(r.Addrs) > 0 && r.Addrs[0].Expiry > now { + // record is not dirty, and we have no expired entries to purge. return false } - // nv stores the next visit for surviving addresses following the purge - var nv time.Time - for addr, entry := range r.Addrs { - if entry.Expiry == nil { - continue - } - if nv.IsZero() { - nv = *entry.Expiry - } - if now.After(*entry.Expiry) { - // this entry is expired; remove it. - delete(r.Addrs, addr) - chgd = true - } else if nv.After(*entry.Expiry) { - // keep track of the earliest expiry across survivors. - nv = *entry.Expiry - } - } - if len(r.Addrs) == 0 { - r.delete = true - r.NextVisit = nil + // this is a ghost record; let's signal it has to be written. + // Flush() will take care of doing the deletion. return true } - chgd = chgd || r.NextVisit == nil || nv != *r.NextVisit - r.NextVisit = &nv - return chgd + if r.dirty && len(r.Addrs) > 1 { + // the record has been modified, so it may need resorting. + // we keep addresses sorted by expiration, where 0 is the soonest expiring. + sort.Slice(r.Addrs, func(i, j int) bool { + return r.Addrs[i].Expiry < r.Addrs[j].Expiry + }) + } + + // since addresses are sorted by expiration, we find the first survivor and split the + // slice on its index. + pivot := -1 + for i, addr := range r.Addrs { + if addr.Expiry > now { + break + } + pivot = i + } + + r.Addrs = r.Addrs[pivot+1:] + return r.dirty || pivot >= 0 } // dsAddrBook is an address book backed by a Datastore with a GC-like procedure @@ -145,8 +150,7 @@ type dsAddrBook struct { var _ pstore.AddrBook = (*dsAddrBook)(nil) -// NewAddrBook initializes a new address book given a -// Datastore instance, a context for managing the TTL manager, +// NewAddrBook initializes a new address book given a Datastore instance, a context for managing the TTL manager, // and the interval at which the TTL manager should sweep the Datastore. func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts Options) (ab *dsAddrBook, err error) { var cache cache = new(noopCache) @@ -168,6 +172,7 @@ func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts Options) (ab * closedCh: make(chan struct{}), } + // kick off periodic GC. go mgr.background() return mgr, nil @@ -178,20 +183,26 @@ func (ab *dsAddrBook) Close() { <-ab.closedCh } -func (ab *dsAddrBook) tryFlush(pr *addrsRecord) { +func (ab *dsAddrBook) asyncFlush(pr *addrsRecord) { select { case ab.flushJobCh <- pr: default: - id, _ := peer.IDFromBytes(pr.Id) - log.Warningf("flush queue is full; could not flush peer %v", id) + log.Warningf("flush queue is full; could not flush peer %v", pr.Id.ID.Pretty()) } } +// loadRecord is a read-through fetch. It fetches a record from cache, falling back to the +// datastore upon a miss, and returning an newly initialized record if the peer doesn't exist. +// +// loadRecord calls Refresh() on the record before returning it. If the record changes +// as a result and `update=true`, an async flush is scheduled. +// +// If `cache=true`, the record is inserted in the cache when loaded from the datastore. func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrsRecord, err error) { if e, ok := ab.cache.Get(id); ok { pr = e.(*addrsRecord) - if pr.Refresh(false) && update { - ab.tryFlush(pr) + if pr.Refresh() && update { + ab.asyncFlush(pr) } return pr, nil } @@ -205,46 +216,38 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(id))) data, err := txn.Get(key) + if err != nil && err != ds.ErrNotFound { + return nil, err + } + if err == nil { pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} if err = pr.Unmarshal(data); err != nil { return nil, err } - if pr.Refresh(false) { - ab.tryFlush(pr) + if pr.Refresh() && update { + ab.asyncFlush(pr) } - if cache { - ab.cache.Add(id, pr) - } - return pr, nil + } else { + pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{Id: &pb.ProtoPeerID{ID: id}}} } - if err == ds.ErrNotFound { - pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{ - Id: []byte(id), - Addrs: make(map[string]*pb.AddrBookRecord_AddrEntry), - }} - if cache { - ab.cache.Add(id, pr) - } - return pr, nil + if cache { + ab.cache.Add(id, pr) } - - log.Error(err) - return nil, err + return pr, nil } -// background is the peerstore process that takes care of: -// * purging expired addresses and peers with no addresses from the datastore at regular intervals. -// * asynchronously flushing cached entries with expired addresses to the datastore. +// background runs the housekeeping process that takes care of: +// +// * purging expired addresses from the datastore at regular intervals. +// * persisting asynchronous flushes to the datastore. func (ab *dsAddrBook) background() { timer := time.NewTicker(ab.gcInterval) - for { select { case fj := <-ab.flushJobCh: - id, _ := peer.IDFromBytes(fj.Id) - if cached, ok := ab.cache.Peek(id); ok { + if cached, ok := ab.cache.Peek(fj.Id.ID); ok { // Only continue flushing if the record we have in memory is the same as for which the flush // job was requested. If it's not in memory, it has been evicted and we don't know if we hold // the latest state or not. Similarly, if it's cached but the pointer is different, it means @@ -267,35 +270,44 @@ func (ab *dsAddrBook) background() { } } +var purgeQuery = query.Query{Prefix: addrBookBase.String()} + +// purgeCycle runs a GC cycle func (ab *dsAddrBook) purgeCycle() { var id peer.ID record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} - q := query.Query{Prefix: addrBookBase.String()} - txn, err := ab.ds.NewTransaction(false) if err != nil { log.Warningf("failed while purging entries: %v\n", err) return } - defer txn.Discard() - results, err := txn.Query(q) + results, err := txn.Query(purgeQuery) if err != nil { log.Warningf("failed while purging entries: %v\n", err) return } - defer results.Close() for result := range results.Next() { - id, _ = peer.IDFromBytes(record.Id) - - // if the record is in cache, let's refresh that one and flush it if necessary. + k, err := b32.RawStdEncoding.DecodeString(ds.RawKey(result.Key).Name()) + if err != nil { + // TODO: drop the record? this will keep failing forever. + log.Warningf("failed while purging record: %v, err: %v\n", result.Key, err) + continue + } + id, err = peer.IDFromBytes(k) + if err != nil { + // TODO: drop the record? this will keep failing forever. + log.Warningf("failed to get extract peer ID from bytes (hex): %x, err: %v\n", k, err) + continue + } + // if the record is in cache, we refresh it and flush it if necessary. if e, ok := ab.cache.Peek(id); ok { cached := e.(*addrsRecord) cached.Lock() - if cached.Refresh(false) { + if cached.Refresh() { cached.FlushInTxn(txn) } cached.Unlock() @@ -303,11 +315,11 @@ func (ab *dsAddrBook) purgeCycle() { } if err := record.Unmarshal(result.Value); err != nil { - log.Warningf("failed while purging entries: %v\n", err) + // TODO: drop the record? this will keep failing forever. + log.Warningf("failed while deserializing entry with key: %v, err: %v\n", result.Key, err) continue } - - if record.Refresh(false) { + if record.Refresh() { record.FlushInTxn(txn) } record.Reset() @@ -358,17 +370,16 @@ func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.D pr.Lock() defer pr.Unlock() - chgd, newExp := false, time.Now().Add(newTTL) + newExp := time.Now().Add(newTTL).Unix() for _, entry := range pr.Addrs { - if entry.Ttl == nil || *entry.Ttl != oldTTL { + if entry.Ttl != int64(oldTTL) { continue } - entry.Ttl, entry.Expiry = &newTTL, &newExp - chgd = true + entry.Ttl, entry.Expiry = int64(newTTL), newExp + pr.dirty = true } - if chgd { - pr.Refresh(true) + if pr.Refresh() { pr.Flush(ab.ds) } } @@ -385,10 +396,8 @@ func (ab *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { defer pr.RUnlock() addrs := make([]ma.Multiaddr, 0, len(pr.Addrs)) - for k, _ := range pr.Addrs { - if a, err := ma.NewMultiaddr(k); err == nil { - addrs = append(addrs, a) - } + for _, a := range pr.Addrs { + addrs = append(addrs, a.Addr) } return addrs } @@ -434,32 +443,54 @@ func (ab *dsAddrBook) ClearAddrs(p peer.ID) { func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) (err error) { pr, err := ab.loadRecord(p, true, false) if err != nil { - return fmt.Errorf("failed to load peerstore entry for peer %v while deleting addrs, err: %v", p, err) + return fmt.Errorf("failed to load peerstore entry for peer %v while setting addrs, err: %v", p, err) } pr.Lock() defer pr.Unlock() - now := time.Now() - broadcast := make([]bool, len(addrs)) - newExp := now.Add(ttl) - for i, addr := range addrs { - e, ok := pr.Addrs[addr.String()] - if ok && mode == ttlExtend && e.Expiry.After(newExp) { + newExp := time.Now().Add(ttl).Unix() + existed := make([]bool, len(addrs)) // keeps track of which addrs we found + +Outer: + for i, incoming := range addrs { + for _, have := range pr.Addrs { + if incoming.Equal(have.Addr) { + existed[i] = true + if mode == ttlExtend && have.Expiry > newExp { + // if we're only extending TTLs but the addr already has a longer one, we skip it. + continue Outer + } + have.Expiry = newExp + // we found the address, and addresses cannot be duplicate, + // so let's move on to the next. + continue Outer + } + } + } + + // add addresses we didn't hold. + var added []*pb.AddrBookRecord_AddrEntry + for i, e := range existed { + if e { continue } - pr.Addrs[addr.String()] = &pb.AddrBookRecord_AddrEntry{Expiry: &newExp, Ttl: &ttl} - broadcast[i] = !ok - } - - // Update was successful, so broadcast event only for new addresses. - for i, v := range broadcast { - if v { - ab.subsManager.BroadcastAddr(p, addrs[i]) + addr := addrs[i] + entry := &pb.AddrBookRecord_AddrEntry{ + Addr: &pb.ProtoAddr{Multiaddr: addr}, + Ttl: int64(ttl), + Expiry: newExp, } + added = append(added, entry) + // TODO: should we only broadcast if we updated the store successfully? + // we have no way of rolling back the state of the in-memory record, although we + // could at the expense of allocs. But is it worthwhile? + ab.subsManager.BroadcastAddr(p, addr) } - pr.Refresh(true) + pr.Addrs = append(pr.Addrs, added...) + pr.dirty = true + pr.Refresh() return pr.Flush(ab.ds) } @@ -476,11 +507,23 @@ func (ab *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) (err error) { pr.Lock() defer pr.Unlock() - for _, addr := range addrs { - delete(pr.Addrs, addr.String()) + // deletes addresses in place, and avoiding copies until we encounter the first deletion. + survived := 0 + for i, addr := range pr.Addrs { + for _, del := range addrs { + if addr.Addr.Equal(del) { + continue + } + if i != survived { + pr.Addrs[survived] = pr.Addrs[i] + } + survived++ + } } + pr.Addrs = pr.Addrs[:survived] - pr.Refresh(true) + pr.dirty = true + pr.Refresh() return pr.Flush(ab.ds) }