mirror of
https://github.com/libp2p/go-libp2p-peerstore.git
synced 2025-03-24 13:10:06 +08:00
Merge pull request #33 from raulk/txndatastore
Migrate to ds.TxnDatastore, optimisations++, cache changes, benchmarks
This commit is contained in:
commit
9194e8fbdf
@ -17,36 +17,43 @@ import (
|
||||
pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem"
|
||||
)
|
||||
|
||||
var log = logging.Logger("peerstore/ds")
|
||||
|
||||
// Number of times to retry transactional writes
|
||||
var dsWriteRetries = 5
|
||||
var (
|
||||
log = logging.Logger("peerstore/ds")
|
||||
)
|
||||
|
||||
var _ pstore.AddrBook = (*dsAddrBook)(nil)
|
||||
|
||||
// dsAddrBook is an address book backed by a Datastore with both an
|
||||
// in-memory TTL manager and an in-memory address stream manager.
|
||||
type dsAddrBook struct {
|
||||
cache *lru.ARCCache
|
||||
ds ds.Batching
|
||||
ttlManager *ttlmanager
|
||||
subsManager *pstoremem.AddrSubManager
|
||||
cache cache
|
||||
ds ds.TxnDatastore
|
||||
ttlManager *ttlManager
|
||||
subsManager *pstoremem.AddrSubManager
|
||||
writeRetries int
|
||||
}
|
||||
|
||||
// NewAddrBook initializes a new address book given a
|
||||
// Datastore instance, a context for managing the TTL manager,
|
||||
// and the interval at which the TTL manager should sweep the Datastore.
|
||||
func NewAddrBook(ctx context.Context, ds ds.Batching, ttlInterval time.Duration) (*dsAddrBook, error) {
|
||||
cache, err := lru.NewARC(1024)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
func NewAddrBook(ctx context.Context, ds ds.TxnDatastore, opts Options) (*dsAddrBook, error) {
|
||||
var (
|
||||
cache cache = &noopCache{}
|
||||
err error
|
||||
)
|
||||
|
||||
if opts.CacheSize > 0 {
|
||||
if cache, err = lru.NewARC(int(opts.CacheSize)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
mgr := &dsAddrBook{
|
||||
cache: cache,
|
||||
ds: ds,
|
||||
ttlManager: newTTLManager(ctx, ds, cache, ttlInterval),
|
||||
subsManager: pstoremem.NewAddrSubManager(),
|
||||
cache: cache,
|
||||
ds: ds,
|
||||
ttlManager: newTTLManager(ctx, ds, &cache, opts.TTLInterval),
|
||||
subsManager: pstoremem.NewAddrSubManager(),
|
||||
writeRetries: int(opts.WriteRetries),
|
||||
}
|
||||
return mgr, nil
|
||||
}
|
||||
@ -56,17 +63,29 @@ func (mgr *dsAddrBook) Stop() {
|
||||
mgr.ttlManager.cancel()
|
||||
}
|
||||
|
||||
func peerAddressKey(p *peer.ID, addr *ma.Multiaddr) (ds.Key, error) {
|
||||
hash, err := mh.Sum((*addr).Bytes(), mh.MURMUR3, -1)
|
||||
if err != nil {
|
||||
return ds.Key{}, nil
|
||||
}
|
||||
return ds.NewKey(peer.IDB58Encode(*p)).ChildString(hash.B58String()), nil
|
||||
}
|
||||
func keysAndAddrs(p peer.ID, addrs []ma.Multiaddr) ([]ds.Key, []ma.Multiaddr, error) {
|
||||
var (
|
||||
keys = make([]ds.Key, len(addrs))
|
||||
clean = make([]ma.Multiaddr, len(addrs))
|
||||
parentKey = ds.NewKey(peer.IDB58Encode(p))
|
||||
i = 0
|
||||
)
|
||||
|
||||
func peerIDFromKey(key ds.Key) (peer.ID, error) {
|
||||
idstring := key.Parent().Name()
|
||||
return peer.IDB58Decode(idstring)
|
||||
for _, addr := range addrs {
|
||||
if addr == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
hash, err := mh.Sum((addr).Bytes(), mh.MURMUR3, -1)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
keys[i] = parentKey.ChildString(hash.B58String())
|
||||
clean[i] = addr
|
||||
i++
|
||||
}
|
||||
|
||||
return keys[:i], clean[:i], nil
|
||||
}
|
||||
|
||||
// AddAddr will add a new address if it's not already in the AddrBook.
|
||||
@ -79,141 +98,208 @@ func (mgr *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Durati
|
||||
if ttl <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
mgr.setAddrs(p, addrs, ttl, true)
|
||||
mgr.setAddrs(p, addrs, ttl, false)
|
||||
}
|
||||
|
||||
// SetAddr will add or update the TTL of an address in the AddrBook.
|
||||
func (mgr *dsAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
|
||||
mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
|
||||
addrs := []ma.Multiaddr{addr}
|
||||
mgr.SetAddrs(p, addrs, ttl)
|
||||
}
|
||||
|
||||
// SetAddrs will add or update the TTLs of addresses in the AddrBook.
|
||||
func (mgr *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
||||
mgr.setAddrs(p, addrs, ttl, false)
|
||||
}
|
||||
|
||||
func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, add bool) {
|
||||
for i := 0; i < dsWriteRetries; i++ {
|
||||
// keys to add to the TTL manager
|
||||
var keys []ds.Key
|
||||
batch, err := mgr.ds.Batch()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, addr := range addrs {
|
||||
if addr == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
key, err := peerAddressKey(&p, &addr)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
keys = append(keys, key)
|
||||
|
||||
if ttl <= 0 {
|
||||
if err := batch.Delete(key); err != nil {
|
||||
log.Error(err)
|
||||
} else {
|
||||
mgr.cache.Remove(key)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
has := mgr.cache.Contains(key)
|
||||
if !has {
|
||||
has, err = mgr.ds.Has(key)
|
||||
}
|
||||
if err != nil || !has {
|
||||
mgr.subsManager.BroadcastAddr(p, addr)
|
||||
}
|
||||
|
||||
// Allows us to support AddAddr and SetAddr in one function
|
||||
if !has {
|
||||
if err := batch.Put(key, addr.Bytes()); err != nil {
|
||||
log.Error(err)
|
||||
} else {
|
||||
mgr.cache.Add(key, addr.Bytes())
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := batch.Commit(); err != nil {
|
||||
log.Errorf("failed to write addresses for peer %s: %s\n", p.Pretty(), err)
|
||||
continue
|
||||
}
|
||||
mgr.ttlManager.setTTLs(keys, ttl, add)
|
||||
if ttl <= 0 {
|
||||
mgr.deleteAddrs(p, addrs)
|
||||
return
|
||||
}
|
||||
log.Errorf("failed to avoid write conflict for peer %s after %d retries\n", p.Pretty(), dsWriteRetries)
|
||||
mgr.setAddrs(p, addrs, ttl, true)
|
||||
}
|
||||
|
||||
func (mgr *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) error {
|
||||
// Keys and cleaned up addresses.
|
||||
keys, addrs, err := keysAndAddrs(p, addrs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mgr.cache.Remove(p.Pretty())
|
||||
// Attempt transactional KV deletion.
|
||||
for i := 0; i < mgr.writeRetries; i++ {
|
||||
if err = mgr.dbDelete(keys); err == nil {
|
||||
break
|
||||
}
|
||||
log.Errorf("failed to delete addresses for peer %s: %s\n", p.Pretty(), err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("failed to avoid write conflict for peer %s after %d retries: %v\n", p.Pretty(), mgr.writeRetries, err)
|
||||
return err
|
||||
}
|
||||
|
||||
mgr.ttlManager.deleteTTLs(keys)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, ttlReset bool) error {
|
||||
// Keys and cleaned up addresses.
|
||||
keys, addrs, err := keysAndAddrs(p, addrs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mgr.cache.Remove(p.Pretty())
|
||||
// Attempt transactional KV insertion.
|
||||
var existed []bool
|
||||
for i := 0; i < mgr.writeRetries; i++ {
|
||||
if existed, err = mgr.dbInsert(keys, addrs); err == nil {
|
||||
break
|
||||
}
|
||||
log.Errorf("failed to write addresses for peer %s: %s\n", p.Pretty(), err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("failed to avoid write conflict for peer %s after %d retries: %v\n", p.Pretty(), mgr.writeRetries, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Update was successful, so broadcast event only for new addresses.
|
||||
for i, _ := range keys {
|
||||
if !existed[i] {
|
||||
mgr.subsManager.BroadcastAddr(p, addrs[i])
|
||||
}
|
||||
}
|
||||
|
||||
// Force update TTLs only if TTL reset was requested; otherwise
|
||||
// insert the appropriate TTL entries if they don't already exist.
|
||||
if ttlReset {
|
||||
mgr.ttlManager.setTTLs(keys, ttl)
|
||||
} else {
|
||||
mgr.ttlManager.insertOrExtendTTLs(keys, ttl)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// dbInsert performs a transactional insert of the provided keys and values.
|
||||
func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr) ([]bool, error) {
|
||||
var (
|
||||
err error
|
||||
existed = make([]bool, len(keys))
|
||||
)
|
||||
|
||||
txn := mgr.ds.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
for i, key := range keys {
|
||||
// Check if the key existed previously.
|
||||
if existed[i], err = txn.Has(key); err != nil {
|
||||
log.Errorf("transaction failed and aborted while checking key existence: %s, cause: %v", key.String(), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// The key embeds a hash of the value, so if it existed, we can safely skip the insert.
|
||||
if existed[i] {
|
||||
continue
|
||||
}
|
||||
|
||||
// Attempt to add the key.
|
||||
if err = txn.Put(key, addrs[i].Bytes()); err != nil {
|
||||
log.Errorf("transaction failed and aborted while setting key: %s, cause: %v", key.String(), err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if err = txn.Commit(); err != nil {
|
||||
log.Errorf("failed to commit transaction when setting keys, cause: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return existed, nil
|
||||
}
|
||||
|
||||
// UpdateAddrs will update any addresses for a given peer and TTL combination to
|
||||
// have a new TTL.
|
||||
func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
|
||||
prefix := ds.NewKey(p.Pretty())
|
||||
mgr.ttlManager.updateTTLs(prefix, oldTTL, newTTL)
|
||||
mgr.ttlManager.adjustTTLs(prefix, oldTTL, newTTL)
|
||||
}
|
||||
|
||||
// Addrs Returns all of the non-expired addresses for a given peer.
|
||||
// Addrs returns all of the non-expired addresses for a given peer.
|
||||
func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
|
||||
prefix := ds.NewKey(p.Pretty())
|
||||
q := query.Query{Prefix: prefix.String(), KeysOnly: true}
|
||||
results, err := mgr.ds.Query(q)
|
||||
if err != nil {
|
||||
var (
|
||||
prefix = ds.NewKey(p.Pretty())
|
||||
q = query.Query{Prefix: prefix.String(), KeysOnly: false}
|
||||
results query.Results
|
||||
err error
|
||||
)
|
||||
|
||||
// Check the cache.
|
||||
if entry, ok := mgr.cache.Get(p.Pretty()); ok {
|
||||
e := entry.([]ma.Multiaddr)
|
||||
addrs := make([]ma.Multiaddr, len(e))
|
||||
copy(addrs, e)
|
||||
return addrs
|
||||
}
|
||||
|
||||
txn := mgr.ds.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
if results, err = txn.Query(q); err != nil {
|
||||
log.Error(err)
|
||||
return nil
|
||||
}
|
||||
defer results.Close()
|
||||
|
||||
var addrs []ma.Multiaddr
|
||||
for result := range results.Next() {
|
||||
key := ds.RawKey(result.Key)
|
||||
var addri interface{}
|
||||
addri, ok := mgr.cache.Get(key)
|
||||
if !ok {
|
||||
addri, err = mgr.ds.Get(key)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
if addr, err := ma.NewMultiaddrBytes(result.Value); err == nil {
|
||||
addrs = append(addrs, addr)
|
||||
}
|
||||
addrbytes := addri.([]byte)
|
||||
addr, err := ma.NewMultiaddrBytes(addrbytes)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
addrs = append(addrs, addr)
|
||||
}
|
||||
|
||||
// Store a copy in the cache.
|
||||
addrsCpy := make([]ma.Multiaddr, len(addrs))
|
||||
copy(addrsCpy, addrs)
|
||||
mgr.cache.Add(p.Pretty(), addrsCpy)
|
||||
|
||||
return addrs
|
||||
}
|
||||
|
||||
// Peers returns all of the peer IDs for which the AddrBook has addresses.
|
||||
func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice {
|
||||
q := query.Query{KeysOnly: true}
|
||||
results, err := mgr.ds.Query(q)
|
||||
if err != nil {
|
||||
var (
|
||||
q = query.Query{KeysOnly: true}
|
||||
results query.Results
|
||||
err error
|
||||
)
|
||||
|
||||
txn := mgr.ds.NewTransaction(true)
|
||||
defer txn.Discard()
|
||||
|
||||
if results, err = txn.Query(q); err != nil {
|
||||
log.Error(err)
|
||||
return peer.IDSlice{}
|
||||
}
|
||||
|
||||
idset := make(map[peer.ID]struct{})
|
||||
defer results.Close()
|
||||
|
||||
idset := make(map[string]struct{})
|
||||
for result := range results.Next() {
|
||||
key := ds.RawKey(result.Key)
|
||||
id, err := peerIDFromKey(key)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
idset[id] = struct{}{}
|
||||
idset[key.Parent().Name()] = struct{}{}
|
||||
}
|
||||
|
||||
ids := make(peer.IDSlice, 0, len(idset))
|
||||
if len(idset) == 0 {
|
||||
return peer.IDSlice{}
|
||||
}
|
||||
|
||||
ids := make(peer.IDSlice, len(idset))
|
||||
i := 0
|
||||
for id := range idset {
|
||||
ids = append(ids, id)
|
||||
pid, _ := peer.IDB58Decode(id)
|
||||
ids[i] = pid
|
||||
i++
|
||||
}
|
||||
return ids
|
||||
}
|
||||
@ -227,70 +313,128 @@ func (mgr *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Mult
|
||||
|
||||
// ClearAddrs will delete all known addresses for a peer ID.
|
||||
func (mgr *dsAddrBook) ClearAddrs(p peer.ID) {
|
||||
prefix := ds.NewKey(p.Pretty())
|
||||
for i := 0; i < dsWriteRetries; i++ {
|
||||
q := query.Query{Prefix: prefix.String(), KeysOnly: true}
|
||||
results, err := mgr.ds.Query(q)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
batch, err := mgr.ds.Batch()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
var (
|
||||
err error
|
||||
prefix = ds.NewKey(p.Pretty())
|
||||
deleteFn func() error
|
||||
)
|
||||
|
||||
for result := range results.Next() {
|
||||
key := ds.NewKey(result.Key)
|
||||
err := batch.Delete(key)
|
||||
if err != nil {
|
||||
// From inspectin badger, errors here signify a problem with
|
||||
// the transaction as a whole, so we can log and abort.
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
mgr.cache.Remove(key)
|
||||
if e, ok := mgr.cache.Peek(p.Pretty()); ok {
|
||||
mgr.cache.Remove(p.Pretty())
|
||||
keys, _, _ := keysAndAddrs(p, e.([]ma.Multiaddr))
|
||||
deleteFn = func() error {
|
||||
return mgr.dbDelete(keys)
|
||||
}
|
||||
if err = batch.Commit(); err != nil {
|
||||
log.Errorf("failed to clear addresses for peer %s: %s\n", p.Pretty(), err)
|
||||
continue
|
||||
} else {
|
||||
deleteFn = func() error {
|
||||
_, err := mgr.dbDeleteIter(prefix)
|
||||
return err
|
||||
}
|
||||
mgr.ttlManager.clear(ds.NewKey(p.Pretty()))
|
||||
return
|
||||
}
|
||||
log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), dsWriteRetries)
|
||||
|
||||
// Attempt transactional KV deletion.
|
||||
for i := 0; i < mgr.writeRetries; i++ {
|
||||
if err = deleteFn(); err == nil {
|
||||
break
|
||||
}
|
||||
log.Errorf("failed to clear addresses for peer %s: %s\n", p.Pretty(), err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), mgr.writeRetries)
|
||||
}
|
||||
|
||||
// Perform housekeeping.
|
||||
mgr.ttlManager.clear(prefix)
|
||||
}
|
||||
|
||||
type ttlentry struct {
|
||||
// dbDelete transactionally deletes the provided keys.
|
||||
func (mgr *dsAddrBook) dbDelete(keys []ds.Key) error {
|
||||
txn := mgr.ds.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
for _, key := range keys {
|
||||
if err := txn.Delete(key); err != nil {
|
||||
log.Errorf("failed to delete key: %s, cause: %v", key.String(), err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := txn.Commit(); err != nil {
|
||||
log.Errorf("failed to commit transaction when deleting keys, cause: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// dbDeleteIter removes all entries whose keys are prefixed with the argument.
|
||||
// it returns a slice of the removed keys in case it's needed
|
||||
func (mgr *dsAddrBook) dbDeleteIter(prefix ds.Key) ([]ds.Key, error) {
|
||||
q := query.Query{Prefix: prefix.String(), KeysOnly: true}
|
||||
|
||||
txn := mgr.ds.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
results, err := txn.Query(q)
|
||||
if err != nil {
|
||||
log.Errorf("failed to fetch all keys prefixed with: %s, cause: %v", prefix.String(), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var keys []ds.Key
|
||||
for result := range results.Next() {
|
||||
key := ds.RawKey(result.Key)
|
||||
keys = append(keys, key)
|
||||
|
||||
if err = txn.Delete(key); err != nil {
|
||||
log.Errorf("failed to delete key: %s, cause: %v", key.String(), err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := results.Close(); err != nil {
|
||||
log.Errorf("failed to close cursor, cause: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = txn.Commit(); err != nil {
|
||||
log.Errorf("failed to commit transaction when deleting keys, cause: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
type ttlEntry struct {
|
||||
TTL time.Duration
|
||||
ExpiresAt time.Time
|
||||
}
|
||||
|
||||
type ttlmanager struct {
|
||||
type ttlManager struct {
|
||||
sync.RWMutex
|
||||
entries map[ds.Key]*ttlentry
|
||||
entries map[ds.Key]*ttlEntry
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
ticker *time.Ticker
|
||||
ds ds.Batching
|
||||
cache *lru.ARCCache
|
||||
ds ds.TxnDatastore
|
||||
cache cache
|
||||
}
|
||||
|
||||
func newTTLManager(parent context.Context, d ds.Datastore, c *lru.ARCCache, tick time.Duration) *ttlmanager {
|
||||
func newTTLManager(parent context.Context, d ds.Datastore, c *cache, tick time.Duration) *ttlManager {
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
batching, ok := d.(ds.Batching)
|
||||
txnDs, ok := d.(ds.TxnDatastore)
|
||||
if !ok {
|
||||
panic("must construct ttlmanager with batching datastore")
|
||||
panic("must construct ttlManager with transactional datastore")
|
||||
}
|
||||
mgr := &ttlmanager{
|
||||
entries: make(map[ds.Key]*ttlentry),
|
||||
mgr := &ttlManager{
|
||||
entries: make(map[ds.Key]*ttlEntry),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
ticker: time.NewTicker(tick),
|
||||
ds: batching,
|
||||
cache: c,
|
||||
ds: txnDs,
|
||||
cache: *c,
|
||||
}
|
||||
|
||||
go func() {
|
||||
@ -309,57 +453,72 @@ func newTTLManager(parent context.Context, d ds.Datastore, c *lru.ARCCache, tick
|
||||
}
|
||||
|
||||
// To be called by TTL manager's coroutine only.
|
||||
func (mgr *ttlmanager) tick() {
|
||||
func (mgr *ttlManager) tick() {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
batch, err := mgr.ds.Batch()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
var toDel []ds.Key
|
||||
for key, entry := range mgr.entries {
|
||||
if entry.ExpiresAt.After(now) {
|
||||
continue
|
||||
}
|
||||
toDel = append(toDel, key)
|
||||
}
|
||||
|
||||
if len(toDel) == 0 {
|
||||
return
|
||||
}
|
||||
for key, entry := range mgr.entries {
|
||||
if entry.ExpiresAt.Before(now) {
|
||||
if err := batch.Delete(key); err != nil {
|
||||
log.Error(err)
|
||||
} else {
|
||||
mgr.cache.Remove(key)
|
||||
}
|
||||
delete(mgr.entries, key)
|
||||
|
||||
txn := mgr.ds.NewTransaction(false)
|
||||
defer txn.Discard()
|
||||
|
||||
for _, key := range toDel {
|
||||
if err := txn.Delete(key); err != nil {
|
||||
log.Error("failed to delete TTL key: %v, cause: %v", key.String(), err)
|
||||
break
|
||||
}
|
||||
mgr.cache.Remove(key.Parent().Name())
|
||||
delete(mgr.entries, key)
|
||||
}
|
||||
err = batch.Commit()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
|
||||
if err := txn.Commit(); err != nil {
|
||||
log.Error("failed to commit TTL deletion, cause: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *ttlmanager) setTTLs(keys []ds.Key, ttl time.Duration, add bool) {
|
||||
func (mgr *ttlManager) deleteTTLs(keys []ds.Key) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
for _, key := range keys {
|
||||
delete(mgr.entries, key)
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *ttlManager) insertOrExtendTTLs(keys []ds.Key, ttl time.Duration) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
expiration := time.Now().Add(ttl)
|
||||
for _, key := range keys {
|
||||
update := true
|
||||
if add {
|
||||
if entry, ok := mgr.entries[key]; ok {
|
||||
if entry.ExpiresAt.After(expiration) {
|
||||
update = false
|
||||
}
|
||||
}
|
||||
}
|
||||
if update {
|
||||
if ttl <= 0 {
|
||||
delete(mgr.entries, key)
|
||||
} else {
|
||||
mgr.entries[key] = &ttlentry{TTL: ttl, ExpiresAt: expiration}
|
||||
}
|
||||
if entry, ok := mgr.entries[key]; !ok || (ok && entry.ExpiresAt.Before(expiration)) {
|
||||
mgr.entries[key] = &ttlEntry{TTL: ttl, ExpiresAt: expiration}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *ttlmanager) updateTTLs(prefix ds.Key, oldTTL, newTTL time.Duration) {
|
||||
func (mgr *ttlManager) setTTLs(keys []ds.Key, ttl time.Duration) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
expiration := time.Now().Add(ttl)
|
||||
for _, key := range keys {
|
||||
mgr.entries[key] = &ttlEntry{TTL: ttl, ExpiresAt: expiration}
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *ttlManager) adjustTTLs(prefix ds.Key, oldTTL, newTTL time.Duration) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
@ -374,7 +533,7 @@ func (mgr *ttlmanager) updateTTLs(prefix ds.Key, oldTTL, newTTL time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *ttlmanager) clear(prefix ds.Key) {
|
||||
func (mgr *ttlManager) clear(prefix ds.Key) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
|
33
pstoreds/cache.go
Normal file
33
pstoreds/cache.go
Normal file
@ -0,0 +1,33 @@
|
||||
package pstoreds
|
||||
|
||||
// cache abstracts all methods we access from ARCCache, to enable alternate
|
||||
// implementations such as a no-op one.
|
||||
type cache interface {
|
||||
Get(key interface{}) (value interface{}, ok bool)
|
||||
Add(key, value interface{})
|
||||
Remove(key interface{})
|
||||
Contains(key interface{}) bool
|
||||
Peek(key interface{}) (value interface{}, ok bool)
|
||||
}
|
||||
|
||||
// noopCache is a dummy implementation that's used when the cache is disabled.
|
||||
type noopCache struct {
|
||||
}
|
||||
|
||||
func (*noopCache) Get(key interface{}) (value interface{}, ok bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (*noopCache) Add(key, value interface{}) {
|
||||
}
|
||||
|
||||
func (*noopCache) Remove(key interface{}) {
|
||||
}
|
||||
|
||||
func (*noopCache) Contains(key interface{}) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (*noopCache) Peek(key interface{}) (value interface{}, ok bool) {
|
||||
return nil, false
|
||||
}
|
@ -2,19 +2,159 @@ package pstoreds
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
bd "github.com/dgraph-io/badger"
|
||||
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-ds-badger"
|
||||
badger "github.com/ipfs/go-ds-badger"
|
||||
|
||||
pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||
pt "github.com/libp2p/go-libp2p-peerstore/test"
|
||||
)
|
||||
|
||||
func setupBadgerDatastore(t testing.TB) (ds.Batching, func()) {
|
||||
func BenchmarkBaselineBadgerDatastorePutEntry(b *testing.B) {
|
||||
bds, closer := badgerStore(b)
|
||||
defer closer()
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
txn := bds.NewTransaction(false)
|
||||
|
||||
key := ds.RawKey(fmt.Sprintf("/key/%d", i))
|
||||
txn.Put(key, []byte(fmt.Sprintf("/value/%d", i)))
|
||||
|
||||
txn.Commit()
|
||||
txn.Discard()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkBaselineBadgerDatastoreGetEntry(b *testing.B) {
|
||||
bds, closer := badgerStore(b)
|
||||
defer closer()
|
||||
|
||||
txn := bds.NewTransaction(false)
|
||||
keys := make([]ds.Key, 1000)
|
||||
for i := 0; i < 1000; i++ {
|
||||
key := ds.RawKey(fmt.Sprintf("/key/%d", i))
|
||||
txn.Put(key, []byte(fmt.Sprintf("/value/%d", i)))
|
||||
keys[i] = key
|
||||
}
|
||||
if err := txn.Commit(); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
txn := bds.NewTransaction(true)
|
||||
if _, err := txn.Get(keys[i%1000]); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
txn.Discard()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkBaselineBadgerDirectPutEntry(b *testing.B) {
|
||||
opts := bd.DefaultOptions
|
||||
|
||||
dataPath, err := ioutil.TempDir(os.TempDir(), "badger")
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
opts.Dir = dataPath
|
||||
opts.ValueDir = dataPath
|
||||
opts.SyncWrites = false
|
||||
|
||||
db, err := bd.Open(opts)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
defer db.Close()
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
txn := db.NewTransaction(true)
|
||||
txn.Set([]byte(fmt.Sprintf("/key/%d", i)), []byte(fmt.Sprintf("/value/%d", i)))
|
||||
txn.Commit(nil)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkBaselineBadgerDirectGetEntry(b *testing.B) {
|
||||
opts := bd.DefaultOptions
|
||||
|
||||
dataPath, err := ioutil.TempDir(os.TempDir(), "badger")
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
opts.Dir = dataPath
|
||||
opts.ValueDir = dataPath
|
||||
|
||||
db, err := bd.Open(opts)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
defer db.Close()
|
||||
|
||||
txn := db.NewTransaction(true)
|
||||
for i := 0; i < 1000; i++ {
|
||||
txn.Set([]byte(fmt.Sprintf("/key/%d", i)), []byte(fmt.Sprintf("/value/%d", i)))
|
||||
}
|
||||
txn.Commit(nil)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
txn := db.NewTransaction(false)
|
||||
txn.Get([]byte(fmt.Sprintf("/key/%d", i%1000)))
|
||||
txn.Discard()
|
||||
}
|
||||
}
|
||||
|
||||
func TestBadgerDsPeerstore(t *testing.T) {
|
||||
pt.TestPeerstore(t, peerstoreFactory(t, DefaultOpts()))
|
||||
}
|
||||
|
||||
func TestBadgerDsAddrBook(t *testing.T) {
|
||||
t.Run("Cacheful", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
opts := DefaultOpts()
|
||||
opts.TTLInterval = 100 * time.Microsecond
|
||||
opts.CacheSize = 1024
|
||||
|
||||
pt.TestAddrBook(t, addressBookFactory(t, opts))
|
||||
})
|
||||
|
||||
t.Run("Cacheless", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
opts := DefaultOpts()
|
||||
opts.TTLInterval = 100 * time.Microsecond
|
||||
opts.CacheSize = 0
|
||||
|
||||
pt.TestAddrBook(t, addressBookFactory(t, opts))
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBadgerDsPeerstore(b *testing.B) {
|
||||
caching := DefaultOpts()
|
||||
caching.CacheSize = 1024
|
||||
|
||||
cacheless := DefaultOpts()
|
||||
cacheless.CacheSize = 0
|
||||
|
||||
pt.BenchmarkPeerstore(b, peerstoreFactory(b, caching), "Caching")
|
||||
pt.BenchmarkPeerstore(b, peerstoreFactory(b, cacheless), "Cacheless")
|
||||
}
|
||||
|
||||
func badgerStore(t testing.TB) (ds.TxnDatastore, func()) {
|
||||
dataPath, err := ioutil.TempDir(os.TempDir(), "badger")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -30,11 +170,11 @@ func setupBadgerDatastore(t testing.TB) (ds.Batching, func()) {
|
||||
return ds, closer
|
||||
}
|
||||
|
||||
func newPeerstoreFactory(tb testing.TB) pt.PeerstoreFactory {
|
||||
func peerstoreFactory(tb testing.TB, opts Options) pt.PeerstoreFactory {
|
||||
return func() (pstore.Peerstore, func()) {
|
||||
ds, closeFunc := setupBadgerDatastore(tb)
|
||||
ds, closeFunc := badgerStore(tb)
|
||||
|
||||
ps, err := NewPeerstore(context.Background(), ds)
|
||||
ps, err := NewPeerstore(context.Background(), ds, opts)
|
||||
if err != nil {
|
||||
tb.Fatal(err)
|
||||
}
|
||||
@ -43,17 +183,13 @@ func newPeerstoreFactory(tb testing.TB) pt.PeerstoreFactory {
|
||||
}
|
||||
}
|
||||
|
||||
func TestBadgerDsPeerstore(t *testing.T) {
|
||||
pt.TestPeerstore(t, newPeerstoreFactory(t))
|
||||
}
|
||||
func addressBookFactory(tb testing.TB, opts Options) pt.AddrBookFactory {
|
||||
return func() (pstore.AddrBook, func()) {
|
||||
ds, closeDB := badgerStore(tb)
|
||||
|
||||
func TestBadgerDsAddrBook(t *testing.T) {
|
||||
pt.TestAddrBook(t, func() (pstore.AddrBook, func()) {
|
||||
ds, closeDB := setupBadgerDatastore(t)
|
||||
|
||||
mgr, err := NewAddrBook(context.Background(), ds, 100*time.Microsecond)
|
||||
mgr, err := NewAddrBook(context.Background(), ds, opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
tb.Fatal(err)
|
||||
}
|
||||
|
||||
closeFunc := func() {
|
||||
@ -61,9 +197,5 @@ func TestBadgerDsAddrBook(t *testing.T) {
|
||||
closeDB()
|
||||
}
|
||||
return mgr, closeFunc
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBadgerDsPeerstore(b *testing.B) {
|
||||
pt.BenchmarkPeerstore(b, newPeerstoreFactory(b))
|
||||
}
|
||||
}
|
||||
|
@ -10,9 +10,34 @@ import (
|
||||
pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem"
|
||||
)
|
||||
|
||||
// Configuration object for the peerstore.
|
||||
type Options struct {
|
||||
// The size of the in-memory cache. A value of 0 or lower disables the cache.
|
||||
CacheSize uint
|
||||
|
||||
// Sweep interval to expire entries, only used when TTL is *not* natively managed
|
||||
// by the underlying datastore.
|
||||
TTLInterval time.Duration
|
||||
|
||||
// Number of times to retry transactional writes.
|
||||
WriteRetries uint
|
||||
}
|
||||
|
||||
// DefaultOpts returns the default options for a persistent peerstore:
|
||||
// * Cache size: 1024
|
||||
// * TTL sweep interval: 1 second
|
||||
// * WriteRetries: 5
|
||||
func DefaultOpts() Options {
|
||||
return Options{
|
||||
CacheSize: 1024,
|
||||
TTLInterval: time.Second,
|
||||
WriteRetries: 5,
|
||||
}
|
||||
}
|
||||
|
||||
// NewPeerstore creates a peerstore backed by the provided persistent datastore.
|
||||
func NewPeerstore(ctx context.Context, ds ds.Batching) (pstore.Peerstore, error) {
|
||||
addrBook, err := NewAddrBook(ctx, ds, time.Second)
|
||||
func NewPeerstore(ctx context.Context, store ds.TxnDatastore, opts Options) (pstore.Peerstore, error) {
|
||||
addrBook, err := NewAddrBook(ctx, store, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -162,10 +162,8 @@ func (mab *memoryAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL t
|
||||
}
|
||||
|
||||
exp := time.Now().Add(newTTL)
|
||||
// TODO: RK - Shorthand.
|
||||
for i := range addrs {
|
||||
aexp := &addrs[i]
|
||||
if oldTTL == aexp.TTL {
|
||||
if aexp := &addrs[i]; oldTTL == aexp.TTL {
|
||||
aexp.TTL = newTTL
|
||||
aexp.Expires = exp
|
||||
}
|
||||
|
@ -28,5 +28,5 @@ func TestInMemoryKeyBook(t *testing.T) {
|
||||
func BenchmarkInMemoryPeerstore(b *testing.B) {
|
||||
pt.BenchmarkPeerstore(b, func() (pstore.Peerstore, func()) {
|
||||
return NewPeerstore(), nil
|
||||
})
|
||||
}, "InMem")
|
||||
}
|
||||
|
@ -18,6 +18,8 @@ var addressBookSuite = map[string]func(book pstore.AddrBook) func(*testing.T){
|
||||
"UpdateTTLs": testUpdateTTLs,
|
||||
"NilAddrsDontBreak": testNilAddrsDontBreak,
|
||||
"AddressesExpire": testAddressesExpire,
|
||||
"ClearWithIter": testClearWithIterator,
|
||||
"PeersWithAddresses": testPeersWithAddrs,
|
||||
}
|
||||
|
||||
type AddrBookFactory func() (pstore.AddrBook, func())
|
||||
@ -152,25 +154,25 @@ func testUpdateTTLs(m pstore.AddrBook) func(t *testing.T) {
|
||||
testHas(t, addrs2, m.Addrs(ids[1]))
|
||||
|
||||
// Will only affect addrs1[0].
|
||||
m.UpdateAddrs(ids[0], time.Hour, time.Second)
|
||||
m.UpdateAddrs(ids[0], time.Hour, 100*time.Microsecond)
|
||||
|
||||
// No immediate effect.
|
||||
testHas(t, addrs1, m.Addrs(ids[0]))
|
||||
testHas(t, addrs2, m.Addrs(ids[1]))
|
||||
|
||||
// After a wait, addrs[0] is gone.
|
||||
time.Sleep(1200 * time.Millisecond)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
|
||||
testHas(t, addrs2, m.Addrs(ids[1]))
|
||||
|
||||
// Will only affect addrs2[0].
|
||||
m.UpdateAddrs(ids[1], time.Hour, time.Second)
|
||||
m.UpdateAddrs(ids[1], time.Hour, 100*time.Microsecond)
|
||||
|
||||
// No immediate effect.
|
||||
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
|
||||
testHas(t, addrs2, m.Addrs(ids[1]))
|
||||
|
||||
time.Sleep(1200 * time.Millisecond)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// First addrs is gone in both.
|
||||
testHas(t, addrs1[1:], m.Addrs(ids[0]))
|
||||
@ -207,33 +209,86 @@ func testAddressesExpire(m pstore.AddrBook) func(t *testing.T) {
|
||||
testHas(t, addrs1, m.Addrs(ids[0]))
|
||||
testHas(t, addrs2, m.Addrs(ids[1]))
|
||||
|
||||
m.SetAddr(ids[0], addrs1[0], time.Millisecond)
|
||||
<-time.After(time.Millisecond * 5)
|
||||
m.SetAddr(ids[0], addrs1[0], 100*time.Microsecond)
|
||||
<-time.After(100 * time.Millisecond)
|
||||
testHas(t, addrs1[1:3], m.Addrs(ids[0]))
|
||||
testHas(t, addrs2, m.Addrs(ids[1]))
|
||||
|
||||
m.SetAddr(ids[0], addrs1[2], time.Millisecond)
|
||||
<-time.After(time.Millisecond * 5)
|
||||
m.SetAddr(ids[0], addrs1[2], 100*time.Microsecond)
|
||||
<-time.After(100 * time.Millisecond)
|
||||
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
|
||||
testHas(t, addrs2, m.Addrs(ids[1]))
|
||||
|
||||
m.SetAddr(ids[1], addrs2[0], time.Millisecond)
|
||||
<-time.After(time.Millisecond * 5)
|
||||
m.SetAddr(ids[1], addrs2[0], 100*time.Microsecond)
|
||||
<-time.After(100 * time.Millisecond)
|
||||
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
|
||||
testHas(t, addrs2[1:], m.Addrs(ids[1]))
|
||||
|
||||
m.SetAddr(ids[1], addrs2[1], time.Millisecond)
|
||||
<-time.After(time.Millisecond * 5)
|
||||
m.SetAddr(ids[1], addrs2[1], 100*time.Microsecond)
|
||||
<-time.After(100 * time.Millisecond)
|
||||
testHas(t, addrs1[1:2], m.Addrs(ids[0]))
|
||||
testHas(t, nil, m.Addrs(ids[1]))
|
||||
|
||||
m.SetAddr(ids[0], addrs1[1], time.Millisecond)
|
||||
<-time.After(time.Millisecond * 5)
|
||||
m.SetAddr(ids[0], addrs1[1], 100*time.Microsecond)
|
||||
<-time.After(100 * time.Millisecond)
|
||||
testHas(t, nil, m.Addrs(ids[0]))
|
||||
testHas(t, nil, m.Addrs(ids[1]))
|
||||
}
|
||||
}
|
||||
|
||||
func testClearWithIterator(m pstore.AddrBook) func(t *testing.T) {
|
||||
return func(t *testing.T) {
|
||||
ids := generatePeerIds(2)
|
||||
addrs := generateAddrs(100)
|
||||
|
||||
// Add the peers with 50 addresses each.
|
||||
m.AddAddrs(ids[0], addrs[:50], pstore.PermanentAddrTTL)
|
||||
m.AddAddrs(ids[1], addrs[50:], pstore.PermanentAddrTTL)
|
||||
|
||||
if all := append(m.Addrs(ids[0]), m.Addrs(ids[1])...); len(all) != 100 {
|
||||
t.Fatal("expected pstore to contain both peers with all their maddrs")
|
||||
}
|
||||
|
||||
// Since we don't fetch these peers, they won't be present in cache.
|
||||
|
||||
m.ClearAddrs(ids[0])
|
||||
if all := append(m.Addrs(ids[0]), m.Addrs(ids[1])...); len(all) != 50 {
|
||||
t.Fatal("expected pstore to contain only addrs of peer 2")
|
||||
}
|
||||
|
||||
m.ClearAddrs(ids[1])
|
||||
if all := append(m.Addrs(ids[0]), m.Addrs(ids[1])...); len(all) != 0 {
|
||||
t.Fatal("expected pstore to contain no addresses")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testPeersWithAddrs(m pstore.AddrBook) func(t *testing.T) {
|
||||
return func(t *testing.T) {
|
||||
// cannot run in parallel as the store is modified.
|
||||
// go runs sequentially in the specified order
|
||||
// see https://blog.golang.org/subtests
|
||||
|
||||
t.Run("empty addrbook", func(t *testing.T) {
|
||||
if peers := m.PeersWithAddrs(); len(peers) != 0 {
|
||||
t.Fatal("expected to find no peers")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("non-empty addrbook", func(t *testing.T) {
|
||||
ids := generatePeerIds(2)
|
||||
addrs := generateAddrs(10)
|
||||
|
||||
m.AddAddrs(ids[0], addrs[:5], pstore.PermanentAddrTTL)
|
||||
m.AddAddrs(ids[1], addrs[5:], pstore.PermanentAddrTTL)
|
||||
|
||||
if peers := m.PeersWithAddrs(); len(peers) != 2 {
|
||||
t.Fatal("expected to find 2 peers")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testHas(t *testing.T, exp, act []ma.Multiaddr) {
|
||||
t.Helper()
|
||||
if len(exp) != len(act) {
|
||||
|
115
test/benchmarks_suite.go
Normal file
115
test/benchmarks_suite.go
Normal file
@ -0,0 +1,115 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||
)
|
||||
|
||||
var peerstoreBenchmarks = map[string]func(pstore.Peerstore, chan *peerpair) func(*testing.B){
|
||||
"AddAddrs": benchmarkAddAddrs,
|
||||
"SetAddrs": benchmarkSetAddrs,
|
||||
"GetAddrs": benchmarkGetAddrs,
|
||||
// The in-between get allows us to benchmark the read-through cache.
|
||||
"AddGetAndClearAddrs": benchmarkAddGetAndClearAddrs,
|
||||
// Calls PeersWithAddr on a peerstore with 1000 peers.
|
||||
"Get1000PeersWithAddrs": benchmarkGet1000PeersWithAddrs,
|
||||
}
|
||||
|
||||
func BenchmarkPeerstore(b *testing.B, factory PeerstoreFactory, variant string) {
|
||||
// Parameterises benchmarks to tackle peers with 1, 10, 100 multiaddrs.
|
||||
params := []struct {
|
||||
n int
|
||||
ch chan *peerpair
|
||||
}{
|
||||
{1, make(chan *peerpair, 100)},
|
||||
{10, make(chan *peerpair, 100)},
|
||||
{100, make(chan *peerpair, 100)},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Start all test peer producing goroutines, where each produces peers with as many
|
||||
// multiaddrs as the n field in the param struct.
|
||||
for _, p := range params {
|
||||
go addressProducer(ctx, b, p.ch, p.n)
|
||||
}
|
||||
|
||||
for name, bench := range peerstoreBenchmarks {
|
||||
for _, p := range params {
|
||||
// Create a new peerstore.
|
||||
ps, closeFunc := factory()
|
||||
|
||||
// Run the test.
|
||||
b.Run(fmt.Sprintf("%s-%dAddrs-%s", name, p.n, variant), bench(ps, p.ch))
|
||||
|
||||
// Cleanup.
|
||||
if closeFunc != nil {
|
||||
closeFunc()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkAddAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.B) {
|
||||
return func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
pp := <-addrs
|
||||
ps.AddAddrs(pp.ID, pp.Addr, pstore.PermanentAddrTTL)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkSetAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.B) {
|
||||
return func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
pp := <-addrs
|
||||
ps.SetAddrs(pp.ID, pp.Addr, pstore.PermanentAddrTTL)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkGetAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.B) {
|
||||
return func(b *testing.B) {
|
||||
pp := <-addrs
|
||||
ps.SetAddrs(pp.ID, pp.Addr, pstore.PermanentAddrTTL)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = ps.Addrs(pp.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkAddGetAndClearAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.B) {
|
||||
return func(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
pp := <-addrs
|
||||
ps.AddAddrs(pp.ID, pp.Addr, pstore.PermanentAddrTTL)
|
||||
ps.Addrs(pp.ID)
|
||||
ps.ClearAddrs(pp.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkGet1000PeersWithAddrs(ps pstore.Peerstore, addrs chan *peerpair) func(*testing.B) {
|
||||
return func(b *testing.B) {
|
||||
var peers = make([]*peerpair, 1000)
|
||||
for i, _ := range peers {
|
||||
pp := <-addrs
|
||||
ps.AddAddrs(pp.ID, pp.Addr, pstore.PermanentAddrTTL)
|
||||
peers[i] = pp
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = ps.PeersWithAddrs()
|
||||
}
|
||||
}
|
||||
}
|
@ -15,7 +15,7 @@ import (
|
||||
pstore "github.com/libp2p/go-libp2p-peerstore"
|
||||
)
|
||||
|
||||
var peerstoreSuite = map[string]func(book pstore.Peerstore) func(*testing.T){
|
||||
var peerstoreSuite = map[string]func(pstore.Peerstore) func(*testing.T){
|
||||
"AddrStream": testAddrStream,
|
||||
"GetStreamBeforePeerAdded": testGetStreamBeforePeerAdded,
|
||||
"AddStreamDuplicates": testAddrStreamDuplicates,
|
||||
@ -40,16 +40,6 @@ func TestPeerstore(t *testing.T, factory PeerstoreFactory) {
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkPeerstore(b *testing.B, factory PeerstoreFactory) {
|
||||
ps, closeFunc := factory()
|
||||
|
||||
b.Run("Peerstore", benchmarkPeerstore(ps))
|
||||
|
||||
if closeFunc != nil {
|
||||
closeFunc()
|
||||
}
|
||||
}
|
||||
|
||||
func testAddrStream(ps pstore.Peerstore) func(t *testing.T) {
|
||||
return func(t *testing.T) {
|
||||
addrs, pid := getAddrs(t, 100), peer.ID("testpeer")
|
||||
@ -289,22 +279,6 @@ func testBasicPeerstore(ps pstore.Peerstore) func(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkPeerstore(ps pstore.Peerstore) func(*testing.B) {
|
||||
return func(b *testing.B) {
|
||||
addrs := make(chan *peerpair, 100)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go addressProducer(ctx, b, addrs)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
pp := <-addrs
|
||||
ps.AddAddr(pp.ID, pp.Addr, pstore.PermanentAddrTTL)
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
func getAddrs(t *testing.T, n int) []ma.Multiaddr {
|
||||
var addrs []ma.Multiaddr
|
||||
for i := 0; i < n; i++ {
|
||||
|
@ -10,39 +10,6 @@ import (
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
type peerpair struct {
|
||||
ID peer.ID
|
||||
Addr ma.Multiaddr
|
||||
}
|
||||
|
||||
func randomPeer(b *testing.B) *peerpair {
|
||||
var pid peer.ID
|
||||
var err error
|
||||
var addr ma.Multiaddr
|
||||
|
||||
if pid, err = pt.RandPeerID(); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
if addr, err = ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/6666/ipfs/%s", pid.Pretty())); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
return &peerpair{pid, addr}
|
||||
}
|
||||
|
||||
func addressProducer(ctx context.Context, b *testing.B, addrs chan *peerpair) {
|
||||
defer close(addrs)
|
||||
for {
|
||||
p := randomPeer(b)
|
||||
select {
|
||||
case addrs <- p:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func multiaddr(m string) ma.Multiaddr {
|
||||
maddr, err := ma.NewMultiaddr(m)
|
||||
if err != nil {
|
||||
@ -50,3 +17,42 @@ func multiaddr(m string) ma.Multiaddr {
|
||||
}
|
||||
return maddr
|
||||
}
|
||||
|
||||
type peerpair struct {
|
||||
ID peer.ID
|
||||
Addr []ma.Multiaddr
|
||||
}
|
||||
|
||||
func randomPeer(b *testing.B, addrCount int) *peerpair {
|
||||
var (
|
||||
pid peer.ID
|
||||
err error
|
||||
addrs = make([]ma.Multiaddr, addrCount)
|
||||
aFmt = "/ip4/127.0.0.1/tcp/%d/ipfs/%s"
|
||||
)
|
||||
|
||||
b.Helper()
|
||||
if pid, err = pt.RandPeerID(); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
for i := 0; i < addrCount; i++ {
|
||||
if addrs[i], err = ma.NewMultiaddr(fmt.Sprintf(aFmt, i, pid.Pretty())); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
return &peerpair{pid, addrs}
|
||||
}
|
||||
|
||||
func addressProducer(ctx context.Context, b *testing.B, addrs chan *peerpair, addrsPerPeer int) {
|
||||
b.Helper()
|
||||
defer close(addrs)
|
||||
for {
|
||||
p := randomPeer(b, addrsPerPeer)
|
||||
select {
|
||||
case addrs <- p:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user