feat: plumb through datastore contexts (#176)

This commit is contained in:
Gus Eggert 2021-10-28 16:32:18 -04:00 committed by GitHub
parent 986d5ceedb
commit 566ed64074
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 48 additions and 47 deletions

6
go.mod
View File

@ -7,9 +7,9 @@ retract v0.2.9 // Contains backwards-incompatible changes. Use v0.3.0 instead.
require ( require (
github.com/gogo/protobuf v1.3.2 github.com/gogo/protobuf v1.3.2
github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-datastore v0.4.5 github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-ds-badger v0.2.7 github.com/ipfs/go-ds-badger v0.3.0
github.com/ipfs/go-ds-leveldb v0.4.2 github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-log v1.0.5 github.com/ipfs/go-log v1.0.5
github.com/libp2p/go-buffer-pool v0.0.2 github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p-core v0.8.6 github.com/libp2p/go-libp2p-core v0.8.6

17
go.sum
View File

@ -60,14 +60,14 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY= github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY=
github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I= github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
github.com/ipfs/go-datastore v0.4.1/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA= github.com/ipfs/go-datastore v0.5.0 h1:rQicVCEacWyk4JZ6G5bD9TKR7lZEG1MWcG7UdWYrFAU=
github.com/ipfs/go-datastore v0.4.4/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA= github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk=
github.com/ipfs/go-datastore v0.4.5 h1:cwOUcGMLdLPWgu3SlrCckCMznaGADbPqE0r8h768/Dg= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
github.com/ipfs/go-datastore v0.4.5/go.mod h1:eXTcaaiN6uOlVCLS9GjJUJtlvJfM3xk23w3fyfrmmJs= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-ds-badger v0.2.7 h1:ju5REfIm+v+wgVnQ19xGLYPHYHbYLR6qJfmMbCDSK1I= github.com/ipfs/go-ds-badger v0.3.0 h1:xREL3V0EH9S219kFFueOYJJTcjgNSZ2HY1iSvN7U1Ro=
github.com/ipfs/go-ds-badger v0.2.7/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA= github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupVCGm4QUIek=
github.com/ipfs/go-ds-leveldb v0.4.2 h1:QmQoAJ9WkPMUfBLnu1sBVy0xWWlJPg0m4kRAiJL9iaw= github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8= github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8=
github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo= github.com/ipfs/go-log v1.0.5/go.mod h1:j0b8ZoR+7+R99LD9jZ6+AJsrzkPbSXbZfGakb5JPtIo=
@ -75,7 +75,6 @@ github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscw
github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk= github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g= github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=

View File

@ -50,7 +50,7 @@ func (r *addrsRecord) flush(write ds.Write) (err error) {
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id.ID))) key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(r.Id.ID)))
if len(r.Addrs) == 0 { if len(r.Addrs) == 0 {
if err = write.Delete(key); err == nil { if err = write.Delete(context.TODO(), key); err == nil {
r.dirty = false r.dirty = false
} }
return err return err
@ -60,7 +60,7 @@ func (r *addrsRecord) flush(write ds.Write) (err error) {
if err != nil { if err != nil {
return err return err
} }
if err = write.Put(key, data); err != nil { if err = write.Put(context.TODO(), key, data); err != nil {
return err return err
} }
// write succeeded; record is no longer dirty. // write succeeded; record is no longer dirty.
@ -223,7 +223,7 @@ func (ab *dsAddrBook) loadRecord(id peer.ID, cache bool, update bool) (pr *addrs
pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} pr = &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(id))) key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(id)))
data, err := ab.ds.Get(key) data, err := ab.ds.Get(context.TODO(), key)
switch err { switch err {
case ds.ErrNotFound: case ds.ErrNotFound:
@ -446,7 +446,7 @@ func (ab *dsAddrBook) ClearAddrs(p peer.ID) {
ab.cache.Remove(p) ab.cache.Remove(p)
key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(p))) key := addrBookBase.ChildString(b32.RawStdEncoding.EncodeToString([]byte(p)))
if err := ab.ds.Delete(key); err != nil { if err := ab.ds.Delete(context.TODO(), key); err != nil {
log.Errorf("failed to clear addresses for peer %s: %v", p.Pretty(), err) log.Errorf("failed to clear addresses for peer %s: %v", p.Pretty(), err)
} }
} }

View File

@ -152,7 +152,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
if err != nil { if err != nil {
log.Warnf("failed while %s record with GC key: %v, err: %v; deleting", msg, key, err) log.Warnf("failed while %s record with GC key: %v, err: %v; deleting", msg, key, err)
} }
if err = batch.Delete(key); err != nil { if err = batch.Delete(context.TODO(), key); err != nil {
log.Warnf("failed to delete corrupt GC lookahead entry: %v, err: %v", key, err) log.Warnf("failed to delete corrupt GC lookahead entry: %v, err: %v", key, err)
} }
} }
@ -160,20 +160,20 @@ func (gc *dsAddrBookGc) purgeLookahead() {
// This function drops a GC key if the entry is cleaned correctly. It may reschedule another visit // This function drops a GC key if the entry is cleaned correctly. It may reschedule another visit
// if the next earliest expiry falls within the current window again. // if the next earliest expiry falls within the current window again.
dropOrReschedule := func(key ds.Key, ar *addrsRecord) { dropOrReschedule := func(key ds.Key, ar *addrsRecord) {
if err := batch.Delete(key); err != nil { if err := batch.Delete(context.TODO(), key); err != nil {
log.Warnf("failed to delete lookahead entry: %v, err: %v", key, err) log.Warnf("failed to delete lookahead entry: %v, err: %v", key, err)
} }
// re-add the record if it needs to be visited again in this window. // re-add the record if it needs to be visited again in this window.
if len(ar.Addrs) != 0 && ar.Addrs[0].Expiry <= gc.currWindowEnd { if len(ar.Addrs) != 0 && ar.Addrs[0].Expiry <= gc.currWindowEnd {
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", ar.Addrs[0].Expiry, key.Name())) gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", ar.Addrs[0].Expiry, key.Name()))
if err := batch.Put(gcKey, []byte{}); err != nil { if err := batch.Put(context.TODO(), gcKey, []byte{}); err != nil {
log.Warnf("failed to add new GC key: %v, err: %v", gcKey, err) log.Warnf("failed to add new GC key: %v, err: %v", gcKey, err)
} }
} }
} }
results, err := gc.ab.ds.Query(purgeLookaheadQuery) results, err := gc.ab.ds.Query(context.TODO(), purgeLookaheadQuery)
if err != nil { if err != nil {
log.Warnf("failed while fetching entries to purge: %v", err) log.Warnf("failed while fetching entries to purge: %v", err)
return return
@ -228,7 +228,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
// otherwise, fetch it from the store, clean it and flush it. // otherwise, fetch it from the store, clean it and flush it.
entryKey := addrBookBase.ChildString(gcKey.Name()) entryKey := addrBookBase.ChildString(gcKey.Name())
val, err := gc.ab.ds.Get(entryKey) val, err := gc.ab.ds.Get(context.TODO(), entryKey)
if err != nil { if err != nil {
// captures all errors, including ErrNotFound. // captures all errors, including ErrNotFound.
dropInError(gcKey, err, "fetching entry") dropInError(gcKey, err, "fetching entry")
@ -248,7 +248,7 @@ func (gc *dsAddrBookGc) purgeLookahead() {
dropOrReschedule(gcKey, record) dropOrReschedule(gcKey, record)
} }
if err = batch.Commit(); err != nil { if err = batch.Commit(context.TODO()); err != nil {
log.Warnf("failed to commit GC purge batch: %v", err) log.Warnf("failed to commit GC purge batch: %v", err)
} }
} }
@ -268,7 +268,7 @@ func (gc *dsAddrBookGc) purgeStore() {
log.Warnf("failed while creating batch to purge GC entries: %v", err) log.Warnf("failed while creating batch to purge GC entries: %v", err)
} }
results, err := gc.ab.ds.Query(purgeStoreQuery) results, err := gc.ab.ds.Query(context.TODO(), purgeStoreQuery)
if err != nil { if err != nil {
log.Warnf("failed while opening iterator: %v", err) log.Warnf("failed while opening iterator: %v", err)
return return
@ -294,7 +294,7 @@ func (gc *dsAddrBookGc) purgeStore() {
gc.ab.cache.Remove(id) gc.ab.cache.Remove(id)
} }
if err = batch.Commit(); err != nil { if err = batch.Commit(context.TODO()); err != nil {
log.Warnf("failed to commit GC purge batch: %v", err) log.Warnf("failed to commit GC purge batch: %v", err)
} }
} }
@ -321,7 +321,7 @@ func (gc *dsAddrBookGc) populateLookahead() {
var id peer.ID var id peer.ID
record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}}
results, err := gc.ab.ds.Query(populateLookaheadQuery) results, err := gc.ab.ds.Query(context.TODO(), populateLookaheadQuery)
if err != nil { if err != nil {
log.Warnf("failed while querying to populate lookahead GC window: %v", err) log.Warnf("failed while querying to populate lookahead GC window: %v", err)
return return
@ -354,7 +354,7 @@ func (gc *dsAddrBookGc) populateLookahead() {
continue continue
} }
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", cached.Addrs[0].Expiry, idb32)) gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", cached.Addrs[0].Expiry, idb32))
if err = batch.Put(gcKey, []byte{}); err != nil { if err = batch.Put(context.TODO(), gcKey, []byte{}); err != nil {
log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err) log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
} }
cached.RUnlock() cached.RUnlock()
@ -363,7 +363,7 @@ func (gc *dsAddrBookGc) populateLookahead() {
record.Reset() record.Reset()
val, err := gc.ab.ds.Get(ds.RawKey(result.Key)) val, err := gc.ab.ds.Get(context.TODO(), ds.RawKey(result.Key))
if err != nil { if err != nil {
log.Warnf("failed which getting record from store for peer: %v, err: %v", id.Pretty(), err) log.Warnf("failed which getting record from store for peer: %v, err: %v", id.Pretty(), err)
continue continue
@ -374,13 +374,13 @@ func (gc *dsAddrBookGc) populateLookahead() {
} }
if len(record.Addrs) > 0 && record.Addrs[0].Expiry <= until { if len(record.Addrs) > 0 && record.Addrs[0].Expiry <= until {
gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", record.Addrs[0].Expiry, idb32)) gcKey := gcLookaheadBase.ChildString(fmt.Sprintf("%d/%s", record.Addrs[0].Expiry, idb32))
if err = batch.Put(gcKey, []byte{}); err != nil { if err = batch.Put(context.TODO(), gcKey, []byte{}); err != nil {
log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err) log.Warnf("failed while inserting GC entry for peer: %v, err: %v", id.Pretty(), err)
} }
} }
} }
if err = batch.Commit(); err != nil { if err = batch.Commit(context.TODO()); err != nil {
log.Warnf("failed to commit GC lookahead batch: %v", err) log.Warnf("failed to commit GC lookahead batch: %v", err)
} }

View File

@ -1,6 +1,7 @@
package pstoreds package pstoreds
import ( import (
"context"
"testing" "testing"
"time" "time"
@ -18,7 +19,7 @@ type testProbe struct {
} }
func (tp *testProbe) countLookaheadEntries() (i int) { func (tp *testProbe) countLookaheadEntries() (i int) {
results, err := tp.ab.(*dsAddrBook).ds.Query(lookaheadQuery) results, err := tp.ab.(*dsAddrBook).ds.Query(context.Background(), lookaheadQuery)
if err != nil { if err != nil {
tp.t.Fatal(err) tp.t.Fatal(err)
} }

View File

@ -1,6 +1,7 @@
package pstoreds package pstoreds
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
@ -23,7 +24,7 @@ type cyclicBatch struct {
} }
func newCyclicBatch(ds ds.Batching, threshold int) (ds.Batch, error) { func newCyclicBatch(ds ds.Batching, threshold int) (ds.Batch, error) {
batch, err := ds.Batch() batch, err := ds.Batch(context.TODO())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -39,36 +40,36 @@ func (cb *cyclicBatch) cycle() (err error) {
return nil return nil
} }
// commit and renew the batch. // commit and renew the batch.
if err = cb.Batch.Commit(); err != nil { if err = cb.Batch.Commit(context.TODO()); err != nil {
return fmt.Errorf("failed while committing cyclic batch: %w", err) return fmt.Errorf("failed while committing cyclic batch: %w", err)
} }
if cb.Batch, err = cb.ds.Batch(); err != nil { if cb.Batch, err = cb.ds.Batch(context.TODO()); err != nil {
return fmt.Errorf("failed while renewing cyclic batch: %w", err) return fmt.Errorf("failed while renewing cyclic batch: %w", err)
} }
return nil return nil
} }
func (cb *cyclicBatch) Put(key ds.Key, val []byte) error { func (cb *cyclicBatch) Put(ctx context.Context, key ds.Key, val []byte) error {
if err := cb.cycle(); err != nil { if err := cb.cycle(); err != nil {
return err return err
} }
cb.pending++ cb.pending++
return cb.Batch.Put(key, val) return cb.Batch.Put(ctx, key, val)
} }
func (cb *cyclicBatch) Delete(key ds.Key) error { func (cb *cyclicBatch) Delete(ctx context.Context, key ds.Key) error {
if err := cb.cycle(); err != nil { if err := cb.cycle(); err != nil {
return err return err
} }
cb.pending++ cb.pending++
return cb.Batch.Delete(key) return cb.Batch.Delete(ctx, key)
} }
func (cb *cyclicBatch) Commit() error { func (cb *cyclicBatch) Commit(ctx context.Context) error {
if cb.Batch == nil { if cb.Batch == nil {
return errors.New("cyclic batch is closed") return errors.New("cyclic batch is closed")
} }
if err := cb.Batch.Commit(); err != nil { if err := cb.Batch.Commit(ctx); err != nil {
return err return err
} }
cb.pending = 0 cb.pending = 0

View File

@ -36,7 +36,7 @@ func (kb *dsKeyBook) PubKey(p peer.ID) ic.PubKey {
key := kbBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).Child(pubSuffix) key := kbBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).Child(pubSuffix)
var pk ic.PubKey var pk ic.PubKey
if value, err := kb.ds.Get(key); err == nil { if value, err := kb.ds.Get(context.TODO(), key); err == nil {
pk, err = ic.UnmarshalPublicKey(value) pk, err = ic.UnmarshalPublicKey(value)
if err != nil { if err != nil {
log.Errorf("error when unmarshalling pubkey from datastore for peer %s: %s\n", p.Pretty(), err) log.Errorf("error when unmarshalling pubkey from datastore for peer %s: %s\n", p.Pretty(), err)
@ -56,7 +56,7 @@ func (kb *dsKeyBook) PubKey(p peer.ID) ic.PubKey {
log.Errorf("error when turning extracted pubkey into bytes for peer %s: %s\n", p.Pretty(), err) log.Errorf("error when turning extracted pubkey into bytes for peer %s: %s\n", p.Pretty(), err)
return nil return nil
} }
err = kb.ds.Put(key, pkb) err = kb.ds.Put(context.TODO(), key, pkb)
if err != nil { if err != nil {
log.Errorf("error when adding extracted pubkey to peerstore for peer %s: %s\n", p.Pretty(), err) log.Errorf("error when adding extracted pubkey to peerstore for peer %s: %s\n", p.Pretty(), err)
return nil return nil
@ -80,7 +80,7 @@ func (kb *dsKeyBook) AddPubKey(p peer.ID, pk ic.PubKey) error {
log.Errorf("error while converting pubkey byte string for peer %s: %s\n", p.Pretty(), err) log.Errorf("error while converting pubkey byte string for peer %s: %s\n", p.Pretty(), err)
return err return err
} }
err = kb.ds.Put(key, val) err = kb.ds.Put(context.TODO(), key, val)
if err != nil { if err != nil {
log.Errorf("error while updating pubkey in datastore for peer %s: %s\n", p.Pretty(), err) log.Errorf("error while updating pubkey in datastore for peer %s: %s\n", p.Pretty(), err)
} }
@ -89,7 +89,7 @@ func (kb *dsKeyBook) AddPubKey(p peer.ID, pk ic.PubKey) error {
func (kb *dsKeyBook) PrivKey(p peer.ID) ic.PrivKey { func (kb *dsKeyBook) PrivKey(p peer.ID) ic.PrivKey {
key := kbBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).Child(privSuffix) key := kbBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).Child(privSuffix)
value, err := kb.ds.Get(key) value, err := kb.ds.Get(context.TODO(), key)
if err != nil { if err != nil {
log.Errorf("error while fetching privkey from datastore for peer %s: %s\n", p.Pretty(), err) log.Errorf("error while fetching privkey from datastore for peer %s: %s\n", p.Pretty(), err)
return nil return nil
@ -116,7 +116,7 @@ func (kb *dsKeyBook) AddPrivKey(p peer.ID, sk ic.PrivKey) error {
log.Errorf("error while converting privkey byte string for peer %s: %s\n", p.Pretty(), err) log.Errorf("error while converting privkey byte string for peer %s: %s\n", p.Pretty(), err)
return err return err
} }
err = kb.ds.Put(key, val) err = kb.ds.Put(context.TODO(), key, val)
if err != nil { if err != nil {
log.Errorf("error while updating privkey in datastore for peer %s: %s\n", p.Pretty(), err) log.Errorf("error while updating privkey in datastore for peer %s: %s\n", p.Pretty(), err)
} }

View File

@ -45,7 +45,7 @@ func (pm *dsPeerMetadata) Get(p peer.ID, key string) (interface{}, error) {
return nil, err return nil, err
} }
k := pmBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).ChildString(key) k := pmBase.ChildString(base32.RawStdEncoding.EncodeToString([]byte(p))).ChildString(key)
value, err := pm.ds.Get(k) value, err := pm.ds.Get(context.TODO(), k)
if err != nil { if err != nil {
if err == ds.ErrNotFound { if err == ds.ErrNotFound {
err = pstore.ErrNotFound err = pstore.ErrNotFound
@ -69,5 +69,5 @@ func (pm *dsPeerMetadata) Put(p peer.ID, key string, val interface{}) error {
if err := gob.NewEncoder(&buf).Encode(&val); err != nil { if err := gob.NewEncoder(&buf).Encode(&val); err != nil {
return err return err
} }
return pm.ds.Put(k, buf.Bytes()) return pm.ds.Put(context.TODO(), k, buf.Bytes())
} }

View File

@ -95,7 +95,7 @@ func uniquePeerIds(ds ds.Datastore, prefix ds.Key, extractor func(result query.R
err error err error
) )
if results, err = ds.Query(q); err != nil { if results, err = ds.Query(context.TODO(), q); err != nil {
log.Error(err) log.Error(err)
return nil, err return nil, err
} }