Implement badger backed addr manager, add to tests

This commit is contained in:
Cole Brown 2018-06-08 13:26:53 -04:00
parent 1016f6e956
commit 49769cb606
2 changed files with 218 additions and 118 deletions

View File

@ -6,17 +6,15 @@ import (
"github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
//"github.com/hashicorp/golang-lru"
"github.com/dgraph-io/badger"
"github.com/multiformats/go-multihash"
"encoding/gob"
"bytes"
"encoding/binary"
)
type addrmanager_badger struct {
//cache *lru.Cache
db *badger.DB
type AddrManagerBadger struct {
DB *badger.DB
addrSubs map[peer.ID][]*addrSub
}
type addrentry struct {
@ -24,15 +22,33 @@ type addrentry struct {
TTL time.Duration
}
func NewBadgerAddrManager() *addrmanager_badger {
db, err := badger.Open(badger.DefaultOptions)
if err != nil {
panic(err)
func (mgr *AddrManagerBadger) sendSubscriptionUpdates(p *peer.ID, addrs []ma.Multiaddr) {
subs := mgr.addrSubs[*p]
for _, sub := range subs {
for _, addr := range addrs {
sub.pubAddr(addr)
}
}
return &addrmanager_badger{db: db}
}
func (mgr *addrmanager_badger) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
func (mgr *AddrManagerBadger) Close() {
if err := mgr.DB.Close(); err != nil {
log.Error(err)
}
}
func NewBadgerAddrManager(dataPath string) (*AddrManagerBadger, error) {
opts := badger.DefaultOptions
opts.Dir = dataPath
opts.ValueDir = dataPath
db, err := badger.Open(opts)
if err != nil {
return nil, err
}
return &AddrManagerBadger{DB: db}, nil
}
func (mgr *AddrManagerBadger) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
}
@ -41,31 +57,8 @@ func hashMultiaddr(addr *ma.Multiaddr) ([]byte, error) {
return multihash.Encode((*addr).Bytes(), multihash.MURMUR3)
}
// not relevant w/ key prefixing
func createAddrEntry(addr ma.Multiaddr, ttl time.Duration) ([]byte, error) {
entry := addrentry{Addr: addr.Bytes(), TTL: ttl}
buf := bytes.Buffer{}
enc := gob.NewEncoder(&buf)
if err := enc.Encode(entry); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func createKeyPrefix(p *peer.ID, ttl time.Duration) ([]byte, error) {
buf := bytes.NewBufferString(string(*p))
if err := binary.Write(buf, binary.LittleEndian, ttl); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func createUniqueKey(p *peer.ID, addr *ma.Multiaddr, ttl time.Duration) ([]byte, error) {
prefix, err := createKeyPrefix(p, ttl)
if err != nil {
return nil, err
}
func createUniqueKey(p *peer.ID, addr *ma.Multiaddr) ([]byte, error) {
prefix := []byte(*p)
addrHash, err := hashMultiaddr(addr)
if err != nil {
return nil, err
@ -76,55 +69,86 @@ func createUniqueKey(p *peer.ID, addr *ma.Multiaddr, ttl time.Duration) ([]byte,
func addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, txn *badger.Txn) {
for _, addr := range addrs {
key, err := createUniqueKey(&p, &addr, ttl)
if addr == nil {
continue
}
entry := &addrentry{Addr: addr.Bytes(), TTL: ttl}
key, err := createUniqueKey(&p, &addr)
if err != nil {
log.Error(err)
txn.Discard()
return
}
txn.SetWithTTL(key, addr.Bytes(), ttl)
buf := &bytes.Buffer{}
enc := gob.NewEncoder(buf)
if err := enc.Encode(entry); err != nil {
log.Error(err)
txn.Discard()
return
}
txn.SetWithTTL(key, buf.Bytes(), ttl)
}
}
func (mgr *addrmanager_badger) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
// if ttl is zero, exit. nothing to do.
func (mgr *AddrManagerBadger) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
if ttl <= 0 {
log.Debugf("short circuiting AddAddrs with ttl %d", ttl)
return
}
txn := mgr.db.NewTransaction(true)
txn := mgr.DB.NewTransaction(true)
defer txn.Discard()
go mgr.sendSubscriptionUpdates(&p, addrs)
addAddrs(p, addrs, ttl, txn)
txn.Commit(func (err error) {
txn.Commit(nil)
}
func (mgr *AddrManagerBadger) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
}
func (mgr *AddrManagerBadger) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
txn := mgr.DB.NewTransaction(true)
defer txn.Discard()
for _, addr := range addrs {
if addr == nil {
continue
}
key, err := createUniqueKey(&p, &addr)
if err != nil {
log.Error(err)
continue
}
if ttl <= 0 {
if err := txn.Delete(key); err != nil {
log.Error(err)
}
} else {
entry := &addrentry{Addr: addr.Bytes(), TTL: ttl}
buf := &bytes.Buffer{}
enc := gob.NewEncoder(buf)
if err := enc.Encode(entry); err != nil {
log.Error(err)
continue
}
txn.SetWithTTL(key, buf.Bytes(), ttl)
}
})
}
func (mgr *addrmanager_badger) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
mgr.AddAddr(p, addr, ttl)
}
func (mgr *addrmanager_badger) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
mgr.AddAddrs(p, addrs, ttl)
}
func (mgr *addrmanager_badger) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
prefix, err := createKeyPrefix(&p, oldTTL)
if err != nil {
log.Error(err)
return
}
txn := mgr.db.NewTransaction(true)
txn.Commit(nil)
}
func (mgr *AddrManagerBadger) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.Duration) {
prefix := []byte(p)
txn := mgr.DB.NewTransaction(true)
defer txn.Discard()
opts := badger.DefaultIteratorOptions
iter := txn.NewIterator(opts)
var addrs []ma.Multiaddr
iter.Seek(prefix)
for iter.ValidForPrefix(prefix) {
item := iter.Item()
@ -133,22 +157,32 @@ func (mgr *addrmanager_badger) UpdateAddrs(p peer.ID, oldTTL time.Duration, newT
log.Error(err)
return
}
addrs = append(addrs, ma.Cast(addrbytes))
entry := &addrentry{}
buf := bytes.NewBuffer(addrbytes)
dec := gob.NewDecoder(buf)
if err := dec.Decode(&entry); err != nil {
log.Error(err)
return
}
if entry.TTL == oldTTL {
entry.TTL = newTTL
buf := &bytes.Buffer{}
enc := gob.NewEncoder(buf)
if err := enc.Encode(&entry); err != nil {
log.Error(err)
return
}
txn.SetWithTTL(item.Key(), buf.Bytes(), newTTL)
}
iter.Next()
}
addAddrs(p, addrs, newTTL, txn)
txn.Commit(func (err error) {
if err != nil {
log.Error(err)
}
})
txn.Commit(nil)
}
func (mgr *addrmanager_badger) Addrs(p peer.ID) []ma.Multiaddr {
txn := mgr.db.NewTransaction(false)
func (mgr *AddrManagerBadger) Addrs(p peer.ID) []ma.Multiaddr {
txn := mgr.DB.NewTransaction(false)
defer txn.Discard()
prefix := []byte(p)
@ -166,7 +200,15 @@ func (mgr *addrmanager_badger) Addrs(p peer.ID) []ma.Multiaddr {
if err != nil {
log.Error(err)
} else {
addrs = append(addrs, ma.Cast(value))
entry := &addrentry{}
buf := bytes.NewBuffer(value)
dec := gob.NewDecoder(buf)
if err := dec.Decode(&entry); err != nil {
log.Error("deleting bad entry in peerstore for peer", p.String())
txn.Delete(item.Key())
} else {
addrs = append(addrs, ma.Cast(entry.Addr))
}
}
}
@ -178,32 +220,28 @@ func (mgr *addrmanager_badger) Addrs(p peer.ID) []ma.Multiaddr {
return addrs
}
func (mgr *addrmanager_badger) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
func (mgr *AddrManagerBadger) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
addrs := make(chan ma.Multiaddr)
//mgr.db.View(func (txn *badger.Txn) error {
// defer close(addrs)
//
// return nil
//})
// TODO: impl
return addrs
}
func (mgr *addrmanager_badger) ClearAddrs(p peer.ID) {
err := mgr.db.Update(func (txn *badger.Txn) error {
iter := txn.NewIterator(badger.DefaultIteratorOptions)
prefix := []byte(p)
iter.Seek(prefix)
func (mgr *AddrManagerBadger) ClearAddrs(p peer.ID) {
txn := mgr.DB.NewTransaction(true)
defer txn.Discard()
it := txn.NewIterator(badger.DefaultIteratorOptions)
prefix := []byte(p)
it.Seek(prefix)
for iter.ValidForPrefix(prefix) {
txn.Delete(iter.Item().Key())
iter.Next()
count := 0
for it.ValidForPrefix(prefix) {
count++
if err := txn.Delete(it.Item().Key()); err != nil {
log.Error(err)
}
return nil
})
if err != nil {
log.Error(err)
it.Next()
}
txn.Commit(nil)
}

View File

@ -6,6 +6,7 @@ import (
"github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
"os"
)
func IDS(t *testing.T, ids string) peer.ID {
@ -29,7 +30,7 @@ func MA(t *testing.T, m string) ma.Multiaddr {
func testHas(t *testing.T, exp, act []ma.Multiaddr) {
t.Helper()
if len(exp) != len(act) {
t.Fatal("lengths not the same")
t.Fatalf("lengths not the same. expected %d, got %d\n", len(exp), len(act))
}
for _, a := range exp {
@ -48,8 +49,7 @@ func testHas(t *testing.T, exp, act []ma.Multiaddr) {
}
}
func TestAddresses(t *testing.T) {
func testAddresses(t *testing.T, m AddrBook) {
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ")
id3 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ6Kn")
@ -73,7 +73,6 @@ func TestAddresses(t *testing.T) {
ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555")
ttl := time.Hour
m := AddrManager{}
m.AddAddr(id1, ma11, ttl)
m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl)
@ -92,21 +91,40 @@ func TestAddresses(t *testing.T) {
m.ClearAddrs(id5)
m.AddAddrs(id5, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ttl) // clearing
if len(m.Peers()) != 5 {
t.Fatal("should have exactly two peers in the address book")
}
// test the Addresses return value
testHas(t, []ma.Multiaddr{ma11}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
testHas(t, []ma.Multiaddr{ma31, ma32, ma33}, m.Addrs(id3))
testHas(t, []ma.Multiaddr{ma41, ma42, ma43, ma44}, m.Addrs(id4))
testHas(t, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, m.Addrs(id5))
}
func TestAddressesExpire(t *testing.T) {
func setupBadgerAddrManager(t *testing.T) (*AddrManagerBadger, func ()) {
dataPath := os.TempDir()
mgr, err := NewBadgerAddrManager(dataPath)
if err != nil {
t.Fatal(err)
}
closer := func () {
mgr.Close()
os.RemoveAll(dataPath)
}
return mgr, closer
}
func TestAddresses(t *testing.T) {
t.Log("AddrManager")
mgr1 := AddrManager{}
testAddresses(t, &mgr1)
t.Log("AddrManager")
mgr2, closer := setupBadgerAddrManager(t)
defer closer()
testAddresses(t, mgr2)
}
func testAddressesExpire(t *testing.T, m AddrBook) {
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
@ -115,17 +133,12 @@ func TestAddressesExpire(t *testing.T) {
ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444")
ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555")
m := AddrManager{}
m.AddAddr(id1, ma11, time.Hour)
m.AddAddr(id1, ma12, time.Hour)
m.AddAddr(id1, ma13, time.Hour)
m.AddAddr(id2, ma24, time.Hour)
m.AddAddr(id2, ma25, time.Hour)
if len(m.Peers()) != 2 {
t.Fatal("should have exactly two peers in the address book")
}
testHas(t, []ma.Multiaddr{ma11, ma12, ma13}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma24, ma25}, m.Addrs(id2))
@ -164,8 +177,18 @@ func TestAddressesExpire(t *testing.T) {
testHas(t, nil, m.Addrs(id2))
}
func TestClearWorks(t *testing.T) {
func TestAddressesExpire(t *testing.T) {
t.Log("AddrManager")
m1 := &AddrManager{}
testAddressesExpire(t, m1)
t.Log("AddrManagerBadger")
m2, closer := setupBadgerAddrManager(t)
defer closer()
testAddressesExpire(t, m2)
}
func testClearWorks(t *testing.T, m AddrBook) {
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
@ -174,7 +197,6 @@ func TestClearWorks(t *testing.T) {
ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444")
ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555")
m := AddrManager{}
m.AddAddr(id1, ma11, time.Hour)
m.AddAddr(id1, ma12, time.Hour)
m.AddAddr(id1, ma13, time.Hour)
@ -191,11 +213,21 @@ func TestClearWorks(t *testing.T) {
testHas(t, nil, m.Addrs(id2))
}
func TestSetNegativeTTLClears(t *testing.T) {
func TestClearWorks(t *testing.T) {
t.Log("AddrManager")
m1 := &AddrManager{}
testClearWorks(t, m1)
t.Log("AddrManagerBadger")
m2, closer := setupBadgerAddrManager(t)
defer closer()
testClearWorks(t, m2)
}
func testSetNegativeTTLClears(t *testing.T, m AddrBook) {
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
m := AddrManager{}
m.SetAddr(id1, ma11, time.Hour)
testHas(t, []ma.Multiaddr{ma11}, m.Addrs(id1))
@ -204,8 +236,18 @@ func TestSetNegativeTTLClears(t *testing.T) {
testHas(t, nil, m.Addrs(id1))
}
func TestSetNegativeTTLClears(t *testing.T) {
t.Log("AddrManager")
m1 := &AddrManager{}
testSetNegativeTTLClears(t, m1)
func TestUpdateTTLs(t *testing.T) {
t.Log("AddrManagerBadger")
m2, closer := setupBadgerAddrManager(t)
defer closer()
testSetNegativeTTLClears(t, m2)
}
func testUpdateTTLs(t *testing.T, m AddrBook) {
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
id2 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQM")
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
@ -213,8 +255,6 @@ func TestUpdateTTLs(t *testing.T) {
ma21 := MA(t, "/ip4/1.2.3.1/tcp/1121")
ma22 := MA(t, "/ip4/1.2.3.1/tcp/1122")
m := AddrManager{}
// Shouldn't panic.
m.UpdateAddrs(id1, time.Hour, time.Minute)
@ -230,30 +270,52 @@ func TestUpdateTTLs(t *testing.T) {
testHas(t, []ma.Multiaddr{ma11, ma12}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
m.UpdateAddrs(id1, time.Hour, time.Millisecond)
m.UpdateAddrs(id1, time.Hour, time.Second)
testHas(t, []ma.Multiaddr{ma11, ma12}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
time.Sleep(time.Millisecond)
time.Sleep(time.Second)
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
m.UpdateAddrs(id2, time.Hour, time.Millisecond)
m.UpdateAddrs(id2, time.Hour, time.Second)
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma21, ma22}, m.Addrs(id2))
time.Sleep(time.Millisecond)
time.Sleep(time.Second)
testHas(t, []ma.Multiaddr{ma12}, m.Addrs(id1))
testHas(t, []ma.Multiaddr{ma22}, m.Addrs(id2))
}
func TestNilAddrsDontBreak(t *testing.T) {
func TestUpdateTTLs(t *testing.T) {
t.Log("AddrManager")
m1 := &AddrManager{}
testUpdateTTLs(t, m1)
t.Log("AddrManagerBadger")
m2, closer := setupBadgerAddrManager(t)
defer closer()
testUpdateTTLs(t, m2)
}
func testNilAddrsDontBreak(t *testing.T, m AddrBook) {
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
m := AddrManager{}
m.SetAddr(id1, nil, time.Hour)
m.AddAddr(id1, nil, time.Hour)
}
func TestNilAddrsDontBreak(t *testing.T) {
t.Log("AddrManager")
m1 := &AddrManager{}
testNilAddrsDontBreak(t, m1)
t.Log("OK")
t.Log("AddrManagerBadger")
m2, closer := setupBadgerAddrManager(t)
defer closer()
testNilAddrsDontBreak(t, m2)
t.Log("OK")
}