Sign messages.

This commit is contained in:
Andreas Fackler 2018-11-14 14:13:01 +01:00 committed by Nick Sanders
parent 51d526c51c
commit 66b3cf0f47
4 changed files with 45 additions and 14 deletions

View File

@ -453,10 +453,10 @@ impl<T: Contribution> Handler<T> {
&& !peers.contains_in_addr(&peer_info.in_addr)
&& peers.get(&OutAddr(peer_info.in_addr.0)).is_none()
{
let local_pk = self.hdb.secret_key().public_key();
let local_sk = self.hdb.secret_key().clone();
tokio::spawn(self.hdb.clone().connect_outgoing(
peer_info.in_addr.0,
local_pk,
local_sk,
Some((peer_info.uid, peer_info.in_addr, peer_info.pk)),
false,
));

View File

@ -314,7 +314,7 @@ impl<T: Contribution> Hydrabadger<T> {
/// Returns a future that handles incoming connections on `socket`.
fn handle_incoming(self, socket: TcpStream) -> impl Future<Item = (), Error = ()> {
info!("Incoming connection from '{}'", socket.peer_addr().unwrap());
let wire_msgs = WireMessages::new(socket);
let wire_msgs = WireMessages::new(socket, self.inner.secret_key.clone());
wire_msgs
.into_future()
@ -368,7 +368,7 @@ impl<T: Contribution> Hydrabadger<T> {
pub(super) fn connect_outgoing(
self,
remote_addr: SocketAddr,
local_pk: PublicKey,
local_sk: SecretKey,
pub_info: Option<(Uid, InAddr, PublicKey)>,
is_optimistic: bool,
) -> impl Future<Item = (), Error = ()> {
@ -380,8 +380,9 @@ impl<T: Contribution> Hydrabadger<T> {
TcpStream::connect(&remote_addr)
.map_err(Error::from)
.and_then(move |socket| {
let local_pk = local_sk.public_key();
// Wrap the socket with the frame delimiter and codec:
let mut wire_msgs = WireMessages::new(socket);
let mut wire_msgs = WireMessages::new(socket, local_sk);
let wire_hello_result = wire_msgs.send_msg(WireMessage::hello_request_change_add(
uid, in_addr, local_pk,
));
@ -511,12 +512,12 @@ impl<T: Contribution> Hydrabadger<T> {
});
let hdb = self.clone();
let local_pk = hdb.inner.secret_key.public_key();
let local_sk = hdb.inner.secret_key.clone();
let connect = future::lazy(move || {
for &remote_addr in remotes.iter().filter(|&&ra| ra != hdb.inner.addr.0) {
tokio::spawn(
hdb.clone()
.connect_outgoing(remote_addr, local_pk, None, true),
.connect_outgoing(remote_addr, local_sk.clone(), None, true),
);
}
Ok(())

View File

@ -32,6 +32,8 @@ pub enum Error {
Io(std::io::Error),
#[fail(display = "Serde error: {}", _0)]
Serde(bincode::Error),
#[fail(display = "Received a message with invalid signature")]
InvalidSignature,
#[fail(display = "Error polling hydrabadger internal receiver")]
HydrabadgerHandlerPoll,
#[fail(display = "DynamicHoneyBadger error")]
@ -48,6 +50,8 @@ pub enum Error {
VoteForNotValidator,
#[fail(display = "Unable to transmit epoch status to listener, listener receiver dropped")]
InstantiateHbListenerDropped,
#[fail(display = "Received duplicate HelloRequestChangeAdd messages")]
DuplicateHello,
}
impl From<std::io::Error> for Error {

View File

@ -64,7 +64,7 @@ use std::{
use tokio::{io, net::TcpStream, prelude::*, codec::{Framed, LengthDelimitedCodec}};
use uuid::Uuid;
use hbbft::{
crypto::{PublicKey, PublicKeySet},
crypto::{PublicKey, PublicKeySet, SecretKey, Signature},
dynamic_honey_badger::{JoinPlan, Message as DhbMessage, DynamicHoneyBadger, Change as DhbChange},
sync_key_gen::{Ack, Part},
DaStep as MessagingStep,
@ -306,17 +306,28 @@ impl<T: Contribution> From<WireMessageKind<T>> for WireMessage<T> {
}
}
/// A serialized `WireMessage` signed by the sender.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SignedWireMessage {
message: Vec<u8>,
sig: Signature,
}
/// A stream/sink of `WireMessage`s connected to a socket.
#[derive(Debug)]
pub struct WireMessages<T> {
framed: Framed<TcpStream, LengthDelimitedCodec>,
our_key: SecretKey,
their_key: Option<PublicKey>,
_t: PhantomData<T>,
}
impl<T: Contribution> WireMessages<T> {
pub fn new(socket: TcpStream) -> WireMessages<T> {
pub fn new(socket: TcpStream, our_key: SecretKey) -> WireMessages<T> {
WireMessages {
framed: Framed::new(socket, LengthDelimitedCodec::new()),
our_key,
their_key: None,
_t: PhantomData,
}
}
@ -339,10 +350,22 @@ impl<T: Contribution> Stream for WireMessages<T> {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.framed.poll()) {
Some(frame) => {
Ok(Async::Ready(Some(
// deserialize_from(frame.reader()).map_err(Error::Serde)?
bincode::deserialize(&frame.freeze()).map_err(Error::Serde)?,
)))
let s_msg: SignedWireMessage =
bincode::deserialize(&frame.freeze()).map_err(Error::Serde)?;
let msg: WireMessage<T> =
bincode::deserialize(&s_msg.message).map_err(Error::Serde)?;
if let WireMessageKind::HelloRequestChangeAdd(_, _, pk) = msg.kind {
if self.their_key.is_some() {
return Err(Error::DuplicateHello);
}
self.their_key = Some(pk);
}
if !self.their_key.as_ref()
.map_or(true, |pk| pk.verify(&s_msg.sig, &s_msg.message)) {
return Err(Error::InvalidSignature);
}
// deserialize_from(frame.reader()).map_err(Error::Serde)?
Ok(Async::Ready(Some(msg)))
}
None => Ok(Async::Ready(None)),
}
@ -357,11 +380,14 @@ impl<T: Contribution> Sink for WireMessages<T> {
// TODO: Reuse buffer:
let mut serialized = BytesMut::new();
let message = bincode::serialize(&item).map_err(Error::Serde)?;
let sig = self.our_key.sign(&message);
// Downgraded from bincode 1.0:
//
// Original: `bincode::serialize(&item)`
//
match bincode::serialize(&item) {
match bincode::serialize(&SignedWireMessage { message, sig }) {
Ok(s) => serialized.extend_from_slice(&s),
Err(err) => return Err(Error::Io(io::Error::new(io::ErrorKind::Other, err))),
}