Modify `WireMessages` to use peer public key.
This commit is contained in:
parent
66b3cf0f47
commit
bcaf7aa698
|
@ -314,7 +314,7 @@ impl<T: Contribution> Hydrabadger<T> {
|
|||
/// Returns a future that handles incoming connections on `socket`.
|
||||
fn handle_incoming(self, socket: TcpStream) -> impl Future<Item = (), Error = ()> {
|
||||
info!("Incoming connection from '{}'", socket.peer_addr().unwrap());
|
||||
let wire_msgs = WireMessages::new(socket, self.inner.secret_key.clone());
|
||||
let wire_msgs = WireMessages::new(socket, self.inner.secret_key.clone(), self.clone());
|
||||
|
||||
wire_msgs
|
||||
.into_future()
|
||||
|
@ -382,7 +382,7 @@ impl<T: Contribution> Hydrabadger<T> {
|
|||
.and_then(move |socket| {
|
||||
let local_pk = local_sk.public_key();
|
||||
// Wrap the socket with the frame delimiter and codec:
|
||||
let mut wire_msgs = WireMessages::new(socket, local_sk);
|
||||
let mut wire_msgs = WireMessages::new(socket, local_sk, self.clone());
|
||||
let wire_hello_result = wire_msgs.send_msg(WireMessage::hello_request_change_add(
|
||||
uid, in_addr, local_pk,
|
||||
));
|
||||
|
|
|
@ -26,6 +26,8 @@ pub enum InputOrMessage<T> {
|
|||
Message(Uid, Message),
|
||||
}
|
||||
|
||||
// TODO: Move this up to `lib.rs` or, preferably, create another error type
|
||||
// for general (lib) use.
|
||||
#[derive(Debug, Fail)]
|
||||
pub enum Error {
|
||||
#[fail(display = "Io error: {}", _0)]
|
||||
|
@ -50,8 +52,8 @@ pub enum Error {
|
|||
VoteForNotValidator,
|
||||
#[fail(display = "Unable to transmit epoch status to listener, listener receiver dropped")]
|
||||
InstantiateHbListenerDropped,
|
||||
#[fail(display = "Received duplicate HelloRequestChangeAdd messages")]
|
||||
DuplicateHello,
|
||||
#[fail(display = "Message received from unknown peer")]
|
||||
MessageReceivedUnknownPeer,
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for Error {
|
||||
|
|
44
src/lib.rs
44
src/lib.rs
|
@ -206,6 +206,9 @@ pub enum NetworkState {
|
|||
}
|
||||
|
||||
/// Messages sent over the network between nodes.
|
||||
///
|
||||
/// Only [`Message`](enum.WireMessageKind.html#variant.Message) variants are
|
||||
/// verified.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum WireMessageKind<T> {
|
||||
HelloFromValidator(Uid, InAddr, PublicKey, NetworkState),
|
||||
|
@ -215,12 +218,18 @@ pub enum WireMessageKind<T> {
|
|||
NetworkState(NetworkState),
|
||||
Goodbye,
|
||||
#[serde(with = "serde_bytes")]
|
||||
// TODO(c0gent): Remove.
|
||||
Bytes(Bytes),
|
||||
/// A Honey Badger message.
|
||||
///
|
||||
/// All received messages are verified against the senders public key
|
||||
/// using an attached signature.
|
||||
Message(Uid, Message),
|
||||
// TODO(c0gent): Remove.
|
||||
Transaction(Uid, T),
|
||||
KeyGenPart(Part),
|
||||
KeyGenAck(Ack),
|
||||
JoinPlan(JoinPlan<Uid>), // TargetedMessage(TargetedMessage<Uid>)
|
||||
JoinPlan(JoinPlan<Uid>),
|
||||
}
|
||||
|
||||
/// Messages sent over the network between nodes.
|
||||
|
@ -314,20 +323,19 @@ pub struct SignedWireMessage {
|
|||
}
|
||||
|
||||
/// A stream/sink of `WireMessage`s connected to a socket.
|
||||
#[derive(Debug)]
|
||||
pub struct WireMessages<T> {
|
||||
pub struct WireMessages<T: Contribution> {
|
||||
framed: Framed<TcpStream, LengthDelimitedCodec>,
|
||||
our_key: SecretKey,
|
||||
their_key: Option<PublicKey>,
|
||||
hdb: Hydrabadger<T>,
|
||||
_t: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T: Contribution> WireMessages<T> {
|
||||
pub fn new(socket: TcpStream, our_key: SecretKey) -> WireMessages<T> {
|
||||
pub fn new(socket: TcpStream, our_key: SecretKey, hdb: Hydrabadger<T>) -> WireMessages<T> {
|
||||
WireMessages {
|
||||
framed: Framed::new(socket, LengthDelimitedCodec::new()),
|
||||
our_key,
|
||||
their_key: None,
|
||||
hdb,
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
|
@ -354,17 +362,19 @@ impl<T: Contribution> Stream for WireMessages<T> {
|
|||
bincode::deserialize(&frame.freeze()).map_err(Error::Serde)?;
|
||||
let msg: WireMessage<T> =
|
||||
bincode::deserialize(&s_msg.message).map_err(Error::Serde)?;
|
||||
if let WireMessageKind::HelloRequestChangeAdd(_, _, pk) = msg.kind {
|
||||
if self.their_key.is_some() {
|
||||
return Err(Error::DuplicateHello);
|
||||
|
||||
// Verify signature for `WireMessageKind::Message` variants.
|
||||
if let WireMessageKind::Message(uid, _) = &msg.kind {
|
||||
match self.hdb.peers().get_by_uid(uid).and_then(|peer| peer.public_key()) {
|
||||
Some(pk) => {
|
||||
if !pk.verify(&s_msg.sig, &s_msg.message) {
|
||||
return Err(Error::InvalidSignature);
|
||||
}
|
||||
},
|
||||
None => return Err(Error::MessageReceivedUnknownPeer),
|
||||
}
|
||||
self.their_key = Some(pk);
|
||||
}
|
||||
if !self.their_key.as_ref()
|
||||
.map_or(true, |pk| pk.verify(&s_msg.sig, &s_msg.message)) {
|
||||
return Err(Error::InvalidSignature);
|
||||
}
|
||||
// deserialize_from(frame.reader()).map_err(Error::Serde)?
|
||||
|
||||
Ok(Async::Ready(Some(msg)))
|
||||
}
|
||||
None => Ok(Async::Ready(None)),
|
||||
|
@ -383,10 +393,6 @@ impl<T: Contribution> Sink for WireMessages<T> {
|
|||
let message = bincode::serialize(&item).map_err(Error::Serde)?;
|
||||
let sig = self.our_key.sign(&message);
|
||||
|
||||
// Downgraded from bincode 1.0:
|
||||
//
|
||||
// Original: `bincode::serialize(&item)`
|
||||
//
|
||||
match bincode::serialize(&SignedWireMessage { message, sig }) {
|
||||
Ok(s) => serialized.extend_from_slice(&s),
|
||||
Err(err) => return Err(Error::Io(io::Error::new(io::ErrorKind::Other, err))),
|
||||
|
|
Loading…
Reference in New Issue