1
0
mirror of https://github.com/libp2p/go-libp2p-peerstore.git synced 2025-03-24 13:10:06 +08:00

introduce struct for persisted value.

This commit is contained in:
Raúl Kripalani 2018-09-18 19:01:24 +01:00
parent 5b9ad98cc2
commit 0f81bdf419

View File

@ -1,7 +1,6 @@
package pstoreds
import (
"bytes"
"context"
"encoding/binary"
"errors"
@ -52,6 +51,24 @@ type cacheEntry struct {
addrs []ma.Multiaddr
}
type addrRecord struct {
ttl time.Duration
addr ma.Multiaddr
}
func (ar *addrRecord) MarshalBinary() ([]byte, error) {
ttlB := make([]byte, 8)
binary.LittleEndian.PutUint64(ttlB, uint64(ar.ttl))
return append(ttlB, ar.addr.Bytes()...), nil
}
func (ar *addrRecord) UnmarshalBinary(b []byte) error {
ar.ttl = time.Duration(binary.LittleEndian.Uint64(b))
// this had been serialized by us, no need to check for errors
ar.addr, _ = ma.NewMultiaddrBytes(b[8:])
return nil
}
// NewAddrBook initializes a new address book given a
// Datastore instance, a context for managing the TTL manager,
// and the interval at which the TTL manager should sweep the Datastore.
@ -194,11 +211,8 @@ func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr, ttl time.Du
err error
existed = make([]bool, len(keys))
exp = time.Now().Add(ttl)
ttlB = make([]byte, 8)
)
binary.LittleEndian.PutUint64(ttlB, uint64(ttl))
txn := mgr.ds.NewTransaction(false)
defer txn.Discard()
@ -230,8 +244,11 @@ func (mgr *dsAddrBook) dbInsert(keys []ds.Key, addrs []ma.Multiaddr, ttl time.Du
continue
}
// format: bytes(ttl) || bytes(multiaddr)
value := append(ttlB, addrs[i].Bytes()...)
r := &addrRecord{
ttl: ttl,
addr: addrs[i],
}
value, _ := r.MarshalBinary()
if err = ttltxn.PutWithTTL(key, value, ttl); err != nil {
log.Errorf("transaction failed and aborted while setting key: %s, cause: %v", key.String(), err)
return nil, err
@ -267,16 +284,12 @@ func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time.
func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time.Duration) error {
var (
prefix = ds.NewKey(p.Pretty())
q = query.Query{Prefix: prefix.String(), KeysOnly: false}
oldb, newb = make([]byte, 8), make([]byte, 8)
results query.Results
err error
prefix = ds.NewKey(p.Pretty())
q = query.Query{Prefix: prefix.String(), KeysOnly: false}
results query.Results
err error
)
binary.LittleEndian.PutUint64(oldb, uint64(oldTTL))
binary.LittleEndian.PutUint64(newb, uint64(newTTL))
txn := mgr.ds.NewTransaction(false)
defer txn.Discard()
@ -286,13 +299,16 @@ func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time.
defer results.Close()
ttltxn := txn.(ds.TTLDatastore)
r := &addrRecord{}
for result := range results.Next() {
// format: bytes(ttl) || bytes(multiaddr)
if curr := result.Value[:8]; !bytes.Equal(curr, oldb) {
r.UnmarshalBinary(result.Value)
if r.ttl != oldTTL {
continue
}
newVal := append(newb, result.Value[8:]...)
if err = ttltxn.PutWithTTL(ds.RawKey(result.Key), newVal, newTTL); err != nil {
r.ttl = newTTL
value, _ := r.MarshalBinary()
if err = ttltxn.PutWithTTL(ds.RawKey(result.Key), value, newTTL); err != nil {
return err
}
}
@ -336,12 +352,12 @@ func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr {
defer results.Close()
var addrs []ma.Multiaddr
var r addrRecord
// used to set the expiration for the entire cache entry
earliestExp := maxTime
for result := range results.Next() {
// extract multiaddr from value: bytes(ttl) || bytes(multiaddr)
if addr, err := ma.NewMultiaddrBytes(result.Value[8:]); err == nil {
addrs = append(addrs, addr)
if err = r.UnmarshalBinary(result.Value); err == nil {
addrs = append(addrs, r.addr)
}
if exp := result.Expiration; !exp.IsZero() && exp.Before(earliestExp) {