From ef11e8273ba4932844fb547f7bb322f95f08e50d Mon Sep 17 00:00:00 2001 From: Andrew Poelstra Date: Wed, 10 Sep 2014 07:15:48 -0500 Subject: [PATCH] Destroy socket listener on error rather than trying to reconnect; add #derivings Reconnecting an existing socket simply was not working; the Rust socket did not expose any methods for reconnection, so I simply tried calling connect() again. As near as I can tell, this was a no-op --- which makes sense because both the sending and receiving threads had their own copy of the Socket, and it's not clear what the synchronization behaviour should have been. Instead if the connection fails, we relay this information to the main thread, wait for an acknowledgement, then simply destroy the listening thread. The caller can then simply call `start()` again. --- src/blockdata/block.rs | 6 +++--- src/network/address.rs | 24 ++++++++++++++++++++++++ src/network/encodable.rs | 4 ++-- src/network/listener.rs | 27 +++++++++++---------------- src/network/message.rs | 18 +++++++++++++----- src/network/message_blockdata.rs | 8 ++++---- src/network/message_network.rs | 2 +- src/network/socket.rs | 12 ------------ 8 files changed, 58 insertions(+), 43 deletions(-) diff --git a/src/blockdata/block.rs b/src/blockdata/block.rs index bcde959..2687c5a 100644 --- a/src/blockdata/block.rs +++ b/src/blockdata/block.rs @@ -31,7 +31,7 @@ use blockdata::transaction::Transaction; /// A block header, which contains all the block's information except /// the actual transactions -#[deriving(PartialEq, Show)] +#[deriving(PartialEq, Eq, Clone, Show)] pub struct BlockHeader { /// The protocol version. Should always be 1. pub version: u32, @@ -50,7 +50,7 @@ pub struct BlockHeader { /// A Bitcoin block, which is a collection of transactions with an attached /// proof of work. -#[deriving(PartialEq, Show)] +#[deriving(PartialEq, Eq, Clone, Show)] pub struct Block { /// The block header pub header: BlockHeader, @@ -60,7 +60,7 @@ pub struct Block { /// A block header with txcount attached, which is given in the `headers` /// network message. -#[deriving(PartialEq, Show)] +#[deriving(PartialEq, Eq, Clone, Show)] pub struct LoneBlockHeader { /// The actual block header pub header: BlockHeader, diff --git a/src/network/address.rs b/src/network/address.rs index 8736254..7a10269 100644 --- a/src/network/address.rs +++ b/src/network/address.rs @@ -69,6 +69,30 @@ impl fmt::Show for Address { } } +impl Clone for Address { + fn clone(&self) -> Address { + unsafe { + use std::intrinsics::copy_nonoverlapping_memory; + use std::mem; + let mut ret = mem::uninitialized(); + copy_nonoverlapping_memory(&mut ret, + self, + mem::size_of::
()); + ret + } + } +} + +impl PartialEq for Address { + fn eq(&self, other: &Address) -> bool { + self.services == other.services && + self.address.as_slice() == other.address.as_slice() && + self.port == other.port + } +} + +impl Eq for Address {} + #[cfg(test)] mod test { use super::Address; diff --git a/src/network/encodable.rs b/src/network/encodable.rs index 1491b86..9f02581 100644 --- a/src/network/encodable.rs +++ b/src/network/encodable.rs @@ -51,11 +51,11 @@ pub trait ConsensusDecodable, E> { } /// A variable-length unsigned integer -#[deriving(PartialEq, Show)] +#[deriving(PartialEq, Eq, PartialOrd, Ord, Clone, Show)] pub struct VarInt(pub u64); /// Data which must be preceded by a 4-byte checksum -#[deriving(PartialEq, Clone, Show)] +#[deriving(PartialEq, Eq, Clone, Show)] pub struct CheckedData(pub Vec); // Primitive types diff --git a/src/network/listener.rs b/src/network/listener.rs index 6e4b594..3152df3 100644 --- a/src/network/listener.rs +++ b/src/network/listener.rs @@ -19,11 +19,9 @@ //! use std::io::{IoResult, standard_error, ConnectionFailed}; -use std::io::timer; -use std::time::Duration; use network::constants::Network; -use network::message::{NetworkMessage, Verack}; +use network::message::{mod, SocketResponse, MessageReceived, Verack}; use network::socket::Socket; /// A message which can be sent on the Bitcoin network @@ -35,7 +33,7 @@ pub trait Listener { /// Return the network this `Listener` is operating on fn network(&self) -> Network; /// Main listen loop - fn start(&self) -> IoResult<(Receiver, Socket)> { + fn start(&self) -> IoResult<(Receiver, Socket)> { // Open socket let mut ret_sock = Socket::new(self.network()); match ret_sock.connect(self.peer(), self.port()) { @@ -75,20 +73,17 @@ pub trait Listener { // We have to pass the message to the main thread for processing, // unfortunately, because sipa says we have to handle everything // in order. - recv_tx.send(payload); + recv_tx.send(MessageReceived(payload)); } Err(e) => { - println!("Received error {:} when decoding message. Pausing for 5 seconds then reconnecting.", e); - timer::sleep(Duration::seconds(5)); - // Reconnect - sock.reconnect() - // Create version message - .and_then(|_| sock.version_message(0)) - // Send it out - .and_then(|msg| sock.send_message(msg)) - // For now, not much we can do on error - .unwrap_or_else(|e| println!("Error {} when reconnecting.", e)); - handshake_complete = false; + // On failure we send an error message to the main thread, along with + // a channel to receive an acknowledgement that we may tear down this + // thread. (If we simply exited immediately, the channel would be torn + // down and the main thread would never see the error message.) + let (tx, rx) = channel(); + recv_tx.send(message::ConnectionFailed(e, tx)); + rx.recv(); + break; } } } diff --git a/src/network/message.rs b/src/network/message.rs index 5418dca..c0ebd7a 100644 --- a/src/network/message.rs +++ b/src/network/message.rs @@ -34,7 +34,7 @@ use network::serialize::{serialize, RawDecoder, SimpleEncoder, SimpleDecoder}; use util::misc::prepend_err; /// Serializer for command string -#[deriving(PartialEq, Clone, Show)] +#[deriving(PartialEq, Eq, Clone, Show)] pub struct CommandString(pub String); impl, E> ConsensusEncodable for CommandString { @@ -64,10 +64,18 @@ pub struct RawNetworkMessage { pub payload: NetworkMessage } -#[deriving(Show)] +/// A response from the peer-connected socket +pub enum SocketResponse { + /// A message was received + MessageReceived(NetworkMessage), + /// An error occured and the socket needs to close + ConnectionFailed(IoError, Sender<()>) +} + +#[deriving(Clone, PartialEq, Eq, Show)] /// A Network message payload. Proper documentation is available on the Bitcoin /// wiki https://en.bitcoin.it/wiki/Protocol_specification -pub enum NetworkMessage{ +pub enum NetworkMessage { /// `version` Version(message_network::VersionMessage), /// `verack` @@ -98,7 +106,7 @@ pub enum NetworkMessage{ /// `ping` Ping(u64), /// `pong` - Pong(u64), + Pong(u64) // TODO: reject, // TODO: bloom filtering // TODO: alert @@ -119,7 +127,7 @@ impl RawNetworkMessage { Block(_) => "block", Headers(_) => "headers", Ping(_) => "ping", - Pong(_) => "pong" + Pong(_) => "pong", }.to_string() } } diff --git a/src/network/message_blockdata.rs b/src/network/message_blockdata.rs index 7021d07..70a73ad 100644 --- a/src/network/message_blockdata.rs +++ b/src/network/message_blockdata.rs @@ -23,7 +23,7 @@ use network::encodable::{ConsensusDecodable, ConsensusEncodable}; use network::serialize::{SimpleDecoder, SimpleEncoder}; use util::hash::Sha256dHash; -#[deriving(Clone, PartialEq, Show)] +#[deriving(PartialEq, Eq, Clone, Show)] /// The type of an inventory object pub enum InvType { /// Error --- these inventories can be ignored @@ -37,7 +37,7 @@ pub enum InvType { // Some simple messages /// The `getblocks` message -#[deriving(Show)] +#[deriving(PartialEq, Eq, Clone, Show)] pub struct GetBlocksMessage { /// The protocol version pub version: u32, @@ -50,7 +50,7 @@ pub struct GetBlocksMessage { } /// The `getheaders` message -#[deriving(Show)] +#[deriving(PartialEq, Eq, Clone, Show)] pub struct GetHeadersMessage { /// The protocol version pub version: u32, @@ -63,7 +63,7 @@ pub struct GetHeadersMessage { } /// An inventory object --- a reference to a Bitcoin object -#[deriving(Clone, Show)] +#[deriving(PartialEq, Eq, Clone, Show)] pub struct Inventory { /// The type of object that is referenced pub inv_type: InvType, diff --git a/src/network/message_network.rs b/src/network/message_network.rs index 11de60a..996de6a 100644 --- a/src/network/message_network.rs +++ b/src/network/message_network.rs @@ -27,7 +27,7 @@ use network::socket::Socket; /// Some simple messages /// The `version` message -#[deriving(Show)] +#[deriving(PartialEq, Eq, Clone, Show)] pub struct VersionMessage { /// The P2P network protocol version pub version: u32, diff --git a/src/network/socket.rs b/src/network/socket.rs index 74ec9a6..7481590 100644 --- a/src/network/socket.rs +++ b/src/network/socket.rs @@ -105,18 +105,6 @@ impl Socket { } } - /// Reset the connection to the peer - pub fn reconnect(&mut self) -> IoResult<()> { - let (host, port) = match self.socket { - Some(ref mut s) => match s.peer_name() { - Ok(addr) => (format!("{}", addr.ip), addr.port), - Err(e) => { return Err(e); } - }, - None => { return Err(standard_error(NotConnected)); } - }; - self.connect(host.as_slice(), port) - } - /// Peer address pub fn receiver_address(&mut self) -> IoResult
{ match self.socket {