Destroy socket listener on error rather than trying to reconnect; add #derivings

Reconnecting an existing socket simply was not working; the Rust socket
did not expose any methods for reconnection, so I simply tried calling
connect() again. As near as I can tell, this was a no-op --- which makes
sense because both the sending and receiving threads had their own copy
of the Socket, and it's not clear what the synchronization behaviour
should have been.

Instead if the connection fails, we relay this information to the main
thread, wait for an acknowledgement, then simply destroy the listening
thread. The caller can then simply call `start()` again.
This commit is contained in:
Andrew Poelstra 2014-09-10 07:15:48 -05:00
parent 1f41a67194
commit ef11e8273b
8 changed files with 58 additions and 43 deletions

View File

@ -31,7 +31,7 @@ use blockdata::transaction::Transaction;
/// A block header, which contains all the block's information except
/// the actual transactions
#[deriving(PartialEq, Show)]
#[deriving(PartialEq, Eq, Clone, Show)]
pub struct BlockHeader {
/// The protocol version. Should always be 1.
pub version: u32,
@ -50,7 +50,7 @@ pub struct BlockHeader {
/// A Bitcoin block, which is a collection of transactions with an attached
/// proof of work.
#[deriving(PartialEq, Show)]
#[deriving(PartialEq, Eq, Clone, Show)]
pub struct Block {
/// The block header
pub header: BlockHeader,
@ -60,7 +60,7 @@ pub struct Block {
/// A block header with txcount attached, which is given in the `headers`
/// network message.
#[deriving(PartialEq, Show)]
#[deriving(PartialEq, Eq, Clone, Show)]
pub struct LoneBlockHeader {
/// The actual block header
pub header: BlockHeader,

View File

@ -69,6 +69,30 @@ impl fmt::Show for Address {
}
}
impl Clone for Address {
fn clone(&self) -> Address {
unsafe {
use std::intrinsics::copy_nonoverlapping_memory;
use std::mem;
let mut ret = mem::uninitialized();
copy_nonoverlapping_memory(&mut ret,
self,
mem::size_of::<Address>());
ret
}
}
}
impl PartialEq for Address {
fn eq(&self, other: &Address) -> bool {
self.services == other.services &&
self.address.as_slice() == other.address.as_slice() &&
self.port == other.port
}
}
impl Eq for Address {}
#[cfg(test)]
mod test {
use super::Address;

View File

@ -51,11 +51,11 @@ pub trait ConsensusDecodable<D:SimpleDecoder<E>, E> {
}
/// A variable-length unsigned integer
#[deriving(PartialEq, Show)]
#[deriving(PartialEq, Eq, PartialOrd, Ord, Clone, Show)]
pub struct VarInt(pub u64);
/// Data which must be preceded by a 4-byte checksum
#[deriving(PartialEq, Clone, Show)]
#[deriving(PartialEq, Eq, Clone, Show)]
pub struct CheckedData(pub Vec<u8>);
// Primitive types

View File

@ -19,11 +19,9 @@
//!
use std::io::{IoResult, standard_error, ConnectionFailed};
use std::io::timer;
use std::time::Duration;
use network::constants::Network;
use network::message::{NetworkMessage, Verack};
use network::message::{mod, SocketResponse, MessageReceived, Verack};
use network::socket::Socket;
/// A message which can be sent on the Bitcoin network
@ -35,7 +33,7 @@ pub trait Listener {
/// Return the network this `Listener` is operating on
fn network(&self) -> Network;
/// Main listen loop
fn start(&self) -> IoResult<(Receiver<NetworkMessage>, Socket)> {
fn start(&self) -> IoResult<(Receiver<SocketResponse>, Socket)> {
// Open socket
let mut ret_sock = Socket::new(self.network());
match ret_sock.connect(self.peer(), self.port()) {
@ -75,20 +73,17 @@ pub trait Listener {
// We have to pass the message to the main thread for processing,
// unfortunately, because sipa says we have to handle everything
// in order.
recv_tx.send(payload);
recv_tx.send(MessageReceived(payload));
}
Err(e) => {
println!("Received error {:} when decoding message. Pausing for 5 seconds then reconnecting.", e);
timer::sleep(Duration::seconds(5));
// Reconnect
sock.reconnect()
// Create version message
.and_then(|_| sock.version_message(0))
// Send it out
.and_then(|msg| sock.send_message(msg))
// For now, not much we can do on error
.unwrap_or_else(|e| println!("Error {} when reconnecting.", e));
handshake_complete = false;
// On failure we send an error message to the main thread, along with
// a channel to receive an acknowledgement that we may tear down this
// thread. (If we simply exited immediately, the channel would be torn
// down and the main thread would never see the error message.)
let (tx, rx) = channel();
recv_tx.send(message::ConnectionFailed(e, tx));
rx.recv();
break;
}
}
}

View File

@ -34,7 +34,7 @@ use network::serialize::{serialize, RawDecoder, SimpleEncoder, SimpleDecoder};
use util::misc::prepend_err;
/// Serializer for command string
#[deriving(PartialEq, Clone, Show)]
#[deriving(PartialEq, Eq, Clone, Show)]
pub struct CommandString(pub String);
impl<S:SimpleEncoder<E>, E> ConsensusEncodable<S, E> for CommandString {
@ -64,10 +64,18 @@ pub struct RawNetworkMessage {
pub payload: NetworkMessage
}
#[deriving(Show)]
/// A response from the peer-connected socket
pub enum SocketResponse {
/// A message was received
MessageReceived(NetworkMessage),
/// An error occured and the socket needs to close
ConnectionFailed(IoError, Sender<()>)
}
#[deriving(Clone, PartialEq, Eq, Show)]
/// A Network message payload. Proper documentation is available on the Bitcoin
/// wiki https://en.bitcoin.it/wiki/Protocol_specification
pub enum NetworkMessage{
pub enum NetworkMessage {
/// `version`
Version(message_network::VersionMessage),
/// `verack`
@ -98,7 +106,7 @@ pub enum NetworkMessage{
/// `ping`
Ping(u64),
/// `pong`
Pong(u64),
Pong(u64)
// TODO: reject,
// TODO: bloom filtering
// TODO: alert
@ -119,7 +127,7 @@ impl RawNetworkMessage {
Block(_) => "block",
Headers(_) => "headers",
Ping(_) => "ping",
Pong(_) => "pong"
Pong(_) => "pong",
}.to_string()
}
}

View File

@ -23,7 +23,7 @@ use network::encodable::{ConsensusDecodable, ConsensusEncodable};
use network::serialize::{SimpleDecoder, SimpleEncoder};
use util::hash::Sha256dHash;
#[deriving(Clone, PartialEq, Show)]
#[deriving(PartialEq, Eq, Clone, Show)]
/// The type of an inventory object
pub enum InvType {
/// Error --- these inventories can be ignored
@ -37,7 +37,7 @@ pub enum InvType {
// Some simple messages
/// The `getblocks` message
#[deriving(Show)]
#[deriving(PartialEq, Eq, Clone, Show)]
pub struct GetBlocksMessage {
/// The protocol version
pub version: u32,
@ -50,7 +50,7 @@ pub struct GetBlocksMessage {
}
/// The `getheaders` message
#[deriving(Show)]
#[deriving(PartialEq, Eq, Clone, Show)]
pub struct GetHeadersMessage {
/// The protocol version
pub version: u32,
@ -63,7 +63,7 @@ pub struct GetHeadersMessage {
}
/// An inventory object --- a reference to a Bitcoin object
#[deriving(Clone, Show)]
#[deriving(PartialEq, Eq, Clone, Show)]
pub struct Inventory {
/// The type of object that is referenced
pub inv_type: InvType,

View File

@ -27,7 +27,7 @@ use network::socket::Socket;
/// Some simple messages
/// The `version` message
#[deriving(Show)]
#[deriving(PartialEq, Eq, Clone, Show)]
pub struct VersionMessage {
/// The P2P network protocol version
pub version: u32,

View File

@ -105,18 +105,6 @@ impl Socket {
}
}
/// Reset the connection to the peer
pub fn reconnect(&mut self) -> IoResult<()> {
let (host, port) = match self.socket {
Some(ref mut s) => match s.peer_name() {
Ok(addr) => (format!("{}", addr.ip), addr.port),
Err(e) => { return Err(e); }
},
None => { return Err(standard_error(NotConnected)); }
};
self.connect(host.as_slice(), port)
}
/// Peer address
pub fn receiver_address(&mut self) -> IoResult<Address> {
match self.socket {