mirror of
https://github.com/libp2p/go-libp2p-peerstore.git
synced 2025-01-04 00:50:12 +08:00
4c43736fe9
Introduces custom types in protobuf to serde directly into multiaddrs and peer IDs. Simplify purge by ordering addrs by expiry. In general, getting this readier for merge.
540 lines
14 KiB
Go
540 lines
14 KiB
Go
package pstoreds
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
ds "github.com/ipfs/go-datastore"
|
|
query "github.com/ipfs/go-datastore/query"
|
|
logging "github.com/ipfs/go-log"
|
|
|
|
pool "github.com/libp2p/go-buffer-pool"
|
|
peer "github.com/libp2p/go-libp2p-peer"
|
|
pstore "github.com/libp2p/go-libp2p-peerstore"
|
|
pb "github.com/libp2p/go-libp2p-peerstore/pb"
|
|
pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem"
|
|
|
|
lru "github.com/hashicorp/golang-lru"
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
b32 "github.com/whyrusleeping/base32"
|
|
)
|
|
|
|
type ttlWriteMode int
|
|
|
|
const (
|
|
ttlOverride ttlWriteMode = iota
|
|
ttlExtend
|
|
)
|
|
|
|
var (
|
|
log = logging.Logger("peerstore/ds")
|
|
// Peer addresses are stored under the following db key pattern:
|
|
// /peers/addr/<b32 peer id no padding>/<hash of maddr>
|
|
addrBookBase = ds.NewKey("/peers/addrs")
|
|
)
|
|
|
|
// addrsRecord decorates the AddrBookRecord with locks and metadata.
|
|
type addrsRecord struct {
|
|
sync.RWMutex
|
|
*pb.AddrBookRecord
|
|
dirty bool
|
|
}
|
|
|
|
// FlushInTxn writes the record to the datastore by calling ds.Put, unless the record is
|
|
// marked for deletion, in which case the deletion is executed via ds.Delete.
|
|
func (r *addrsRecord) FlushInTxn(txn ds.Txn) (err error) {
|
|
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id.ID)))
|
|
if len(r.Addrs) == 0 {
|
|
return txn.Delete(key)
|
|
}
|
|
data := pool.Get(r.Size())
|
|
defer pool.Put(data)
|
|
|
|
// i is the number of bytes that were effectively written.
|
|
i, err := r.MarshalTo(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := txn.Put(key, data[:i]); err != nil {
|
|
return err
|
|
}
|
|
// write succeeded; record is no longer dirty.
|
|
r.dirty = false
|
|
return nil
|
|
}
|
|
|
|
// Flush creates a ds.Txn, and calls FlushInTxn with it.
|
|
func (r *addrsRecord) Flush(ds ds.TxnDatastore) (err error) {
|
|
txn, err := ds.NewTransaction(false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer txn.Discard()
|
|
|
|
if err = r.FlushInTxn(txn); err != nil {
|
|
return err
|
|
}
|
|
return txn.Commit()
|
|
}
|
|
|
|
// Refresh is called on records to perform housekeeping. The return value signals if the record was changed
|
|
// as a result of the refresh.
|
|
//
|
|
// Refresh does the following:
|
|
// * sorts the addresses by expiration (soonest expiring first).
|
|
// * removes the addresses that have expired.
|
|
//
|
|
// It short-circuits optimistically when we know there's nothing to do.
|
|
//
|
|
// Refresh is called from several points:
|
|
// * when accessing and loading an entry.
|
|
// * when performing periodic GC.
|
|
// * after an entry has been modified (e.g. addresses have been added or removed,
|
|
// TTLs updated, etc.)
|
|
//
|
|
// If the return value is true, the caller can perform a flush immediately, or can schedule an async
|
|
// flush, depending on the context.
|
|
func (r *addrsRecord) Refresh() (chgd bool) {
|
|
now := time.Now().Unix()
|
|
if !r.dirty && len(r.Addrs) > 0 && r.Addrs[0].Expiry > now {
|
|
// record is not dirty, and we have no expired entries to purge.
|
|
return false
|
|
}
|
|
|
|
if len(r.Addrs) == 0 {
|
|
// this is a ghost record; let's signal it has to be written.
|
|
// Flush() will take care of doing the deletion.
|
|
return true
|
|
}
|
|
|
|
if r.dirty && len(r.Addrs) > 1 {
|
|
// the record has been modified, so it may need resorting.
|
|
// we keep addresses sorted by expiration, where 0 is the soonest expiring.
|
|
sort.Slice(r.Addrs, func(i, j int) bool {
|
|
return r.Addrs[i].Expiry < r.Addrs[j].Expiry
|
|
})
|
|
}
|
|
|
|
// since addresses are sorted by expiration, we find the first survivor and split the
|
|
// slice on its index.
|
|
pivot := -1
|
|
for i, addr := range r.Addrs {
|
|
if addr.Expiry > now {
|
|
break
|
|
}
|
|
pivot = i
|
|
}
|
|
|
|
r.Addrs = r.Addrs[pivot+1:]
|
|
return r.dirty || pivot >= 0
|
|
}
|
|
|
|
// dsAddrBook is an address book backed by a Datastore with a GC-like procedure
|
|
// to purge expired entries. It uses an in-memory address stream manager.
|
|
type dsAddrBook struct {
|
|
ctx context.Context
|
|
gcInterval time.Duration
|
|
gcMaxPurgePerCycle int
|
|
|
|
cache cache
|
|
ds ds.TxnDatastore
|
|
subsManager *pstoremem.AddrSubManager
|
|
|
|
flushJobCh chan *addrsRecord
|
|
cancelFn func()
|
|
closedCh chan struct{}
|
|
}
|
|
|
|
var _ pstore.AddrBook = (*dsAddrBook)(nil)
|
|
|
|
// NewAddrBook initializes a new address book given a Datastore instance, a context for managing the TTL manager,
|
|
// and the interval at which the TTL manager should sweep the Datastore.
|
|
func NewAddrBook(ctx context.Context, store ds.TxnDatastore, opts Options) (ab *dsAddrBook, err error) {
|
|
var cache cache = new(noopCache)
|
|
if opts.CacheSize > 0 {
|
|
if cache, err = lru.NewARC(int(opts.CacheSize)); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
ctx, cancelFn := context.WithCancel(ctx)
|
|
mgr := &dsAddrBook{
|
|
ctx: ctx,
|
|
cancelFn: cancelFn,
|
|
gcInterval: opts.GCInterval,
|
|
cache: cache,
|
|
ds: store,
|
|
subsManager: pstoremem.NewAddrSubManager(),
|
|
flushJobCh: make(chan *addrsRecord, 32),
|
|
closedCh: make(chan struct{}),
|
|
}
|
|
|
|
// kick off periodic GC.
|
|
go mgr.background()
|
|
|
|
return mgr, nil
|
|
}
|
|
|
|
func (ab *dsAddrBook) Close() {
|
|
ab.cancelFn()
|
|
<-ab.closedCh
|
|
}
|
|
|
|
func (ab *dsAddrBook) asyncFlush(pr *addrsRecord) {
|
|
select {
|
|
case ab.flushJobCh <- pr:
|
|
default:
|
|
log.Warningf("flush queue is full; could not flush peer %v", pr.Id.ID.Pretty())
|
|
}
|
|
}
|
|
|
|
// loadRecord is a read-through fetch. It fetches a record from cache, falling back to the
|
|
// datastore upon a miss, and returning an newly initialized record if the peer doesn't exist.
|
|
//
|
|
// loadRecord calls Refresh() on the record before returning it. If the record changes
|
|
// as a result and `update=true`, an async flush is scheduled.
|
|
//
|
|
// If `cache=true`, the record is inserted in the cache when loaded from the datastore.
|
|
func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrsRecord, err error) {
|
|
if e, ok := ab.cache.Get(id); ok {
|
|
pr = e.(*addrsRecord)
|
|
if pr.Refresh() && update {
|
|
ab.asyncFlush(pr)
|
|
}
|
|
return pr, nil
|
|
}
|
|
|
|
txn, err := ab.ds.NewTransaction(true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer txn.Discard()
|
|
|
|
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(id)))
|
|
data, err := txn.Get(key)
|
|
|
|
if err != nil && err != ds.ErrNotFound {
|
|
return nil, err
|
|
}
|
|
|
|
if err == nil {
|
|
pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
|
|
if err = pr.Unmarshal(data); err != nil {
|
|
return nil, err
|
|
}
|
|
if pr.Refresh() && update {
|
|
ab.asyncFlush(pr)
|
|
}
|
|
} else {
|
|
pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{Id: &pb.ProtoPeerID{ID: id}}}
|
|
}
|
|
|
|
if cache {
|
|
ab.cache.Add(id, pr)
|
|
}
|
|
return pr, nil
|
|
}
|
|
|
|
// background runs the housekeeping process that takes care of:
|
|
//
|
|
// * purging expired addresses from the datastore at regular intervals.
|
|
// * persisting asynchronous flushes to the datastore.
|
|
func (ab *dsAddrBook) background() {
|
|
timer := time.NewTicker(ab.gcInterval)
|
|
for {
|
|
select {
|
|
case fj := <-ab.flushJobCh:
|
|
if cached, ok := ab.cache.Peek(fj.Id.ID); ok {
|
|
// Only continue flushing if the record we have in memory is the same as for which the flush
|
|
// job was requested. If it's not in memory, it has been evicted and we don't know if we hold
|
|
// the latest state or not. Similarly, if it's cached but the pointer is different, it means
|
|
// it was evicted and has been reloaded, so we're also uncertain if we hold the latest state.
|
|
if pr := cached.(*addrsRecord); pr == fj {
|
|
pr.RLock()
|
|
pr.Flush(ab.ds)
|
|
pr.RUnlock()
|
|
}
|
|
}
|
|
|
|
case <-timer.C:
|
|
ab.purgeCycle()
|
|
|
|
case <-ab.ctx.Done():
|
|
timer.Stop()
|
|
close(ab.closedCh)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
var purgeQuery = query.Query{Prefix: addrBookBase.String()}
|
|
|
|
// purgeCycle runs a GC cycle
|
|
func (ab *dsAddrBook) purgeCycle() {
|
|
var id peer.ID
|
|
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
|
|
txn, err := ab.ds.NewTransaction(false)
|
|
if err != nil {
|
|
log.Warningf("failed while purging entries: %v\n", err)
|
|
return
|
|
}
|
|
defer txn.Discard()
|
|
|
|
results, err := txn.Query(purgeQuery)
|
|
if err != nil {
|
|
log.Warningf("failed while purging entries: %v\n", err)
|
|
return
|
|
}
|
|
defer results.Close()
|
|
|
|
for result := range results.Next() {
|
|
k, err := b32.RawStdEncoding.DecodeString(ds.RawKey(result.Key).Name())
|
|
if err != nil {
|
|
// TODO: drop the record? this will keep failing forever.
|
|
log.Warningf("failed while purging record: %v, err: %v\n", result.Key, err)
|
|
continue
|
|
}
|
|
id, err = peer.IDFromBytes(k)
|
|
if err != nil {
|
|
// TODO: drop the record? this will keep failing forever.
|
|
log.Warningf("failed to get extract peer ID from bytes (hex): %x, err: %v\n", k, err)
|
|
continue
|
|
}
|
|
// if the record is in cache, we refresh it and flush it if necessary.
|
|
if e, ok := ab.cache.Peek(id); ok {
|
|
cached := e.(*addrsRecord)
|
|
cached.Lock()
|
|
if cached.Refresh() {
|
|
cached.FlushInTxn(txn)
|
|
}
|
|
cached.Unlock()
|
|
continue
|
|
}
|
|
|
|
if err := record.Unmarshal(result.Value); err != nil {
|
|
// TODO: drop the record? this will keep failing forever.
|
|
log.Warningf("failed while deserializing entry with key: %v, err: %v\n", result.Key, err)
|
|
continue
|
|
}
|
|
if record.Refresh() {
|
|
record.FlushInTxn(txn)
|
|
}
|
|
record.Reset()
|
|
}
|
|
|
|
if err = txn.Commit(); err != nil {
|
|
log.Warningf("failed to commit GC transaction: %v\n", err)
|
|
}
|
|
}
|
|
|
|
// AddAddr will add a new address if it's not already in the AddrBook.
|
|
func (ab *dsAddrBook) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
|
|
ab.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
|
|
}
|
|
|
|
// AddAddrs will add many new addresses if they're not already in the AddrBook.
|
|
func (ab *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
|
if ttl <= 0 {
|
|
return
|
|
}
|
|
addrs = cleanAddrs(addrs)
|
|
ab.setAddrs(p, addrs, ttl, ttlExtend)
|
|
}
|
|
|
|
// SetAddr will add or update the TTL of an address in the AddrBook.
|
|
func (ab *dsAddrBook) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
|
|
ab.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
|
|
}
|
|
|
|
// SetAddrs will add or update the TTLs of addresses in the AddrBook.
|
|
func (ab *dsAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
|
addrs = cleanAddrs(addrs)
|
|
if ttl <= 0 {
|
|
ab.deleteAddrs(p, addrs)
|
|
return
|
|
}
|
|
ab.setAddrs(p, addrs, ttl, ttlOverride)
|
|
}
|
|
|
|
// UpdateAddrs will update any addresses for a given peer and TTL combination to
|
|
// have a new TTL.
|
|
func (ab *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
|
|
pr, err := ab.loadRecord(p, true, false)
|
|
if err != nil {
|
|
log.Errorf("failed to update ttls for peer %s: %s\n", p.Pretty(), err)
|
|
}
|
|
|
|
pr.Lock()
|
|
defer pr.Unlock()
|
|
|
|
newExp := time.Now().Add(newTTL).Unix()
|
|
for _, entry := range pr.Addrs {
|
|
if entry.Ttl != int64(oldTTL) {
|
|
continue
|
|
}
|
|
entry.Ttl, entry.Expiry = int64(newTTL), newExp
|
|
pr.dirty = true
|
|
}
|
|
|
|
if pr.Refresh() {
|
|
pr.Flush(ab.ds)
|
|
}
|
|
}
|
|
|
|
// Addrs returns all of the non-expired addresses for a given peer.
|
|
func (ab *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
|
|
pr, err := ab.loadRecord(p, true, true)
|
|
if err != nil {
|
|
log.Warning("failed to load peerstore entry for peer %v while querying addrs, err: %v", p, err)
|
|
return nil
|
|
}
|
|
|
|
pr.RLock()
|
|
defer pr.RUnlock()
|
|
|
|
addrs := make([]ma.Multiaddr, 0, len(pr.Addrs))
|
|
for _, a := range pr.Addrs {
|
|
addrs = append(addrs, a.Addr)
|
|
}
|
|
return addrs
|
|
}
|
|
|
|
// Peers returns all of the peer IDs for which the AddrBook has addresses.
|
|
func (ab *dsAddrBook) PeersWithAddrs() peer.IDSlice {
|
|
ids, err := uniquePeerIds(ab.ds, addrBookBase, func(result query.Result) string {
|
|
return ds.RawKey(result.Key).Name()
|
|
})
|
|
if err != nil {
|
|
log.Errorf("error while retrieving peers with addresses: %v", err)
|
|
}
|
|
return ids
|
|
}
|
|
|
|
// AddrStream returns a channel on which all new addresses discovered for a
|
|
// given peer ID will be published.
|
|
func (ab *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
|
|
initial := ab.Addrs(p)
|
|
return ab.subsManager.AddrStream(ctx, p, initial)
|
|
}
|
|
|
|
// ClearAddrs will delete all known addresses for a peer ID.
|
|
func (ab *dsAddrBook) ClearAddrs(p peer.ID) {
|
|
ab.cache.Remove(p)
|
|
|
|
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(p)))
|
|
txn, err := ab.ds.NewTransaction(false)
|
|
if err != nil {
|
|
log.Errorf("failed to clear addresses for peer %s: %v\n", p.Pretty(), err)
|
|
}
|
|
defer txn.Discard()
|
|
|
|
if err := txn.Delete(key); err != nil {
|
|
log.Errorf("failed to clear addresses for peer %s: %v\n", p.Pretty(), err)
|
|
}
|
|
|
|
if err = txn.Commit(); err != nil {
|
|
log.Errorf("failed to commit transaction when deleting keys, cause: %v", err)
|
|
}
|
|
}
|
|
|
|
func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode) (err error) {
|
|
pr, err := ab.loadRecord(p, true, false)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load peerstore entry for peer %v while setting addrs, err: %v", p, err)
|
|
}
|
|
|
|
pr.Lock()
|
|
defer pr.Unlock()
|
|
|
|
newExp := time.Now().Add(ttl).Unix()
|
|
existed := make([]bool, len(addrs)) // keeps track of which addrs we found
|
|
|
|
Outer:
|
|
for i, incoming := range addrs {
|
|
for _, have := range pr.Addrs {
|
|
if incoming.Equal(have.Addr) {
|
|
existed[i] = true
|
|
if mode == ttlExtend && have.Expiry > newExp {
|
|
// if we're only extending TTLs but the addr already has a longer one, we skip it.
|
|
continue Outer
|
|
}
|
|
have.Expiry = newExp
|
|
// we found the address, and addresses cannot be duplicate,
|
|
// so let's move on to the next.
|
|
continue Outer
|
|
}
|
|
}
|
|
}
|
|
|
|
// add addresses we didn't hold.
|
|
var added []*pb.AddrBookRecord_AddrEntry
|
|
for i, e := range existed {
|
|
if e {
|
|
continue
|
|
}
|
|
addr := addrs[i]
|
|
entry := &pb.AddrBookRecord_AddrEntry{
|
|
Addr: &pb.ProtoAddr{Multiaddr: addr},
|
|
Ttl: int64(ttl),
|
|
Expiry: newExp,
|
|
}
|
|
added = append(added, entry)
|
|
// TODO: should we only broadcast if we updated the store successfully?
|
|
// we have no way of rolling back the state of the in-memory record, although we
|
|
// could at the expense of allocs. But is it worthwhile?
|
|
ab.subsManager.BroadcastAddr(p, addr)
|
|
}
|
|
|
|
pr.Addrs = append(pr.Addrs, added...)
|
|
pr.dirty = true
|
|
pr.Refresh()
|
|
return pr.Flush(ab.ds)
|
|
}
|
|
|
|
func (ab *dsAddrBook) deleteAddrs(p peer.ID, addrs []ma.Multiaddr) (err error) {
|
|
pr, err := ab.loadRecord(p, false, false)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load peerstore entry for peer %v while deleting addrs, err: %v", p, err)
|
|
}
|
|
|
|
if pr.Addrs == nil {
|
|
return nil
|
|
}
|
|
|
|
pr.Lock()
|
|
defer pr.Unlock()
|
|
|
|
// deletes addresses in place, and avoiding copies until we encounter the first deletion.
|
|
survived := 0
|
|
for i, addr := range pr.Addrs {
|
|
for _, del := range addrs {
|
|
if addr.Addr.Equal(del) {
|
|
continue
|
|
}
|
|
if i != survived {
|
|
pr.Addrs[survived] = pr.Addrs[i]
|
|
}
|
|
survived++
|
|
}
|
|
}
|
|
pr.Addrs = pr.Addrs[:survived]
|
|
|
|
pr.dirty = true
|
|
pr.Refresh()
|
|
return pr.Flush(ab.ds)
|
|
}
|
|
|
|
func cleanAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
|
|
clean := make([]ma.Multiaddr, 0, len(addrs))
|
|
for _, addr := range addrs {
|
|
if addr == nil {
|
|
continue
|
|
}
|
|
clean = append(clean, addr)
|
|
}
|
|
return clean
|
|
}
|