Parallel syntax change; SecretConnection implements net.Conn

This commit is contained in:
Jae Kwon 2015-07-15 12:13:10 -07:00
parent 5b41cc4fa5
commit d13a593afd
3 changed files with 93 additions and 80 deletions

View File

@ -27,14 +27,16 @@ func peerHandshake(conn net.Conn, ourNodeInfo *types.NodeInfo) (*types.NodeInfo,
var peerNodeInfo = new(types.NodeInfo)
var err1 error
var err2 error
Parallel(func() {
var n int64
binary.WriteBinary(ourNodeInfo, conn, &n, &err1)
}, func() {
var n int64
binary.ReadBinary(peerNodeInfo, conn, &n, &err2)
log.Info("Peer handshake", "peerNodeInfo", peerNodeInfo)
})
Parallel(
func() {
var n int64
binary.WriteBinary(ourNodeInfo, conn, &n, &err1)
},
func() {
var n int64
binary.ReadBinary(peerNodeInfo, conn, &n, &err2)
log.Info("Peer handshake", "peerNodeInfo", peerNodeInfo)
})
if err1 != nil {
return nil, err1
}

View File

@ -7,6 +7,8 @@ import (
"crypto/sha256"
"encoding/binary"
"errors"
"net"
"time"
//"fmt"
"io"
"sync"
@ -17,6 +19,7 @@ import (
acm "github.com/tendermint/tendermint/account"
bm "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common"
)
// 2 + 1024 == 1026 total frame size
@ -25,6 +28,7 @@ const dataMaxSize = 1024
const totalFrameSize = dataMaxSize + dataLenSize
const sealedFrameSize = totalFrameSize + secretbox.Overhead
// Implements net.Conn
type SecretConnection struct {
conn io.ReadWriteCloser
recvBuffer []byte
@ -157,8 +161,16 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) {
return
}
func (sc *SecretConnection) Close() error {
return sc.conn.Close()
// Implements net.Conn
func (sc *SecretConnection) Close() error { return sc.conn.Close() }
func (sc *SecretConnection) LocalAddr() net.Addr { return sc.conn.(net.Conn).LocalAddr() }
func (sc *SecretConnection) RemoteAddr() net.Addr { return sc.conn.(net.Conn).RemoteAddr() }
func (sc *SecretConnection) SetDeadline(t time.Time) error { return sc.conn.(net.Conn).SetDeadline(t) }
func (sc *SecretConnection) SetReadDeadline(t time.Time) error {
return sc.conn.(net.Conn).SetReadDeadline(t)
}
func (sc *SecretConnection) SetWriteDeadline(t time.Time) error {
return sc.conn.(net.Conn).SetWriteDeadline(t)
}
func genEphKeys() (ephPub, ephPriv *[32]byte) {
@ -246,28 +258,23 @@ type authSigMessage struct {
func shareAuthSignature(sc *SecretConnection, pubKey acm.PubKeyEd25519, signature acm.SignatureEd25519) (acm.PubKeyEd25519, acm.SignatureEd25519, error) {
var recvMsg authSigMessage
var err1, err2 error
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
msgBytes := bm.BinaryBytes(authSigMessage{pubKey, signature})
_, err1 = sc.Write(msgBytes)
}()
Parallel(
func() {
msgBytes := bm.BinaryBytes(authSigMessage{pubKey, signature})
_, err1 = sc.Write(msgBytes)
},
func() {
// NOTE relies on atomicity of small data.
readBuffer := make([]byte, dataMaxSize)
_, err2 = sc.Read(readBuffer)
if err2 != nil {
return
}
n := int64(0) // not used.
recvMsg = bm.ReadBinary(authSigMessage{}, bytes.NewBuffer(readBuffer), &n, &err2).(authSigMessage)
})
go func() {
defer wg.Done()
// NOTE relies on atomicity of small data.
readBuffer := make([]byte, dataMaxSize)
_, err2 = sc.Read(readBuffer)
if err2 != nil {
return
}
n := int64(0) // not used.
recvMsg = bm.ReadBinary(authSigMessage{}, bytes.NewBuffer(readBuffer), &n, &err2).(authSigMessage)
}()
wg.Wait()
if err1 != nil {
return nil, nil, err1
}

View File

@ -37,29 +37,31 @@ func makeSecretConnPair(tb testing.TB) (fooSecConn, barSecConn *SecretConnection
barPrvKey := acm.PrivKeyEd25519(CRandBytes(32))
barPubKey := barPrvKey.PubKey().(acm.PubKeyEd25519)
Parallel(func() {
var err error
fooSecConn, err = MakeSecretConnection(fooConn, fooPrvKey)
if err != nil {
tb.Errorf("Failed to establish SecretConnection for foo: %v", err)
return
}
if !bytes.Equal(fooSecConn.RemotePubKey(), barPubKey) {
tb.Errorf("Unexpected fooSecConn.RemotePubKey. Expected %v, got %v",
barPubKey, fooSecConn.RemotePubKey())
}
}, func() {
var err error
barSecConn, err = MakeSecretConnection(barConn, barPrvKey)
if barSecConn == nil {
tb.Errorf("Failed to establish SecretConnection for bar: %v", err)
return
}
if !bytes.Equal(barSecConn.RemotePubKey(), fooPubKey) {
tb.Errorf("Unexpected barSecConn.RemotePubKey. Expected %v, got %v",
fooPubKey, barSecConn.RemotePubKey())
}
})
Parallel(
func() {
var err error
fooSecConn, err = MakeSecretConnection(fooConn, fooPrvKey)
if err != nil {
tb.Errorf("Failed to establish SecretConnection for foo: %v", err)
return
}
if !bytes.Equal(fooSecConn.RemotePubKey(), barPubKey) {
tb.Errorf("Unexpected fooSecConn.RemotePubKey. Expected %v, got %v",
barPubKey, fooSecConn.RemotePubKey())
}
},
func() {
var err error
barSecConn, err = MakeSecretConnection(barConn, barPrvKey)
if barSecConn == nil {
tb.Errorf("Failed to establish SecretConnection for bar: %v", err)
return
}
if !bytes.Equal(barSecConn.RemotePubKey(), fooPubKey) {
tb.Errorf("Unexpected barSecConn.RemotePubKey. Expected %v, got %v",
fooPubKey, barSecConn.RemotePubKey())
}
})
return
}
@ -92,35 +94,37 @@ func TestSecretConnectionReadWrite(t *testing.T) {
return
}
// In parallel, handle reads and writes
Parallel(func() {
// Node writes
for _, nodeWrite := range nodeWrites {
n, err := nodeSecretConn.Write([]byte(nodeWrite))
if err != nil {
t.Errorf("Failed to write to nodeSecretConn: %v", err)
return
Parallel(
func() {
// Node writes
for _, nodeWrite := range nodeWrites {
n, err := nodeSecretConn.Write([]byte(nodeWrite))
if err != nil {
t.Errorf("Failed to write to nodeSecretConn: %v", err)
return
}
if n != len(nodeWrite) {
t.Errorf("Failed to write all bytes. Expected %v, wrote %v", len(nodeWrite), n)
return
}
}
if n != len(nodeWrite) {
t.Errorf("Failed to write all bytes. Expected %v, wrote %v", len(nodeWrite), n)
return
nodeConn.PipeWriter.Close()
},
func() {
// Node reads
readBuffer := make([]byte, dataMaxSize)
for {
n, err := nodeSecretConn.Read(readBuffer)
if err == io.EOF {
return
} else if err != nil {
t.Errorf("Failed to read from nodeSecretConn: %v", err)
return
}
*nodeReads = append(*nodeReads, string(readBuffer[:n]))
}
}
nodeConn.PipeWriter.Close()
}, func() {
// Node reads
readBuffer := make([]byte, dataMaxSize)
for {
n, err := nodeSecretConn.Read(readBuffer)
if err == io.EOF {
return
} else if err != nil {
t.Errorf("Failed to read from nodeSecretConn: %v", err)
return
}
*nodeReads = append(*nodeReads, string(readBuffer[:n]))
}
nodeConn.PipeReader.Close()
})
nodeConn.PipeReader.Close()
})
}
}