Streamline message verification.
This commit is contained in:
parent
963be89e14
commit
5fab376f9b
|
@ -57,7 +57,8 @@ clear_on_drop = "0.2"
|
|||
version = "*"
|
||||
# git = "https://github.com/c0gent/hbbft"
|
||||
git = "https://github.com/poanetwork/hbbft"
|
||||
branch = "master"
|
||||
# branch = "master"
|
||||
rev = "9049dd17"
|
||||
# path = "../hbbft"
|
||||
|
||||
[profile.release]
|
||||
|
|
|
@ -309,7 +309,7 @@ impl<T: Contribution> Hydrabadger<T> {
|
|||
/// Returns a future that handles incoming connections on `socket`.
|
||||
fn handle_incoming(self, socket: TcpStream) -> impl Future<Item = (), Error = ()> {
|
||||
info!("Incoming connection from '{}'", socket.peer_addr().unwrap());
|
||||
let wire_msgs = WireMessages::new(socket, self.inner.secret_key.clone(), self.clone());
|
||||
let wire_msgs = WireMessages::new(socket, self.inner.secret_key.clone());
|
||||
|
||||
wire_msgs
|
||||
.into_future()
|
||||
|
@ -377,7 +377,7 @@ impl<T: Contribution> Hydrabadger<T> {
|
|||
.and_then(move |socket| {
|
||||
let local_pk = local_sk.public_key();
|
||||
// Wrap the socket with the frame delimiter and codec:
|
||||
let mut wire_msgs = WireMessages::new(socket, local_sk, self.clone());
|
||||
let mut wire_msgs = WireMessages::new(socket, local_sk);
|
||||
let wire_hello_result = wire_msgs.send_msg(WireMessage::hello_request_change_add(
|
||||
uid, in_addr, local_pk,
|
||||
));
|
||||
|
@ -396,9 +396,9 @@ impl<T: Contribution> Hydrabadger<T> {
|
|||
})
|
||||
.map_err(move |err| {
|
||||
if is_optimistic {
|
||||
warn!("Unable to connect to: {}", remote_addr);
|
||||
warn!("Unable to connect to: {} ({e:?}: {e})", remote_addr, e=err);
|
||||
} else {
|
||||
error!("Error connecting to: {} \n{:?}", remote_addr, err);
|
||||
error!("Error connecting to: {} ({e:?}: {e})", remote_addr, e=err);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -52,8 +52,8 @@ pub enum Error {
|
|||
VoteForNotValidator,
|
||||
#[fail(display = "Unable to transmit epoch status to listener, listener receiver dropped")]
|
||||
InstantiateHbListenerDropped,
|
||||
#[fail(display = "Message received from unknown peer")]
|
||||
MessageReceivedUnknownPeer,
|
||||
#[fail(display = "Message received from unknown peer while attempting to verify")]
|
||||
VerificationMessageReceivedUnknownPeer,
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for Error {
|
||||
|
|
|
@ -63,6 +63,14 @@ impl From<usize> for StateDsct {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
// pub struct KeyGen {
|
||||
// ack_queue: Option<SegQueue<(Uid, Ack)>>,
|
||||
// iom_queue: Option<SegQueue<InputOrMessage<T>>>,
|
||||
|
||||
// }
|
||||
|
||||
|
||||
/// The current hydrabadger state.
|
||||
//
|
||||
pub enum State<T: Contribution> {
|
||||
|
|
60
src/lib.rs
60
src/lib.rs
|
@ -209,8 +209,8 @@ pub enum NetworkState {
|
|||
|
||||
/// Messages sent over the network between nodes.
|
||||
///
|
||||
/// Only [`Message`](enum.WireMessageKind.html#variant.Message) variants are
|
||||
/// verified.
|
||||
/// [`Message`](enum.WireMessageKind.html#variant.Message) variants are among
|
||||
/// those verified.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum WireMessageKind<T> {
|
||||
HelloFromValidator(Uid, InAddr, PublicKey, NetworkState),
|
||||
|
@ -256,9 +256,7 @@ impl<T: Contribution> WireMessage<T> {
|
|||
in_addr: InAddr,
|
||||
pk: PublicKey,
|
||||
) -> WireMessage<T> {
|
||||
WireMessage {
|
||||
kind: WireMessageKind::HelloRequestChangeAdd(src_uid, in_addr, pk),
|
||||
}
|
||||
WireMessageKind::HelloRequestChangeAdd(src_uid, in_addr, pk).into()
|
||||
}
|
||||
|
||||
/// Returns a `WelcomeReceivedChangeAdd` variant.
|
||||
|
@ -267,29 +265,21 @@ impl<T: Contribution> WireMessage<T> {
|
|||
pk: PublicKey,
|
||||
net_state: NetworkState,
|
||||
) -> WireMessage<T> {
|
||||
WireMessage {
|
||||
kind: WireMessageKind::WelcomeReceivedChangeAdd(src_uid, pk, net_state),
|
||||
}
|
||||
WireMessageKind::WelcomeReceivedChangeAdd(src_uid, pk, net_state).into()
|
||||
}
|
||||
|
||||
/// Returns an `Input` variant.
|
||||
pub fn transaction(src_uid: Uid, txn: T) -> WireMessage<T> {
|
||||
WireMessage {
|
||||
kind: WireMessageKind::Transaction(src_uid, txn),
|
||||
}
|
||||
WireMessageKind::Transaction(src_uid, txn).into()
|
||||
}
|
||||
|
||||
/// Returns a `Message` variant.
|
||||
pub fn message(src_uid: Uid, msg: Message) -> WireMessage<T> {
|
||||
WireMessage {
|
||||
kind: WireMessageKind::Message(src_uid, msg),
|
||||
}
|
||||
WireMessageKind::Message(src_uid, msg).into()
|
||||
}
|
||||
|
||||
pub fn key_gen_part(part: Part) -> WireMessage<T> {
|
||||
WireMessage {
|
||||
kind: WireMessageKind::KeyGenPart(part),
|
||||
}
|
||||
WireMessageKind::KeyGenPart(part).into()
|
||||
}
|
||||
|
||||
pub fn key_gen_part_ack(outcome: Ack) -> WireMessage<T> {
|
||||
|
@ -327,21 +317,26 @@ pub struct SignedWireMessage {
|
|||
/// A stream/sink of `WireMessage`s connected to a socket.
|
||||
pub struct WireMessages<T: Contribution> {
|
||||
framed: Framed<TcpStream, LengthDelimitedCodec>,
|
||||
our_key: SecretKey,
|
||||
hdb: Hydrabadger<T>,
|
||||
local_sk: SecretKey,
|
||||
peer_pk: Option<PublicKey>,
|
||||
_t: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T: Contribution> WireMessages<T> {
|
||||
pub fn new(socket: TcpStream, our_key: SecretKey, hdb: Hydrabadger<T>) -> WireMessages<T> {
|
||||
pub fn new(socket: TcpStream, local_sk: SecretKey) -> WireMessages<T> {
|
||||
WireMessages {
|
||||
framed: Framed::new(socket, LengthDelimitedCodec::new()),
|
||||
our_key,
|
||||
hdb,
|
||||
local_sk,
|
||||
peer_pk: None,
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_peer_public_key(&mut self, peer_pk: PublicKey) {
|
||||
assert!(self.peer_pk.map(|pk| pk == peer_pk).unwrap_or(true));
|
||||
self.peer_pk = Some(peer_pk);
|
||||
}
|
||||
|
||||
pub fn socket(&self) -> &TcpStream {
|
||||
self.framed.get_ref()
|
||||
}
|
||||
|
@ -365,16 +360,17 @@ impl<T: Contribution> Stream for WireMessages<T> {
|
|||
let msg: WireMessage<T> =
|
||||
bincode::deserialize(&s_msg.message).map_err(Error::Serde)?;
|
||||
|
||||
// Verify signature for `WireMessageKind::Message` variants.
|
||||
if let WireMessageKind::Message(uid, _) = &msg.kind {
|
||||
match self.hdb.peers().get_by_uid(uid).and_then(|peer| peer.public_key()) {
|
||||
Some(pk) => {
|
||||
if !pk.verify(&s_msg.sig, &s_msg.message) {
|
||||
return Err(Error::InvalidSignature);
|
||||
}
|
||||
},
|
||||
None => return Err(Error::MessageReceivedUnknownPeer),
|
||||
// Verify signature for certain variants.
|
||||
match msg.kind {
|
||||
| WireMessageKind::Message(..)
|
||||
| WireMessageKind::KeyGenAck(..)
|
||||
| WireMessageKind::KeyGenPart(..) => {
|
||||
let peer_pk = self.peer_pk.ok_or(Error::VerificationMessageReceivedUnknownPeer)?;
|
||||
if !peer_pk.verify(&s_msg.sig, &s_msg.message) {
|
||||
return Err(Error::InvalidSignature);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
Ok(Async::Ready(Some(msg)))
|
||||
|
@ -393,7 +389,7 @@ impl<T: Contribution> Sink for WireMessages<T> {
|
|||
let mut serialized = BytesMut::new();
|
||||
|
||||
let message = bincode::serialize(&item).map_err(Error::Serde)?;
|
||||
let sig = self.our_key.sign(&message);
|
||||
let sig = self.local_sk.sign(&message);
|
||||
|
||||
match bincode::serialize(&SignedWireMessage { message, sig }) {
|
||||
Ok(s) => serialized.extend_from_slice(&s),
|
||||
|
|
22
src/peer.rs
22
src/peer.rs
|
@ -45,7 +45,7 @@ impl<T: Contribution> PeerHandler<T> {
|
|||
pub fn new(
|
||||
pub_info: Option<(Uid, InAddr, PublicKey)>,
|
||||
hdb: Hydrabadger<T>,
|
||||
wire_msgs: WireMessages<T>,
|
||||
mut wire_msgs: WireMessages<T>,
|
||||
) -> PeerHandler<T> {
|
||||
// Get the client socket address
|
||||
let out_addr = OutAddr(wire_msgs.socket().peer_addr().unwrap());
|
||||
|
@ -53,7 +53,15 @@ impl<T: Contribution> PeerHandler<T> {
|
|||
// Create a channel for this peer
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
|
||||
let uid = pub_info.as_ref().map(|(uid, _, _)| *uid);
|
||||
pub_info.as_ref().map(|(uid, _, _)| *uid);
|
||||
|
||||
let uid = match pub_info {
|
||||
Some((uid, _, pk)) => {
|
||||
wire_msgs.set_peer_public_key(pk);
|
||||
Some(uid)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
// Add an entry for this `Peer` in the shared state map.
|
||||
hdb.peers_mut().add(out_addr, tx, pub_info);
|
||||
|
@ -121,12 +129,22 @@ impl<T: Contribution> Future for PeerHandler<T> {
|
|||
}
|
||||
WireMessageKind::WelcomeReceivedChangeAdd(src_uid, pk, net_state) => {
|
||||
self.uid = Some(src_uid);
|
||||
self.wire_msgs.set_peer_public_key(pk);
|
||||
self.hdb.send_internal(InternalMessage::wire(
|
||||
Some(src_uid),
|
||||
self.out_addr,
|
||||
WireMessage::welcome_received_change_add(src_uid, pk, net_state),
|
||||
));
|
||||
}
|
||||
WireMessageKind::HelloFromValidator(src_uid, in_addr, pk, net_state) => {
|
||||
self.uid = Some(src_uid);
|
||||
self.wire_msgs.set_peer_public_key(pk);
|
||||
self.hdb.send_internal(InternalMessage::wire(
|
||||
Some(src_uid),
|
||||
self.out_addr,
|
||||
WireMessage::hello_from_validator(src_uid, in_addr, pk, net_state),
|
||||
));
|
||||
}
|
||||
WireMessageKind::Message(src_uid, msg) => {
|
||||
if let Some(peer_uid) = self.uid.as_ref() {
|
||||
debug_assert_eq!(src_uid, *peer_uid);
|
||||
|
|
Loading…
Reference in New Issue