diff --git a/addr_manager_ds.go b/addr_manager_ds.go index 7cb0b60..71ed061 100644 --- a/addr_manager_ds.go +++ b/addr_manager_ds.go @@ -12,6 +12,9 @@ import ( mh "github.com/multiformats/go-multihash" ) +// Number of times to retry transactional writes +var dsWriteRetries = 5 + // DatastoreAddrManager is an address manager backed by a Datastore with both an // in-memory TTL manager and an in-memory address stream manager. type DatastoreAddrManager struct { @@ -75,41 +78,51 @@ func (mgr *DatastoreAddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl t } func (mgr *DatastoreAddrManager) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, add bool) { - var keys []ds.Key - batch, err := mgr.ds.Batch() - if err != nil { - log.Error(err) - return - } - defer batch.Commit() - for _, addr := range addrs { - if addr == nil { - continue - } - - key, err := peerAddressKey(&p, &addr) + for i := 0; i < dsWriteRetries; i++ { + // keys to add to the TTL manager + var keys []ds.Key + batch, err := mgr.ds.Batch() if err != nil { log.Error(err) - continue - } - keys = append(keys, key) - - if ttl <= 0 { - batch.Delete(key) - continue - } - has, err := mgr.ds.Has(key) - if err != nil || !has { - mgr.subsManager.BroadcastAddr(p, addr) + return } - if !has || !add { - if err := batch.Put(key, addr.Bytes()); err != nil { + for _, addr := range addrs { + if addr == nil { + continue + } + + key, err := peerAddressKey(&p, &addr) + if err != nil { log.Error(err) + continue + } + keys = append(keys, key) + + if ttl <= 0 { + batch.Delete(key) + continue + } + has, err := mgr.ds.Has(key) + if err != nil || !has { + mgr.subsManager.BroadcastAddr(p, addr) + } + + // Allows us to support AddAddr and SetAddr in one function + if !has || !add { + if err := batch.Put(key, addr.Bytes()); err != nil { + log.Error(err) + } } } + if err := batch.Commit(); err != nil { + log.Errorf("failed to write addresses for peer %s: %s\n", p.Pretty(), err) + continue + } + mgr.ttlManager.setTTLs(keys, ttl) + return } - mgr.ttlManager.setTTLs(keys, ttl) + log.Errorf("failed to avoid write conflict for peer %s after %d retries\n", p.Pretty(), dsWriteRetries) } // UpdateAddrs will update any addresses for a given peer and TTL combination to @@ -178,23 +191,30 @@ func (mgr *DatastoreAddrManager) AddrStream(ctx context.Context, p peer.ID) <-ch // ClearAddrs will delete all known addresses for a peer ID. func (mgr *DatastoreAddrManager) ClearAddrs(p peer.ID) { prefix := ds.NewKey(p.Pretty()) - q := query.Query{Prefix: prefix.String()} - results, err := mgr.ds.Query(q) - if err != nil { - log.Error(err) - return - } - batch, err := mgr.ds.Batch() - if err != nil { - log.Error(err) - return - } - defer batch.Commit() + for i := 0; i < dsWriteRetries; i++ { + q := query.Query{Prefix: prefix.String()} + results, err := mgr.ds.Query(q) + if err != nil { + log.Error(err) + return + } + batch, err := mgr.ds.Batch() + if err != nil { + log.Error(err) + return + } - for result := range results.Next() { - batch.Delete(ds.NewKey(result.Key)) + for result := range results.Next() { + batch.Delete(ds.NewKey(result.Key)) + } + if err = batch.Commit(); err != nil { + log.Errorf("failed to clear addresses for peer %s: %s\n", p.Pretty(), err) + continue + } + mgr.ttlManager.clear(ds.NewKey(p.Pretty())) + return } - mgr.ttlManager.clear(ds.NewKey(p.Pretty())) + log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), dsWriteRetries) } // ttlmanager