Finish initial pass (sans stream) of addr manager

This commit is contained in:
Cole Brown 2018-06-05 16:54:20 -04:00
parent b637498caf
commit 1016f6e956

View File

@ -6,7 +6,7 @@ import (
"github.com/libp2p/go-libp2p-peer" "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/hashicorp/golang-lru" //"github.com/hashicorp/golang-lru"
"github.com/dgraph-io/badger" "github.com/dgraph-io/badger"
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
"encoding/gob" "encoding/gob"
@ -15,7 +15,7 @@ import (
) )
type addrmanager_badger struct { type addrmanager_badger struct {
cache *lru.Cache //cache *lru.Cache
db *badger.DB db *badger.DB
} }
@ -24,6 +24,14 @@ type addrentry struct {
TTL time.Duration TTL time.Duration
} }
func NewBadgerAddrManager() *addrmanager_badger {
db, err := badger.Open(badger.DefaultOptions)
if err != nil {
panic(err)
}
return &addrmanager_badger{db: db}
}
func (mgr *addrmanager_badger) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { func (mgr *addrmanager_badger) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl) mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
} }
@ -66,6 +74,18 @@ func createUniqueKey(p *peer.ID, addr *ma.Multiaddr, ttl time.Duration) ([]byte,
return append(prefix, addrHash...), nil return append(prefix, addrHash...), nil
} }
func addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, txn *badger.Txn) {
for _, addr := range addrs {
key, err := createUniqueKey(&p, &addr, ttl)
if err != nil {
log.Error(err)
txn.Discard()
return
}
txn.SetWithTTL(key, addr.Bytes(), ttl)
}
}
func (mgr *addrmanager_badger) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { func (mgr *addrmanager_badger) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
// if ttl is zero, exit. nothing to do. // if ttl is zero, exit. nothing to do.
if ttl <= 0 { if ttl <= 0 {
@ -76,17 +96,12 @@ func (mgr *addrmanager_badger) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl tim
txn := mgr.db.NewTransaction(true) txn := mgr.db.NewTransaction(true)
defer txn.Discard() defer txn.Discard()
for _, addr := range addrs { addAddrs(p, addrs, ttl, txn)
key, err := createUniqueKey(&p, &addr, ttl)
if err != nil {
log.Error(err)
return
}
txn.SetWithTTL(key, addr.Bytes(), ttl)
}
txn.Commit(func (err error) { txn.Commit(func (err error) {
if err != nil {
log.Error(err) log.Error(err)
}
}) })
} }
@ -102,29 +117,93 @@ func (mgr *addrmanager_badger) UpdateAddrs(p peer.ID, oldTTL time.Duration, newT
prefix, err := createKeyPrefix(&p, oldTTL) prefix, err := createKeyPrefix(&p, oldTTL)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return
} }
txn := mgr.db.NewTransaction(true) txn := mgr.db.NewTransaction(true)
defer txn.Discard()
opts := badger.DefaultIteratorOptions opts := badger.DefaultIteratorOptions
iter := txn.NewIterator(opts) iter := txn.NewIterator(opts)
var addrs []ma.Multiaddr
iter.Seek(prefix) iter.Seek(prefix)
for iter.Valid() && iter.ValidForPrefix(prefix) { for iter.ValidForPrefix(prefix) {
item := iter.Item() item := iter.Item()
// TODO: addrbytes, err := item.Value()
if err != nil {
log.Error(err)
return
}
addrs = append(addrs, ma.Cast(addrbytes))
iter.Next() iter.Next()
} }
addAddrs(p, addrs, newTTL, txn)
txn.Commit(func (err error) {
if err != nil {
log.Error(err)
}
})
} }
func (mgr *addrmanager_badger) Addrs(p peer.ID) []ma.Multiaddr { func (mgr *addrmanager_badger) Addrs(p peer.ID) []ma.Multiaddr {
panic("implement me") txn := mgr.db.NewTransaction(false)
defer txn.Discard()
prefix := []byte(p)
opts := badger.DefaultIteratorOptions
iter := txn.NewIterator(opts)
iter.Seek(prefix)
var addrs []ma.Multiaddr
for iter.ValidForPrefix(prefix) {
item := iter.Item()
if !item.IsDeletedOrExpired() {
value, err := item.Value()
if err != nil {
log.Error(err)
} else {
addrs = append(addrs, ma.Cast(value))
}
} }
func (mgr *addrmanager_badger) AddrStream(context.Context, peer.ID) <-chan ma.Multiaddr { iter.Next()
panic("implement me") }
txn.Commit(nil)
return addrs
}
func (mgr *addrmanager_badger) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr {
addrs := make(chan ma.Multiaddr)
//mgr.db.View(func (txn *badger.Txn) error {
// defer close(addrs)
//
// return nil
//})
return addrs
} }
func (mgr *addrmanager_badger) ClearAddrs(p peer.ID) { func (mgr *addrmanager_badger) ClearAddrs(p peer.ID) {
panic("implement me") err := mgr.db.Update(func (txn *badger.Txn) error {
iter := txn.NewIterator(badger.DefaultIteratorOptions)
prefix := []byte(p)
iter.Seek(prefix)
for iter.ValidForPrefix(prefix) {
txn.Delete(iter.Item().Key())
iter.Next()
}
return nil
})
if err != nil {
log.Error(err)
}
} }