Implement error retry logic for transactions

This commit is contained in:
Cole Brown 2018-06-15 17:48:39 -04:00
parent b550e5b2c6
commit 78cd2c5abd

View File

@ -12,6 +12,9 @@ import (
mh "github.com/multiformats/go-multihash" mh "github.com/multiformats/go-multihash"
) )
// Number of times to retry transactional writes
var dsWriteRetries = 5
// DatastoreAddrManager is an address manager backed by a Datastore with both an // DatastoreAddrManager is an address manager backed by a Datastore with both an
// in-memory TTL manager and an in-memory address stream manager. // in-memory TTL manager and an in-memory address stream manager.
type DatastoreAddrManager struct { type DatastoreAddrManager struct {
@ -75,41 +78,51 @@ func (mgr *DatastoreAddrManager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl t
} }
func (mgr *DatastoreAddrManager) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, add bool) { func (mgr *DatastoreAddrManager) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, add bool) {
var keys []ds.Key for i := 0; i < dsWriteRetries; i++ {
batch, err := mgr.ds.Batch() // keys to add to the TTL manager
if err != nil { var keys []ds.Key
log.Error(err) batch, err := mgr.ds.Batch()
return
}
defer batch.Commit()
for _, addr := range addrs {
if addr == nil {
continue
}
key, err := peerAddressKey(&p, &addr)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
continue return
}
keys = append(keys, key)
if ttl <= 0 {
batch.Delete(key)
continue
}
has, err := mgr.ds.Has(key)
if err != nil || !has {
mgr.subsManager.BroadcastAddr(p, addr)
} }
if !has || !add { for _, addr := range addrs {
if err := batch.Put(key, addr.Bytes()); err != nil { if addr == nil {
continue
}
key, err := peerAddressKey(&p, &addr)
if err != nil {
log.Error(err) log.Error(err)
continue
}
keys = append(keys, key)
if ttl <= 0 {
batch.Delete(key)
continue
}
has, err := mgr.ds.Has(key)
if err != nil || !has {
mgr.subsManager.BroadcastAddr(p, addr)
}
// Allows us to support AddAddr and SetAddr in one function
if !has || !add {
if err := batch.Put(key, addr.Bytes()); err != nil {
log.Error(err)
}
} }
} }
if err := batch.Commit(); err != nil {
log.Errorf("failed to write addresses for peer %s: %s\n", p.Pretty(), err)
continue
}
mgr.ttlManager.setTTLs(keys, ttl)
return
} }
mgr.ttlManager.setTTLs(keys, ttl) log.Errorf("failed to avoid write conflict for peer %s after %d retries\n", p.Pretty(), dsWriteRetries)
} }
// UpdateAddrs will update any addresses for a given peer and TTL combination to // UpdateAddrs will update any addresses for a given peer and TTL combination to
@ -178,23 +191,30 @@ func (mgr *DatastoreAddrManager) AddrStream(ctx context.Context, p peer.ID) <-ch
// ClearAddrs will delete all known addresses for a peer ID. // ClearAddrs will delete all known addresses for a peer ID.
func (mgr *DatastoreAddrManager) ClearAddrs(p peer.ID) { func (mgr *DatastoreAddrManager) ClearAddrs(p peer.ID) {
prefix := ds.NewKey(p.Pretty()) prefix := ds.NewKey(p.Pretty())
q := query.Query{Prefix: prefix.String()} for i := 0; i < dsWriteRetries; i++ {
results, err := mgr.ds.Query(q) q := query.Query{Prefix: prefix.String()}
if err != nil { results, err := mgr.ds.Query(q)
log.Error(err) if err != nil {
return log.Error(err)
} return
batch, err := mgr.ds.Batch() }
if err != nil { batch, err := mgr.ds.Batch()
log.Error(err) if err != nil {
return log.Error(err)
} return
defer batch.Commit() }
for result := range results.Next() { for result := range results.Next() {
batch.Delete(ds.NewKey(result.Key)) batch.Delete(ds.NewKey(result.Key))
}
if err = batch.Commit(); err != nil {
log.Errorf("failed to clear addresses for peer %s: %s\n", p.Pretty(), err)
continue
}
mgr.ttlManager.clear(ds.NewKey(p.Pretty()))
return
} }
mgr.ttlManager.clear(ds.NewKey(p.Pretty())) log.Errorf("failed to clear addresses for peer %s after %d attempts\n", p.Pretty(), dsWriteRetries)
} }
// ttlmanager // ttlmanager