From 9df5e4fcafd2efaad44185b99ae9ea92b72fa7c7 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 2 Apr 2020 20:44:44 -0700 Subject: [PATCH] fix: don't drop bytes in the insecure transport gogo's varint reader buffers internally. If you use it then throw it away, you'll drop data. This commit reverts to using msgio, but uses the varint reader/writers instead. --- go.mod | 1 + go.sum | 5 +++-- sec/insecure/insecure.go | 38 ++++++++++++++++++++++---------------- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index a7736c2..8afc0ec 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-buffer-pool v0.0.2 github.com/libp2p/go-flow-metrics v0.0.3 + github.com/libp2p/go-msgio v0.0.4 github.com/libp2p/go-openssl v0.0.4 github.com/minio/sha256-simd v0.1.1 github.com/mr-tron/base58 v1.1.3 diff --git a/go.sum b/go.sum index facec19..e5a2103 100644 --- a/go.sum +++ b/go.sum @@ -31,8 +31,6 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/ipfs/go-cid v0.0.5 h1:o0Ix8e/ql7Zb5UVUJEUfjsWCIY8t48++9lR8qi6oiJU= github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= -github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10= -github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -40,10 +38,13 @@ github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlT github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= +github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM= github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= +github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA= +github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= github.com/libp2p/go-openssl v0.0.4 h1:d27YZvLoTyMhIN4njrkr8zMDOM4lfpHIp6A+TK9fovg= github.com/libp2p/go-openssl v0.0.4/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= diff --git a/sec/insecure/insecure.go b/sec/insecure/insecure.go index b9a9394..1f40924 100644 --- a/sec/insecure/insecure.go +++ b/sec/insecure/insecure.go @@ -9,10 +9,9 @@ import ( "io" "net" - gogoio "github.com/gogo/protobuf/io" - "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/sec" + "github.com/libp2p/go-msgio" ci "github.com/libp2p/go-libp2p-core/crypto" pb "github.com/libp2p/go-libp2p-core/sec/insecure/pb" @@ -183,27 +182,34 @@ func (ic *Conn) runHandshakeSync() error { // read and write a message at the same time. func readWriteMsg(rw io.ReadWriter, out *pb.Exchange) (*pb.Exchange, error) { - const maxMsgSize = 1 << 16 - r := gogoio.NewDelimitedReader(rw, maxMsgSize) - w := gogoio.NewDelimitedWriter(rw) - wresult := make(chan error) - go func() { - wresult <- w.WriteMsg(out) - }() - - inMsg := pb.Exchange{} - err := r.ReadMsg(&inMsg) - - // Always wait for the write to finish. - err2 := <-wresult + const maxMessageSize = 1 << 16 + outBytes, err := out.Marshal() if err != nil { return nil, err } + wresult := make(chan error) + go func() { + w := msgio.NewVarintWriter(rw) + wresult <- w.WriteMsg(outBytes) + }() + + r := msgio.NewVarintReaderSize(rw, maxMessageSize) + msg, err1 := r.ReadMsg() + + // Always wait for the read to finish. + err2 := <-wresult + + if err1 != nil { + return nil, err1 + } if err2 != nil { + r.ReleaseMsg(msg) return nil, err2 } - return &inMsg, err + inMsg := new(pb.Exchange) + err = inMsg.Unmarshal(msg) + return inMsg, err } // LocalPeer returns the local peer ID.