migrate Addrs() and PeersWithAddrs() to use txns, improve perf, benchmarks.

This commit is contained in:
Raúl Kripalani 2018-09-07 18:37:01 +01:00
parent fe732515cd
commit 2901497643
2 changed files with 59 additions and 54 deletions

View File

@ -89,11 +89,6 @@ func keysAndAddrs(p peer.ID, addrs []ma.Multiaddr) ([]ds.Key, []ma.Multiaddr, er
return keys[:i], clean[:i], nil return keys[:i], clean[:i], nil
} }
func peerIDFromKey(key ds.Key) (peer.ID, error) {
idstring := key.Parent().Name()
return peer.IDB58Decode(idstring)
}
// AddAddr will add a new address if it's not already in the AddrBook. // AddAddr will add a new address if it's not already in the AddrBook.
func (mgr *dsAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { func (mgr *dsAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl) mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
@ -264,36 +259,41 @@ func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.
mgr.ttlManager.adjustTTLs(prefix, oldTTL, newTTL) mgr.ttlManager.adjustTTLs(prefix, oldTTL, newTTL)
} }
// Addrs Returns all of the non-expired addresses for a given peer. // Addrs returns all of the non-expired addresses for a given peer.
func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
prefix := ds.NewKey(p.Pretty()) var (
q := query.Query{Prefix: prefix.String(), KeysOnly: true} prefix = ds.NewKey(p.Pretty())
results, err := mgr.ds.Query(q) q = query.Query{Prefix: prefix.String(), KeysOnly: true}
results query.Results
err error
)
if err != nil { txn := mgr.ds.NewTransaction(true)
defer txn.Discard()
if results, err = txn.Query(q); err != nil {
log.Error(err) log.Error(err)
return nil return nil
} }
defer results.Close()
var addrs []ma.Multiaddr var addrs []ma.Multiaddr
for result := range results.Next() { for result := range results.Next() {
key := ds.RawKey(result.Key) key := ds.RawKey(result.Key)
var addri interface{} var addri interface{}
addri, ok := mgr.cache.Get(key) addri, ok := mgr.cache.Get(key)
if !ok { if !ok {
addri, err = mgr.ds.Get(key) if addri, err = txn.Get(key); err != nil {
if err != nil {
log.Error(err) log.Error(err)
continue continue
} }
} }
addrbytes := addri.([]byte)
addr, err := ma.NewMultiaddrBytes(addrbytes) if addr, err := ma.NewMultiaddrBytes(addri.([]byte)); err != nil {
if err != nil { addrs = append(addrs, addr)
log.Error(err)
continue
} }
addrs = append(addrs, addr)
} }
return addrs return addrs
@ -301,27 +301,36 @@ func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
// Peers returns all of the peer IDs for which the AddrBook has addresses. // Peers returns all of the peer IDs for which the AddrBook has addresses.
func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice { func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice {
q := query.Query{KeysOnly: true} var (
results, err := mgr.ds.Query(q) q = query.Query{KeysOnly: true}
results query.Results
err error
)
if err != nil { txn := mgr.ds.NewTransaction(true)
defer txn.Discard()
if results, err = txn.Query(q); err != nil {
log.Error(err) log.Error(err)
return peer.IDSlice{} return peer.IDSlice{}
} }
idset := make(map[peer.ID]struct{}) defer results.Close()
idset := make(map[string]struct{})
for result := range results.Next() { for result := range results.Next() {
key := ds.RawKey(result.Key) key := ds.RawKey(result.Key)
id, err := peerIDFromKey(key) idset[key.Parent().Name()] = struct{}{}
if err != nil {
continue
}
idset[id] = struct{}{}
} }
ids := make(peer.IDSlice, 0, len(idset)) if len(idset) == 0 {
return peer.IDSlice{}
}
ids := make(peer.IDSlice, 1, len(idset))
for id := range idset { for id := range idset {
ids = append(ids, id) i, _ := peer.IDB58Decode(id)
ids = append(ids, i)
} }
return ids return ids
} }
@ -397,21 +406,6 @@ func (mgr *dsAddrBook) dbClear(prefix ds.Key) ([]ds.Key, error) {
return keys, nil return keys, nil
} }
func (mgr *dsAddrBook) doInTransaction(readOnly bool, op func(txn ds.Txn) error) error {
txn := mgr.ds.NewTransaction(false)
defer txn.Discard()
if err := op(txn); err != nil {
return err
}
if err := txn.Commit(); err != nil {
return err
}
return nil
}
type ttlEntry struct { type ttlEntry struct {
TTL time.Duration TTL time.Duration
ExpiresAt time.Time ExpiresAt time.Time

View File

@ -9,10 +9,11 @@ import (
) )
var peerstoreBenchmarks = map[string]func(pstore.Peerstore, chan *peerpair) func(*testing.B){ var peerstoreBenchmarks = map[string]func(pstore.Peerstore, chan *peerpair) func(*testing.B){
"AddAddrs": benchmarkAddAddrs, "AddAddrs": benchmarkAddAddrs,
"SetAddrs": benchmarkSetAddrs, "SetAddrs": benchmarkSetAddrs,
"GetAddrs": benchmarkGetAddrs, "GetAddrs": benchmarkGetAddrs,
"AddAndClearAddrs": benchmarkAddAndClearAddrs, "AddAndClearAddrs": benchmarkAddAndClearAddrs,
"Get1000PeersWithAddrs": benchmarkGet1000PeersWithAddrs,
} }
func BenchmarkPeerstore(b *testing.B, factory PeerstoreFactory, variant string) { func BenchmarkPeerstore(b *testing.B, factory PeerstoreFactory, variant string) {
@ -84,18 +85,28 @@ func benchmarkGetAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.
} }
func benchmarkAddAndClearAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.B) { func benchmarkAddAndClearAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.B) {
// 1000 peers. return func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
pp := <-addrs
ps.AddAddrs(pp.ID, pp.Addr, pstore.PermanentAddrTTL)
ps.ClearAddrs(pp.ID)
}
}
}
func benchmarkGet1000PeersWithAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.B) {
return func(b *testing.B) { return func(b *testing.B) {
var peers = make([]*peerpair, 1000) var peers = make([]*peerpair, 1000)
for i := 0; i < len(peers); peers[i], i = <-addrs, i+1 { for i, _ := range peers {
pp := <-addrs
ps.AddAddrs(pp.ID, pp.Addr, pstore.PermanentAddrTTL)
peers[i] = pp
} }
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
for _, pp := range peers { _ = ps.PeersWithAddrs()
ps.AddAddrs(pp.ID, pp.Addr, pstore.PermanentAddrTTL)
ps.ClearAddrs(pp.ID)
}
} }
} }
} }