From 5205b2f19b9173580f9a9e727d74e202b8dd0f67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 14 Apr 2015 12:12:47 +0300 Subject: [PATCH] whisper: fix anonymous broadcast drop, add broadcast tests --- whisper/envelope.go | 7 ++++- whisper/whisper.go | 58 +++++++++++++++++++++------------- whisper/whisper_test.go | 70 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 105 insertions(+), 30 deletions(-) diff --git a/whisper/envelope.go b/whisper/envelope.go index 93e3ea1a3..9daaf6490 100644 --- a/whisper/envelope.go +++ b/whisper/envelope.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/ecies" "github.com/ethereum/go-ethereum/rlp" ) @@ -96,10 +97,14 @@ func (self *Envelope) Open(key *ecdsa.PrivateKey) (msg *Message, err error) { if key == nil { return message, nil } - switch message.decrypt(key) { + err = message.decrypt(key) + switch err { case nil: return message, nil + case ecies.ErrInvalidPublicKey: // Payload isn't encrypted + return message, err + default: return nil, fmt.Errorf("unable to open envelope, decrypt failed: %v", err) } diff --git a/whisper/whisper.go b/whisper/whisper.go index f3b539d2c..a4ec943e8 100644 --- a/whisper/whisper.go +++ b/whisper/whisper.go @@ -136,8 +136,8 @@ func (self *Whisper) Messages(id int) (messages []*Message) { filter := self.filters.Get(id) if filter != nil { for _, e := range self.messages { - if msg, key := self.open(e); msg != nil { - f := createFilter(msg, e.Topics, key) + if msg := self.open(e); msg != nil { + f := createFilter(msg, e.Topics) if self.filters.Match(filter, f) { messages = append(messages, msg) } @@ -251,31 +251,45 @@ func (self *Whisper) envelopes() (envelopes []*Envelope) { return } -func (self *Whisper) postEvent(envelope *Envelope) { - if message, key := self.open(envelope); message != nil { - self.filters.Notify(createFilter(message, envelope.Topics, key), message) - } -} - -func (self *Whisper) open(envelope *Envelope) (*Message, *ecdsa.PrivateKey) { - for _, key := range self.keys { - if message, err := envelope.Open(key); err == nil || (err != nil && err == ecies.ErrInvalidPublicKey) { - message.To = &key.PublicKey - - return message, key - } - } - - return nil, nil -} - func (self *Whisper) Protocol() p2p.Protocol { return self.protocol } -func createFilter(message *Message, topics []Topic, key *ecdsa.PrivateKey) filter.Filter { +// postEvent opens an envelope with the configured identities and delivers the +// message upstream from application processing. +func (self *Whisper) postEvent(envelope *Envelope) { + if message := self.open(envelope); message != nil { + self.filters.Notify(createFilter(message, envelope.Topics), message) + } +} + +// open tries to decrypt a whisper envelope with all the configured identities, +// returning the decrypted message and the key used to achieve it. If not keys +// are configured, open will return the payload as if non encrypted. +func (self *Whisper) open(envelope *Envelope) *Message { + // Short circuit if no identity is set, and assume clear-text + if len(self.keys) == 0 { + if message, err := envelope.Open(nil); err == nil { + return message + } + } + // Iterate over the keys and try to decrypt the message + for _, key := range self.keys { + message, err := envelope.Open(key) + if err == nil || err == ecies.ErrInvalidPublicKey { + message.To = &key.PublicKey + return message + } + } + // Failed to decrypt, don't return anything + return nil +} + +// createFilter creates a message filter to check against installed handlers. +func createFilter(message *Message, topics []Topic) filter.Filter { return filter.Generic{ - Str1: string(crypto.FromECDSAPub(&key.PublicKey)), Str2: string(crypto.FromECDSAPub(message.Recover())), + Str1: string(crypto.FromECDSAPub(message.To)), + Str2: string(crypto.FromECDSAPub(message.Recover())), Data: NewTopicSet(topics), } } diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go index 5c29956cf..3f903a9dc 100644 --- a/whisper/whisper_test.go +++ b/whisper/whisper_test.go @@ -7,7 +7,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/nat" ) @@ -83,7 +82,7 @@ func TestSelfMessage(t *testing.T) { }, }) // Send a dummy message to oneself - msg := NewMessage([]byte("hello whisper")) + msg := NewMessage([]byte("self whisper")) envelope, err := msg.Wrap(DefaultProofOfWork, Options{ From: self, To: &self.PublicKey, @@ -104,9 +103,6 @@ func TestSelfMessage(t *testing.T) { } func TestDirectMessage(t *testing.T) { - glog.SetV(6) - glog.SetToStderr(true) - // Start the sender-recipient cluster cluster, err := startNodes(2) if err != nil { @@ -129,7 +125,7 @@ func TestDirectMessage(t *testing.T) { }, }) // Send a dummy message from the sender - msg := NewMessage([]byte("hello whisper")) + msg := NewMessage([]byte("direct whisper")) envelope, err := msg.Wrap(DefaultProofOfWork, Options{ From: senderId, To: &recipientId.PublicKey, @@ -139,7 +135,7 @@ func TestDirectMessage(t *testing.T) { t.Fatalf("failed to wrap message: %v", err) } if err := sender.Send(envelope); err != nil { - t.Fatalf("failed to send direct message: %v", err) + t.Fatalf("failed to send direct message: %v", err) } // Wait for an arrival or a timeout select { @@ -148,3 +144,63 @@ func TestDirectMessage(t *testing.T) { t.Fatalf("direct message receive timeout") } } + +func TestAnonymousBroadcast(t *testing.T) { + testBroadcast(true, t) +} + +func TestIdentifiedBroadcast(t *testing.T) { + testBroadcast(false, t) +} + +func testBroadcast(anonymous bool, t *testing.T) { + // Start the single sender multi recipient cluster + cluster, err := startNodes(3) + if err != nil { + t.Fatalf("failed to boot test cluster: %v", err) + } + defer stopNodes(cluster) + + sender := cluster[0].client + targets := make([]*Whisper, len(cluster)-1) + for i, node := range cluster[1:] { + targets[i] = node.client + if !anonymous { + targets[i].NewIdentity() + } + } + // Watch for arriving messages on the recipients + dones := make([]chan struct{}, len(targets)) + for i := 0; i < len(targets); i++ { + done := make(chan struct{}) // need for the closure + dones[i] = done + + targets[i].Watch(Filter{ + Topics: NewTopicsFromStrings("broadcast topic"), + Fn: func(msg *Message) { + close(done) + }, + }) + } + // Send a dummy message from the sender + msg := NewMessage([]byte("broadcast whisper")) + envelope, err := msg.Wrap(DefaultProofOfWork, Options{ + Topics: NewTopicsFromStrings("broadcast topic"), + TTL: DefaultTimeToLive, + }) + if err != nil { + t.Fatalf("failed to wrap message: %v", err) + } + if err := sender.Send(envelope); err != nil { + t.Fatalf("failed to send broadcast message: %v", err) + } + // Wait for an arrival on each recipient, or timeouts + timeout := time.After(time.Second) + for _, done := range dones { + select { + case <-done: + case <-timeout: + t.Fatalf("broadcast message receive timeout") + } + } +}