diff --git a/addrmanager_badger.go b/addr_manager_badger.go similarity index 61% rename from addrmanager_badger.go rename to addr_manager_badger.go index 55ebbec..03efa90 100644 --- a/addrmanager_badger.go +++ b/addr_manager_badger.go @@ -6,7 +6,7 @@ import ( "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" - "github.com/hashicorp/golang-lru" + //"github.com/hashicorp/golang-lru" "github.com/dgraph-io/badger" "github.com/multiformats/go-multihash" "encoding/gob" @@ -15,7 +15,7 @@ import ( ) type addrmanager_badger struct { - cache *lru.Cache + //cache *lru.Cache db *badger.DB } @@ -24,6 +24,14 @@ type addrentry struct { TTL time.Duration } +func NewBadgerAddrManager() *addrmanager_badger { + db, err := badger.Open(badger.DefaultOptions) + if err != nil { + panic(err) + } + return &addrmanager_badger{db: db} +} + func (mgr *addrmanager_badger) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) { mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl) } @@ -66,6 +74,18 @@ func createUniqueKey(p *peer.ID, addr *ma.Multiaddr, ttl time.Duration) ([]byte, return append(prefix, addrHash...), nil } +func addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, txn *badger.Txn) { + for _, addr := range addrs { + key, err := createUniqueKey(&p, &addr, ttl) + if err != nil { + log.Error(err) + txn.Discard() + return + } + txn.SetWithTTL(key, addr.Bytes(), ttl) + } +} + func (mgr *addrmanager_badger) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) { // if ttl is zero, exit. nothing to do. if ttl <= 0 { @@ -76,17 +96,12 @@ func (mgr *addrmanager_badger) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl tim txn := mgr.db.NewTransaction(true) defer txn.Discard() - for _, addr := range addrs { - key, err := createUniqueKey(&p, &addr, ttl) - if err != nil { - log.Error(err) - return - } - txn.SetWithTTL(key, addr.Bytes(), ttl) - } + addAddrs(p, addrs, ttl, txn) txn.Commit(func (err error) { - log.Error(err) + if err != nil { + log.Error(err) + } }) } @@ -102,29 +117,93 @@ func (mgr *addrmanager_badger) UpdateAddrs(p peer.ID, oldTTL time.Duration, newT prefix, err := createKeyPrefix(&p, oldTTL) if err != nil { log.Error(err) + return } txn := mgr.db.NewTransaction(true) + defer txn.Discard() opts := badger.DefaultIteratorOptions iter := txn.NewIterator(opts) + var addrs []ma.Multiaddr iter.Seek(prefix) - for iter.Valid() && iter.ValidForPrefix(prefix) { + for iter.ValidForPrefix(prefix) { item := iter.Item() - // TODO: + addrbytes, err := item.Value() + if err != nil { + log.Error(err) + return + } + addrs = append(addrs, ma.Cast(addrbytes)) iter.Next() } + addAddrs(p, addrs, newTTL, txn) + + txn.Commit(func (err error) { + if err != nil { + log.Error(err) + } + }) } func (mgr *addrmanager_badger) Addrs(p peer.ID) []ma.Multiaddr { - panic("implement me") + txn := mgr.db.NewTransaction(false) + defer txn.Discard() + + prefix := []byte(p) + opts := badger.DefaultIteratorOptions + iter := txn.NewIterator(opts) + iter.Seek(prefix) + + var addrs []ma.Multiaddr + + for iter.ValidForPrefix(prefix) { + item := iter.Item() + + if !item.IsDeletedOrExpired() { + value, err := item.Value() + if err != nil { + log.Error(err) + } else { + addrs = append(addrs, ma.Cast(value)) + } + } + + iter.Next() + } + + txn.Commit(nil) + + return addrs } -func (mgr *addrmanager_badger) AddrStream(context.Context, peer.ID) <-chan ma.Multiaddr { - panic("implement me") +func (mgr *addrmanager_badger) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Multiaddr { + addrs := make(chan ma.Multiaddr) + + //mgr.db.View(func (txn *badger.Txn) error { + // defer close(addrs) + // + // return nil + //}) + + return addrs } func (mgr *addrmanager_badger) ClearAddrs(p peer.ID) { - panic("implement me") + err := mgr.db.Update(func (txn *badger.Txn) error { + iter := txn.NewIterator(badger.DefaultIteratorOptions) + prefix := []byte(p) + iter.Seek(prefix) + + for iter.ValidForPrefix(prefix) { + txn.Delete(iter.Item().Key()) + iter.Next() + } + + return nil + }) + if err != nil { + log.Error(err) + } }