mirror of
https://github.com/libp2p/go-libp2p-peerstore.git
synced 2025-03-31 14:00:06 +08:00
Implement and test Datastore-backed AddrManager
This commit is contained in:
parent
d06a57e781
commit
ba6ac2671b
239
addr_manager_ds.go
Normal file
239
addr_manager_ds.go
Normal file
@ -0,0 +1,239 @@
|
||||
package peerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/query"
|
||||
"github.com/libp2p/go-libp2p-peer"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
mh "github.com/multiformats/go-multihash"
|
||||
)
|
||||
|
||||
// DatastoreAddrManager is an AddrBook implementation that persists peer
// addresses in a datastore, with per-address expiry handled by an
// internal ttlmanager.
type DatastoreAddrManager struct {
	ds ds.Datastore // backing persistent store for address records
	ttlManager *ttlmanager // tracks and enforces per-key TTLs
	addrSubs map[peer.ID][]*addrSub // per-peer address-stream subscribers (unused here; see AddrStream)
}
|
||||
|
||||
func NewDatastoreAddrManager(ds ds.Datastore, ttlInterval time.Duration) *DatastoreAddrManager {
|
||||
mgr := &DatastoreAddrManager{
|
||||
ds: ds,
|
||||
ttlManager: newTTLManager(context.Background(), ds, ttlInterval),
|
||||
addrSubs: make(map[peer.ID][]*addrSub),
|
||||
}
|
||||
return mgr
|
||||
}
|
||||
|
||||
// Stop shuts down the manager's background TTL expiry loop and blocks
// until it has exited. It does not close the underlying datastore.
func (mgr *DatastoreAddrManager) Stop() {
	mgr.ttlManager.Stop()
}
|
||||
|
||||
func peerAddressKey(p *peer.ID, addr *ma.Multiaddr) (ds.Key, error) {
|
||||
hash, err := mh.Sum((*addr).Bytes(), mh.MURMUR3, -1)
|
||||
if err != nil {
|
||||
return ds.Key{}, nil
|
||||
}
|
||||
return ds.NewKey(p.Pretty()).ChildString(hash.B58String()), nil
|
||||
}
|
||||
|
||||
// AddAddr adds a single address for p with the given ttl; it is a
// convenience wrapper around AddAddrs.
func (mgr *DatastoreAddrManager) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
	mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
}
|
||||
|
||||
func (mgr *DatastoreAddrManager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
||||
if ttl <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
mgr.SetAddrs(p, addrs, ttl)
|
||||
}
|
||||
|
||||
// SetAddr sets (or, with a non-positive ttl, deletes) a single address
// for p; it is a convenience wrapper around SetAddrs.
func (mgr *DatastoreAddrManager) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
	mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
}
|
||||
|
||||
func (mgr *DatastoreAddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
|
||||
var keys []ds.Key
|
||||
for _, addr := range addrs {
|
||||
if addr == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
key, err := peerAddressKey(&p, &addr)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
keys = append(keys, key)
|
||||
|
||||
if ttl <= 0 {
|
||||
mgr.ds.Delete(key)
|
||||
} else {
|
||||
if err := mgr.ds.Put(key, addr.Bytes()); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
mgr.ttlManager.SetTTLs(keys, ttl)
|
||||
}
|
||||
|
||||
func (mgr *DatastoreAddrManager) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
|
||||
prefix := ds.NewKey(p.Pretty())
|
||||
mgr.ttlManager.UpdateTTLs(prefix, oldTTL, newTTL)
|
||||
}
|
||||
|
||||
func (mgr *DatastoreAddrManager) Addrs(p peer.ID) []ma.Multiaddr {
|
||||
prefix := ds.NewKey(p.Pretty())
|
||||
q := query.Query{Prefix: prefix.String()}
|
||||
results, err := mgr.ds.Query(q)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return []ma.Multiaddr{}
|
||||
}
|
||||
|
||||
var addrs []ma.Multiaddr
|
||||
for result := range results.Next() {
|
||||
addrbytes := result.Value.([]byte)
|
||||
addr, err := ma.NewMultiaddrBytes(addrbytes)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
addrs = append(addrs, addr)
|
||||
}
|
||||
|
||||
return addrs
|
||||
}
|
||||
|
||||
func (mgr *DatastoreAddrManager) AddrStream(context.Context, peer.ID) <-chan ma.Multiaddr {
|
||||
panic("implement me")
|
||||
stream := make(chan ma.Multiaddr)
|
||||
return stream
|
||||
}
|
||||
|
||||
func (mgr *DatastoreAddrManager) ClearAddrs(p peer.ID) {
|
||||
prefix := ds.NewKey(p.Pretty())
|
||||
q := query.Query{Prefix: prefix.String()}
|
||||
results, err := mgr.ds.Query(q)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
for result := range results.Next() {
|
||||
mgr.ds.Delete(ds.NewKey(result.Key))
|
||||
}
|
||||
mgr.ttlManager.Clear(ds.NewKey(p.Pretty()))
|
||||
}
|
||||
|
||||
// ttlmanager

// ttlentry records the TTL originally assigned to a datastore key and
// the absolute time at which it expires.
type ttlentry struct {
	TTL time.Duration // original time-to-live; matched against oldTTL in UpdateTTLs
	ExpiresAt time.Time // absolute expiry deadline checked by tick
}
|
||||
|
||||
// ttlmanager tracks expiry deadlines for datastore keys and deletes the
// backing records once they lapse. The embedded Mutex guards entries.
type ttlmanager struct {
	sync.Mutex
	entries map[ds.Key]*ttlentry // per-key TTL bookkeeping
	ctx context.Context // controls the sweep goroutine's lifetime
	cancel context.CancelFunc // cancels ctx; invoked by Stop
	ticker *time.Ticker // drives periodic expiry sweeps
	done chan struct{} // signalled when the sweep goroutine exits
	ds ds.Datastore // store from which expired keys are deleted
}
|
||||
|
||||
func newTTLManager(parent context.Context, d ds.Datastore, tick time.Duration) *ttlmanager {
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
mgr := &ttlmanager{
|
||||
Mutex: sync.Mutex{},
|
||||
entries: make(map[ds.Key]*ttlentry),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
ticker: time.NewTicker(tick),
|
||||
ds: d,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
go func() {
|
||||
stop := false
|
||||
for {
|
||||
select {
|
||||
case <-mgr.ctx.Done():
|
||||
stop = true
|
||||
case <-mgr.ticker.C:
|
||||
mgr.tick()
|
||||
}
|
||||
|
||||
if stop {
|
||||
break
|
||||
}
|
||||
}
|
||||
mgr.done <- struct{}{}
|
||||
}()
|
||||
|
||||
return mgr
|
||||
}
|
||||
|
||||
// Stop cancels the sweep goroutine and blocks until it has exited.
func (mgr *ttlmanager) Stop() {
	mgr.cancel()
	<-mgr.done
}
|
||||
|
||||
// For internal use only
|
||||
func (mgr *ttlmanager) tick() {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
for key, entry := range mgr.entries {
|
||||
if entry.ExpiresAt.Before(now) {
|
||||
if err := mgr.ds.Delete(key); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
delete(mgr.entries, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *ttlmanager) SetTTLs(keys []ds.Key, ttl time.Duration) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
expiration := time.Now().Add(ttl)
|
||||
for _, key := range keys {
|
||||
if ttl <= 0 {
|
||||
delete(mgr.entries, key)
|
||||
} else {
|
||||
mgr.entries[key] = &ttlentry{TTL: ttl, ExpiresAt: expiration}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *ttlmanager) UpdateTTLs(prefix ds.Key, oldTTL, newTTL time.Duration) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
var keys []ds.Key
|
||||
for key, entry := range mgr.entries {
|
||||
if key.IsDescendantOf(prefix) && entry.TTL == oldTTL {
|
||||
keys = append(keys, key)
|
||||
entry.TTL = newTTL
|
||||
entry.ExpiresAt = now.Add(newTTL)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *ttlmanager) Clear(prefix ds.Key) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
|
||||
for key := range mgr.entries {
|
||||
if key.IsDescendantOf(prefix) {
|
||||
delete(mgr.entries, key)
|
||||
}
|
||||
}
|
||||
}
|
@ -7,6 +7,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p-peer"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"os"
|
||||
"github.com/ipfs/go-ds-badger"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
@ -66,6 +67,24 @@ func setupBadgerAddrManager(t *testing.T) (*BadgerAddrManager, func ()) {
|
||||
return mgr, closer
|
||||
}
|
||||
|
||||
func setupDatastoreAddrManager(t *testing.T) (*DatastoreAddrManager, func ()) {
|
||||
dataPath, err := ioutil.TempDir(os.TempDir(), "badger")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ds, err := badger.NewDatastore(dataPath, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
mgr := NewDatastoreAddrManager(ds, 100 * time.Microsecond)
|
||||
closer := func () {
|
||||
mgr.Stop()
|
||||
ds.Close()
|
||||
os.RemoveAll(dataPath)
|
||||
}
|
||||
return mgr, closer
|
||||
}
|
||||
|
||||
func testAddresses(t *testing.T, m AddrBook) {
|
||||
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
|
||||
id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ")
|
||||
@ -125,6 +144,11 @@ func TestAddresses(t *testing.T) {
|
||||
mgr2, closer2 := setupBadgerAddrManager(t)
|
||||
defer closer2()
|
||||
testAddresses(t, mgr2)
|
||||
|
||||
t.Log("DatastoreAddrManager")
|
||||
mgr3, closer3 := setupDatastoreAddrManager(t)
|
||||
defer closer3()
|
||||
testAddresses(t, mgr3)
|
||||
}
|
||||
|
||||
func testAddressesExpire(t *testing.T, m AddrBook) {
|
||||
@ -189,6 +213,11 @@ func TestAddressesExpire(t *testing.T) {
|
||||
m2, closer2 := setupBadgerAddrManager(t)
|
||||
defer closer2()
|
||||
testAddressesExpire(t, m2)
|
||||
|
||||
t.Log("DatastoreAddrManager")
|
||||
m3, closer3 := setupDatastoreAddrManager(t)
|
||||
defer closer3()
|
||||
testAddressesExpire(t, m3)
|
||||
}
|
||||
|
||||
func testClearWorks(t *testing.T, m AddrBook) {
|
||||
@ -225,6 +254,11 @@ func TestClearWorks(t *testing.T) {
|
||||
m2, closer2 := setupBadgerAddrManager(t)
|
||||
defer closer2()
|
||||
testClearWorks(t, m2)
|
||||
|
||||
t.Log("DatastoreAddrManager")
|
||||
m3, closer3 := setupDatastoreAddrManager(t)
|
||||
defer closer3()
|
||||
testClearWorks(t, m3)
|
||||
}
|
||||
|
||||
func testSetNegativeTTLClears(t *testing.T, m AddrBook) {
|
||||
@ -248,6 +282,11 @@ func TestSetNegativeTTLClears(t *testing.T) {
|
||||
m2, closer2 := setupBadgerAddrManager(t)
|
||||
defer closer2()
|
||||
testSetNegativeTTLClears(t, m2)
|
||||
|
||||
t.Log("DatastoreAddrManager")
|
||||
m3, closer3 := setupDatastoreAddrManager(t)
|
||||
defer closer3()
|
||||
testSetNegativeTTLClears(t, m3)
|
||||
}
|
||||
|
||||
func testUpdateTTLs(t *testing.T, m AddrBook) {
|
||||
@ -303,6 +342,11 @@ func TestUpdateTTLs(t *testing.T) {
|
||||
m2, closer2 := setupBadgerAddrManager(t)
|
||||
defer closer2()
|
||||
testUpdateTTLs(t, m2)
|
||||
|
||||
t.Log("DatastoreAddrManager")
|
||||
m3, closer3 := setupDatastoreAddrManager(t)
|
||||
defer closer3()
|
||||
testUpdateTTLs(t, m3)
|
||||
}
|
||||
|
||||
func testNilAddrsDontBreak(t *testing.T, m AddrBook) {
|
||||
@ -322,4 +366,10 @@ func TestNilAddrsDontBreak(t *testing.T) {
|
||||
defer closer2()
|
||||
testNilAddrsDontBreak(t, m2)
|
||||
t.Log("OK")
|
||||
|
||||
t.Log("DatastoreAddrManager")
|
||||
m3, closer3 := setupDatastoreAddrManager(t)
|
||||
defer closer3()
|
||||
testNilAddrsDontBreak(t, m3)
|
||||
t.Log("OK")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user