2018-06-12 05:58:10 +08:00
|
|
|
package peerstore
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
ds "github.com/ipfs/go-datastore"
|
|
|
|
"github.com/ipfs/go-datastore/query"
|
|
|
|
"github.com/libp2p/go-libp2p-peer"
|
|
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
|
|
mh "github.com/multiformats/go-multihash"
|
|
|
|
)
|
|
|
|
|
|
|
|
type DatastoreAddrManager struct {
|
2018-06-13 06:02:42 +08:00
|
|
|
ds ds.Datastore
|
|
|
|
ttlManager *ttlmanager
|
|
|
|
subsManager *AddrSubManager
|
2018-06-12 05:58:10 +08:00
|
|
|
}
|
|
|
|
|
2018-06-12 07:54:17 +08:00
|
|
|
func NewDatastoreAddrManager(ctx context.Context, ds ds.Datastore, ttlInterval time.Duration) *DatastoreAddrManager {
|
2018-06-12 05:58:10 +08:00
|
|
|
mgr := &DatastoreAddrManager{
|
2018-06-13 06:02:42 +08:00
|
|
|
ds: ds,
|
|
|
|
ttlManager: newTTLManager(ctx, ds, ttlInterval),
|
|
|
|
subsManager: NewAddrSubManager(),
|
2018-06-12 05:58:10 +08:00
|
|
|
}
|
|
|
|
return mgr
|
|
|
|
}
|
|
|
|
|
|
|
|
func (mgr *DatastoreAddrManager) Stop() {
|
2018-06-12 07:59:50 +08:00
|
|
|
mgr.ttlManager.stop()
|
2018-06-12 05:58:10 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func peerAddressKey(p *peer.ID, addr *ma.Multiaddr) (ds.Key, error) {
|
|
|
|
hash, err := mh.Sum((*addr).Bytes(), mh.MURMUR3, -1)
|
|
|
|
if err != nil {
|
|
|
|
return ds.Key{}, nil
|
|
|
|
}
|
2018-06-15 04:17:50 +08:00
|
|
|
return ds.NewKey(peer.IDB58Encode(*p)).ChildString(hash.B58String()), nil
|
2018-06-12 05:58:10 +08:00
|
|
|
}
|
|
|
|
|
2018-06-14 07:27:14 +08:00
|
|
|
func peerIDFromKey(key ds.Key) (peer.ID, error) {
|
2018-06-15 04:17:50 +08:00
|
|
|
idstring := key.Parent().Name()
|
2018-06-14 07:27:14 +08:00
|
|
|
return peer.IDB58Decode(idstring)
|
|
|
|
}
|
|
|
|
|
2018-06-12 05:58:10 +08:00
|
|
|
func (mgr *DatastoreAddrManager) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
|
|
|
|
mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (mgr *DatastoreAddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
|
|
|
if ttl <= 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
mgr.SetAddrs(p, addrs, ttl)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (mgr *DatastoreAddrManager) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
|
|
|
|
mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (mgr *DatastoreAddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
|
|
|
var keys []ds.Key
|
|
|
|
for _, addr := range addrs {
|
|
|
|
if addr == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
key, err := peerAddressKey(&p, &addr)
|
|
|
|
if err != nil {
|
|
|
|
log.Error(err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
keys = append(keys, key)
|
|
|
|
|
|
|
|
if ttl <= 0 {
|
|
|
|
mgr.ds.Delete(key)
|
2018-06-13 05:58:57 +08:00
|
|
|
continue
|
|
|
|
}
|
2018-06-15 04:17:50 +08:00
|
|
|
if has, err := mgr.ds.Has(key); err != nil || !has {
|
2018-06-13 06:02:42 +08:00
|
|
|
mgr.subsManager.BroadcastAddr(p, addr)
|
|
|
|
}
|
2018-06-13 05:58:57 +08:00
|
|
|
if err := mgr.ds.Put(key, addr.Bytes()); err != nil {
|
|
|
|
log.Error(err)
|
2018-06-12 05:58:10 +08:00
|
|
|
}
|
|
|
|
}
|
2018-06-12 07:59:50 +08:00
|
|
|
mgr.ttlManager.setTTLs(keys, ttl)
|
2018-06-12 05:58:10 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (mgr *DatastoreAddrManager) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
|
|
|
|
prefix := ds.NewKey(p.Pretty())
|
2018-06-12 07:59:50 +08:00
|
|
|
mgr.ttlManager.updateTTLs(prefix, oldTTL, newTTL)
|
2018-06-12 05:58:10 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (mgr *DatastoreAddrManager) Addrs(p peer.ID) []ma.Multiaddr {
|
|
|
|
prefix := ds.NewKey(p.Pretty())
|
|
|
|
q := query.Query{Prefix: prefix.String()}
|
|
|
|
results, err := mgr.ds.Query(q)
|
|
|
|
if err != nil {
|
|
|
|
log.Error(err)
|
|
|
|
return []ma.Multiaddr{}
|
|
|
|
}
|
|
|
|
|
|
|
|
var addrs []ma.Multiaddr
|
|
|
|
for result := range results.Next() {
|
|
|
|
addrbytes := result.Value.([]byte)
|
|
|
|
addr, err := ma.NewMultiaddrBytes(addrbytes)
|
|
|
|
if err != nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
addrs = append(addrs, addr)
|
|
|
|
}
|
|
|
|
|
|
|
|
return addrs
|
|
|
|
}
|
|
|
|
|
2018-06-14 07:27:14 +08:00
|
|
|
func (mgr *DatastoreAddrManager) Peers() []peer.ID {
|
|
|
|
q := query.Query{}
|
|
|
|
results, err := mgr.ds.Query(q)
|
|
|
|
if err != nil {
|
|
|
|
log.Error(err)
|
|
|
|
return []peer.ID{}
|
|
|
|
}
|
|
|
|
|
|
|
|
idset := make(map[peer.ID]struct{})
|
|
|
|
for result := range results.Next() {
|
|
|
|
key := ds.RawKey(result.Key)
|
|
|
|
id, err := peerIDFromKey(key)
|
|
|
|
if err != nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
idset[id] = struct{}{}
|
|
|
|
}
|
|
|
|
|
|
|
|
ids := make([]peer.ID, 0, len(idset))
|
|
|
|
for id := range idset {
|
|
|
|
ids = append(ids, id)
|
|
|
|
}
|
|
|
|
return ids
|
|
|
|
}
|
|
|
|
|
2018-06-13 06:02:42 +08:00
|
|
|
func (mgr *DatastoreAddrManager) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
|
|
|
|
initial := mgr.Addrs(p)
|
|
|
|
return mgr.subsManager.AddrStream(ctx, p, initial)
|
2018-06-12 05:58:10 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (mgr *DatastoreAddrManager) ClearAddrs(p peer.ID) {
|
|
|
|
prefix := ds.NewKey(p.Pretty())
|
|
|
|
q := query.Query{Prefix: prefix.String()}
|
|
|
|
results, err := mgr.ds.Query(q)
|
|
|
|
if err != nil {
|
|
|
|
log.Error(err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for result := range results.Next() {
|
|
|
|
mgr.ds.Delete(ds.NewKey(result.Key))
|
|
|
|
}
|
2018-06-12 07:59:50 +08:00
|
|
|
mgr.ttlManager.clear(ds.NewKey(p.Pretty()))
|
2018-06-12 05:58:10 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// ttlmanager
|
|
|
|
|
|
|
|
// ttlentry records the TTL assigned to a key together with the instant at
// which that key expires.
type ttlentry struct {
	// TTL is the duration the entry was last given.
	TTL time.Duration
	// ExpiresAt is the time after which the entry is considered expired.
	ExpiresAt time.Time
}
|
|
|
|
|
|
|
|
type ttlmanager struct {
|
|
|
|
sync.Mutex
|
|
|
|
entries map[ds.Key]*ttlentry
|
|
|
|
ctx context.Context
|
|
|
|
cancel context.CancelFunc
|
|
|
|
ticker *time.Ticker
|
|
|
|
done chan struct{}
|
|
|
|
ds ds.Datastore
|
|
|
|
}
|
|
|
|
|
|
|
|
func newTTLManager(parent context.Context, d ds.Datastore, tick time.Duration) *ttlmanager {
|
|
|
|
ctx, cancel := context.WithCancel(parent)
|
|
|
|
mgr := &ttlmanager{
|
|
|
|
Mutex: sync.Mutex{},
|
|
|
|
entries: make(map[ds.Key]*ttlentry),
|
|
|
|
ctx: ctx,
|
|
|
|
cancel: cancel,
|
|
|
|
ticker: time.NewTicker(tick),
|
|
|
|
ds: d,
|
|
|
|
done: make(chan struct{}),
|
|
|
|
}
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-mgr.ctx.Done():
|
2018-06-12 07:54:17 +08:00
|
|
|
mgr.ticker.Stop()
|
|
|
|
mgr.done <- struct{}{}
|
|
|
|
return
|
2018-06-12 05:58:10 +08:00
|
|
|
case <-mgr.ticker.C:
|
|
|
|
mgr.tick()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
return mgr
|
|
|
|
}
|
|
|
|
|
2018-06-12 07:59:50 +08:00
|
|
|
func (mgr *ttlmanager) stop() {
|
2018-06-12 05:58:10 +08:00
|
|
|
mgr.cancel()
|
|
|
|
<-mgr.done
|
|
|
|
}
|
|
|
|
|
|
|
|
// For internal use only
|
|
|
|
func (mgr *ttlmanager) tick() {
|
|
|
|
mgr.Lock()
|
|
|
|
defer mgr.Unlock()
|
|
|
|
|
|
|
|
now := time.Now()
|
|
|
|
for key, entry := range mgr.entries {
|
|
|
|
if entry.ExpiresAt.Before(now) {
|
|
|
|
if err := mgr.ds.Delete(key); err != nil {
|
|
|
|
log.Error(err)
|
|
|
|
}
|
|
|
|
delete(mgr.entries, key)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-12 07:59:50 +08:00
|
|
|
func (mgr *ttlmanager) setTTLs(keys []ds.Key, ttl time.Duration) {
|
2018-06-12 05:58:10 +08:00
|
|
|
mgr.Lock()
|
|
|
|
defer mgr.Unlock()
|
|
|
|
|
|
|
|
expiration := time.Now().Add(ttl)
|
|
|
|
for _, key := range keys {
|
|
|
|
if ttl <= 0 {
|
|
|
|
delete(mgr.entries, key)
|
|
|
|
} else {
|
|
|
|
mgr.entries[key] = &ttlentry{TTL: ttl, ExpiresAt: expiration}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-12 07:59:50 +08:00
|
|
|
func (mgr *ttlmanager) updateTTLs(prefix ds.Key, oldTTL, newTTL time.Duration) {
|
2018-06-12 05:58:10 +08:00
|
|
|
mgr.Lock()
|
|
|
|
defer mgr.Unlock()
|
|
|
|
|
|
|
|
now := time.Now()
|
|
|
|
var keys []ds.Key
|
|
|
|
for key, entry := range mgr.entries {
|
|
|
|
if key.IsDescendantOf(prefix) && entry.TTL == oldTTL {
|
|
|
|
keys = append(keys, key)
|
|
|
|
entry.TTL = newTTL
|
|
|
|
entry.ExpiresAt = now.Add(newTTL)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-06-12 07:59:50 +08:00
|
|
|
func (mgr *ttlmanager) clear(prefix ds.Key) {
|
2018-06-12 05:58:10 +08:00
|
|
|
mgr.Lock()
|
|
|
|
defer mgr.Unlock()
|
|
|
|
|
|
|
|
for key := range mgr.entries {
|
|
|
|
if key.IsDescendantOf(prefix) {
|
|
|
|
delete(mgr.entries, key)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|