go-libp2p-peerstore/pstoreds/addr_book.go
2018-09-07 13:04:14 +01:00

387 lines
9.0 KiB
Go

package pstoreds
import (
"context"
"sync"
"time"
"github.com/hashicorp/golang-lru"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"
pstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
)
var (
	// log is the package logger, registered under the "peerstore/ds" name.
	log = logging.Logger("peerstore/ds")

	// dsWriteRetries is the number of times a batched datastore write is
	// attempted before the operation gives up.
	dsWriteRetries = 5

	// Compile-time check that *dsAddrBook satisfies pstore.AddrBook.
	_ pstore.AddrBook = (*dsAddrBook)(nil)
)
// dsAddrBook is an address book backed by a Datastore with both an
// in-memory TTL manager and an in-memory address stream manager.
type dsAddrBook struct {
// cache maps datastore keys to address bytes; it is consulted before the
// datastore on existence checks and reads.
cache *lru.ARCCache
// ds is the batching datastore in which addresses are persisted.
ds ds.Batching
// ttlManager tracks an expiry per address key and deletes expired entries.
ttlManager *ttlmanager
// subsManager publishes newly seen addresses to AddrStream subscribers.
subsManager *pstoremem.AddrSubManager
}
// NewAddrBook creates an address book that persists addresses in the given
// Datastore.
//
// ctx is the parent context of the TTL manager, and ttlInterval is the period
// at which the TTL manager sweeps expired addresses. A 1024-entry ARC cache is
// placed in front of the datastore; an error is returned only if that cache
// cannot be created.
func NewAddrBook(ctx context.Context, ds ds.Batching, ttlInterval time.Duration) (*dsAddrBook, error) {
	arc, err := lru.NewARC(1024)
	if err != nil {
		return nil, err
	}

	book := new(dsAddrBook)
	book.cache = arc
	book.ds = ds
	book.ttlManager = newTTLManager(ctx, ds, arc, ttlInterval)
	book.subsManager = pstoremem.NewAddrSubManager()
	return book, nil
}
// Stop cancels the TTL manager's context, signalling its sweep loop to exit.
// It returns immediately: a context.CancelFunc does not wait for the work it
// cancels to finish.
func (mgr *dsAddrBook) Stop() {
mgr.ttlManager.cancel()
}
// peerAddressKey derives the datastore key under which addr is stored for
// peer p: the base58-encoded peer ID is the parent component and the base58
// string of a MURMUR3 multihash of the address bytes is the child.
//
// It returns the zero ds.Key and a non-nil error if hashing fails.
func peerAddressKey(p *peer.ID, addr *ma.Multiaddr) (ds.Key, error) {
	hash, err := mh.Sum((*addr).Bytes(), mh.MURMUR3, -1)
	if err != nil {
		// Propagate the failure; returning nil here would hand callers an
		// empty key that they would go on to use.
		return ds.Key{}, err
	}
	return ds.NewKey(peer.IDB58Encode(*p)).ChildString(hash.B58String()), nil
}
// peerIDFromKey recovers the peer ID from an address key by base58-decoding
// the name of the key's parent component.
func peerIDFromKey(key ds.Key) (peer.ID, error) {
	return peer.IDB58Decode(key.Parent().Name())
}
// AddAddr adds a single address for peer p; it is AddAddrs applied to a
// one-element slice.
func (mgr *dsAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
	single := []ma.Multiaddr{addr}
	mgr.AddAddrs(p, single, ttl)
}
// AddAddrs stores the given addresses for peer p if they are not already in
// the AddrBook. A non-positive ttl makes the call a no-op.
func (mgr *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
	if ttl > 0 {
		mgr.setAddrs(p, addrs, ttl, true)
	}
}
// SetAddr adds a single address for peer p or updates its TTL; it is SetAddrs
// applied to a one-element slice.
func (mgr *dsAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
	single := []ma.Multiaddr{addr}
	mgr.SetAddrs(p, single, ttl)
}
// SetAddrs adds the given addresses for peer p, or overwrites the TTLs of
// those already in the AddrBook.
func (mgr *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
	const add = false // overwrite existing TTLs rather than only extending them
	mgr.setAddrs(p, addrs, ttl, add)
}
// setAddrs is the shared implementation behind AddAddrs and SetAddrs.
//
// For ttl > 0, every address not yet known (per the cache, then the
// datastore) is written and broadcast to subscribers, and the TTL manager is
// handed every key together with ttl. add is forwarded to the TTL manager,
// where it keeps an existing, later expiry from being shortened. For ttl <= 0
// the addresses are deleted instead. Nil addresses are skipped.
//
// The batch is retried up to dsWriteRetries times when Commit fails. Cache
// updates and broadcasts are applied only after a successful Commit: touching
// the cache earlier would make a retry see the address as already stored and
// skip the Put, leaving it cached and TTL-tracked but never persisted.
func (mgr *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, add bool) {
	for i := 0; i < dsWriteRetries; i++ {
		batch, err := mgr.ds.Batch()
		if err != nil {
			log.Error(err)
			return
		}

		var (
			keys     []ds.Key       // keys to add to the TTL manager
			removed  []ds.Key       // keys whose deletion was staged in the batch
			announce []ma.Multiaddr // addresses to broadcast, in input order
		)
		// Addresses whose Put was staged in this attempt, by key. Also used to
		// recognise duplicates within addrs before the cache is updated.
		staged := make(map[ds.Key]ma.Multiaddr)

		for _, addr := range addrs {
			if addr == nil {
				continue
			}
			key, err := peerAddressKey(&p, &addr)
			if err != nil {
				log.Error(err)
				continue
			}
			keys = append(keys, key)

			if ttl <= 0 {
				if err := batch.Delete(key); err != nil {
					log.Error(err)
				} else {
					removed = append(removed, key)
				}
				continue
			}

			_, has := staged[key]
			if !has {
				has = mgr.cache.Contains(key)
			}
			if !has {
				if has, err = mgr.ds.Has(key); err != nil {
					log.Error(err)
				}
			}
			if err != nil || !has {
				announce = append(announce, addr)
			}
			// Allows us to support AddAddr and SetAddr in one function
			if !has {
				if err := batch.Put(key, addr.Bytes()); err != nil {
					log.Error(err)
				} else {
					staged[key] = addr
				}
			}
		}

		if err := batch.Commit(); err != nil {
			log.Errorf("failed to write addresses for peer %s: %s\n", p.Pretty(), err)
			continue
		}

		// The write is durable; now bring the in-memory state in line with it.
		for _, key := range removed {
			mgr.cache.Remove(key)
		}
		for key, addr := range staged {
			mgr.cache.Add(key, addr.Bytes())
		}
		for _, addr := range announce {
			mgr.subsManager.BroadcastAddr(p, addr)
		}
		mgr.ttlManager.setTTLs(keys, ttl, add)
		return
	}
	log.Errorf("failed to avoid write conflict for peer %s after %d retries\n", p.Pretty(), dsWriteRetries)
}
// UpdateAddrs re-assigns newTTL to every address of peer p whose current TTL
// equals oldTTL. The work is delegated to the TTL manager, scoped to the
// peer's key prefix.
func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
	mgr.ttlManager.updateTTLs(ds.NewKey(p.Pretty()), oldTTL, newTTL)
}
// Addrs returns the addresses currently stored for a given peer. Expired
// addresses stop being returned once the TTL manager has swept them from the
// datastore.
//
// Keys are listed from the datastore under the peer's prefix; each value is
// read from the cache when present and from the datastore otherwise. Entries
// that fail to load or decode are logged and skipped. nil is returned if the
// query itself fails.
func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
	prefix := ds.NewKey(p.Pretty())
	q := query.Query{Prefix: prefix.String(), KeysOnly: true}
	results, err := mgr.ds.Query(q)
	if err != nil {
		log.Error(err)
		return nil
	}

	var addrs []ma.Multiaddr
	for result := range results.Next() {
		// A failed result carries no usable key; report the real error
		// instead of attempting a lookup with an empty key.
		if result.Error != nil {
			log.Error(result.Error)
			continue
		}
		key := ds.RawKey(result.Key)

		value, cached := mgr.cache.Get(key)
		if !cached {
			if value, err = mgr.ds.Get(key); err != nil {
				log.Error(err)
				continue
			}
		}

		// Checked assertion: an unexpected value type must not panic the caller.
		raw, ok := value.([]byte)
		if !ok {
			log.Errorf("unexpected value type %T stored for address key %s", value, key)
			continue
		}

		addr, err := ma.NewMultiaddrBytes(raw)
		if err != nil {
			log.Error(err)
			continue
		}
		addrs = append(addrs, addr)
	}
	return addrs
}
// PeersWithAddrs returns all of the peer IDs for which the AddrBook has
// addresses.
//
// It lists every key in the datastore and collects the distinct peer IDs
// decoded from their parent components. An empty slice is returned if the
// query fails.
func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice {
	results, err := mgr.ds.Query(query.Query{KeysOnly: true})
	if err != nil {
		log.Error(err)
		return peer.IDSlice{}
	}

	idset := make(map[peer.ID]struct{})
	for result := range results.Next() {
		// Surface iteration errors instead of letting them fall through as
		// an undecodable empty key.
		if result.Error != nil {
			log.Error(result.Error)
			continue
		}
		id, err := peerIDFromKey(ds.RawKey(result.Key))
		if err != nil {
			// Best effort: keys that do not decode to a peer ID are skipped.
			continue
		}
		idset[id] = struct{}{}
	}

	ids := make(peer.IDSlice, 0, len(idset))
	for id := range idset {
		ids = append(ids, id)
	}
	return ids
}
// AddrStream returns a channel on which addresses for peer p are published.
// The subscription is handed the addresses currently known for p as its
// initial set.
func (mgr *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
	return mgr.subsManager.AddrStream(ctx, p, mgr.Addrs(p))
}
// ClearAddrs will delete all known addresses for a peer ID.
//
// Every key under the peer's prefix is deleted in one batch and dropped from
// the cache; after a successful commit the peer's entries are removed from the
// TTL manager as well. A failed commit is retried up to dsWriteRetries times.
func (mgr *dsAddrBook) ClearAddrs(p peer.ID) {
	prefix := ds.NewKey(p.Pretty())
	for i := 0; i < dsWriteRetries; i++ {
		q := query.Query{Prefix: prefix.String(), KeysOnly: true}
		results, err := mgr.ds.Query(q)
		if err != nil {
			log.Error(err)
			return
		}
		batch, err := mgr.ds.Batch()
		if err != nil {
			log.Error(err)
			// The results were never drained; close them to release the query.
			if cerr := results.Close(); cerr != nil {
				log.Error(cerr)
			}
			return
		}

		for result := range results.Next() {
			// A failed result carries no usable key; do not stage a delete for it.
			if result.Error != nil {
				log.Error(result.Error)
				continue
			}
			key := ds.NewKey(result.Key)
			if err := batch.Delete(key); err != nil {
				// From inspecting badger, errors here signify a problem with
				// the transaction as a whole, so we can log and abort.
				log.Error(err)
				// We stop consuming results early, so close them explicitly.
				if cerr := results.Close(); cerr != nil {
					log.Error(cerr)
				}
				return
			}
			mgr.cache.Remove(key)
		}

		if err = batch.Commit(); err != nil {
			log.Errorf("failed to clear addresses for peer %s: %s\n", p.Pretty(), err)
			continue
		}
		mgr.ttlManager.clear(prefix)
		return
	}
	log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), dsWriteRetries)
}
// ttlentry is the TTL manager's record for a single address key.
type ttlentry struct {
// TTL is the duration that was last assigned to the address; updateTTLs
// matches entries on this value.
TTL time.Duration
// ExpiresAt is the instant after which tick deletes the address.
ExpiresAt time.Time
}
// ttlmanager keeps an in-memory expiry for each address key and periodically
// deletes expired addresses from the datastore and the cache.
type ttlmanager struct {
// The embedded lock is taken by every method that touches entries.
sync.RWMutex
// entries maps each tracked address key to its TTL record.
entries map[ds.Key]*ttlentry
// ctx ends the sweep loop when done; cancel cancels it.
ctx context.Context
cancel context.CancelFunc
// ticker drives the periodic sweep.
ticker *time.Ticker
// ds is the datastore expired addresses are deleted from.
ds ds.Batching
// cache is the address cache kept in step with those deletions.
cache *lru.ARCCache
}
// newTTLManager builds a TTL manager over datastore d and cache c, and starts
// a goroutine that calls tick every tick interval until the context derived
// from parent is cancelled, at which point the ticker is stopped.
//
// It panics if d does not implement ds.Batching.
func newTTLManager(parent context.Context, d ds.Datastore, c *lru.ARCCache, tick time.Duration) *ttlmanager {
	ctx, cancel := context.WithCancel(parent)
	batching, isBatching := d.(ds.Batching)
	if !isBatching {
		panic("must construct ttlmanager with batching datastore")
	}

	mgr := new(ttlmanager)
	mgr.entries = make(map[ds.Key]*ttlentry)
	mgr.ctx = ctx
	mgr.cancel = cancel
	mgr.ticker = time.NewTicker(tick)
	mgr.ds = batching
	mgr.cache = c

	// Sweep loop: runs until the manager's context is done.
	go func() {
		defer mgr.ticker.Stop()
		for {
			select {
			case <-mgr.ticker.C:
				mgr.tick()
			case <-mgr.ctx.Done():
				return
			}
		}
	}()
	return mgr
}
// tick deletes every expired address from the datastore in one batch and,
// once that batch has committed, drops the same keys from the cache and from
// the TTL table. To be called by the TTL manager's goroutine only.
//
// In-memory state is only touched after a successful commit: a key whose
// delete could not be staged, or a whole batch that failed to commit, stays
// tracked and is retried on the next tick instead of being left in the
// datastore with no expiry.
func (mgr *ttlmanager) tick() {
	mgr.Lock()
	defer mgr.Unlock()

	now := time.Now()
	batch, err := mgr.ds.Batch()
	if err != nil {
		log.Error(err)
		return
	}

	var expired []ds.Key
	for key, entry := range mgr.entries {
		if !entry.ExpiresAt.Before(now) {
			continue
		}
		if err := batch.Delete(key); err != nil {
			log.Error(err)
			continue
		}
		expired = append(expired, key)
	}

	if err := batch.Commit(); err != nil {
		log.Error(err)
		return
	}
	for _, key := range expired {
		mgr.cache.Remove(key)
		delete(mgr.entries, key)
	}
}
// setTTLs records ttl for each of the given keys, with the expiry computed
// once as now + ttl.
//
// When add is true, a key that already has an entry expiring after the new
// expiry is left untouched. Otherwise the entry is replaced, or removed
// altogether when ttl is not positive.
func (mgr *ttlmanager) setTTLs(keys []ds.Key, ttl time.Duration, add bool) {
	mgr.Lock()
	defer mgr.Unlock()

	expiration := time.Now().Add(ttl)
	for _, key := range keys {
		if add {
			// Never shorten the lifetime of an existing entry in add mode.
			if existing, found := mgr.entries[key]; found && existing.ExpiresAt.After(expiration) {
				continue
			}
		}
		if ttl <= 0 {
			delete(mgr.entries, key)
			continue
		}
		mgr.entries[key] = &ttlentry{TTL: ttl, ExpiresAt: expiration}
	}
}
// updateTTLs re-assigns newTTL to every tracked key that is a descendant of
// prefix and whose current TTL equals oldTTL. The expiry of each matching
// entry is reset to now + newTTL.
func (mgr *ttlmanager) updateTTLs(prefix ds.Key, oldTTL, newTTL time.Duration) {
	mgr.Lock()
	defer mgr.Unlock()

	now := time.Now()
	for key, entry := range mgr.entries {
		if key.IsDescendantOf(prefix) && entry.TTL == oldTTL {
			entry.TTL = newTTL
			entry.ExpiresAt = now.Add(newTTL)
		}
	}
}
// clear forgets the TTL entries of every key that is a descendant of prefix.
func (mgr *ttlmanager) clear(prefix ds.Key) {
	mgr.Lock()
	defer mgr.Unlock()

	for tracked := range mgr.entries {
		if !tracked.IsDescendantOf(prefix) {
			continue
		}
		delete(mgr.entries, tracked)
	}
}