Merge branch 'master' of github.com:ethcore/parity-bitcoin into generic_reader
This commit is contained in:
commit
08a2a93d9c
|
@ -399,6 +399,7 @@ dependencies = [
|
|||
"parking_lot 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"primitives 0.1.0",
|
||||
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serialization 0.1.0",
|
||||
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
|
|
@ -31,9 +31,22 @@ impl From<InventoryType> for u32 {
|
|||
}
|
||||
}
|
||||
|
||||
impl Serializable for InventoryType {
|
||||
fn serialize(&self, stream: &mut Stream) {
|
||||
stream.append(&u32::from(*self));
|
||||
}
|
||||
}
|
||||
|
||||
impl Deserializable for InventoryType {
|
||||
fn deserialize(reader: &mut Reader) -> Result<Self, ReaderError> where Self: Sized {
|
||||
let t: u32 = try!(reader.read());
|
||||
InventoryType::from_u32(t).ok_or(ReaderError::MalformedData)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub struct InventoryVector {
|
||||
pub inv_type: u32, // TODO: change to InventoryType as discussed in #37
|
||||
pub inv_type: InventoryType,
|
||||
pub hash: H256,
|
||||
}
|
||||
|
||||
|
@ -56,12 +69,6 @@ impl Deserializable for InventoryVector {
|
|||
}
|
||||
}
|
||||
|
||||
impl InventoryVector {
|
||||
pub fn inventory_type(&self) -> Option<InventoryType> {
|
||||
InventoryType::from_u32(self.inv_type)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use bytes::Bytes;
|
||||
|
@ -73,7 +80,7 @@ mod tests {
|
|||
let expected = "020000000000000000000000000000000000000000000000000000000000000000000004".into();
|
||||
|
||||
let inventory = InventoryVector {
|
||||
inv_type: 2,
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: 4u8.into(),
|
||||
};
|
||||
|
||||
|
@ -85,7 +92,7 @@ mod tests {
|
|||
let raw: Bytes = "020000000000000000000000000000000000000000000000000000000000000000000004".into();
|
||||
|
||||
let expected = InventoryVector {
|
||||
inv_type: 2,
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: 4u8.into(),
|
||||
};
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ log = "0.3"
|
|||
abstract-ns = "0.2.1"
|
||||
ns-dns-tokio = "0.1.0"
|
||||
|
||||
primitives = { path = "../primitives" }
|
||||
primitives = { path = "../primitives"}
|
||||
bitcrypto = { path = "../crypto" }
|
||||
message = { path = "../message" }
|
||||
serialization = { path = "../serialization"}
|
||||
|
|
|
@ -28,7 +28,7 @@ pub fn negotiate_version(local: u32, other: u32) -> u32 {
|
|||
cmp::min(local, other)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct HandshakeResult {
|
||||
pub version: Version,
|
||||
pub negotiated_version: u32,
|
||||
|
@ -180,3 +180,133 @@ impl<A> Future for AcceptHandshake<A> where A: io::Read + io::Write {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::io;
|
||||
use futures::Future;
|
||||
use bytes::Bytes;
|
||||
use ser::Stream;
|
||||
use message::{Magic, Message};
|
||||
use message::types::Verack;
|
||||
use message::types::version::{Version, V0, V106, V70001};
|
||||
use super::{handshake, accept_handshake, HandshakeResult};
|
||||
|
||||
pub struct TestIo {
|
||||
read: io::Cursor<Bytes>,
|
||||
write: Bytes,
|
||||
}
|
||||
|
||||
impl io::Read for TestIo {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
io::Read::read(&mut self.read, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl io::Write for TestIo {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
io::Write::write(&mut self.write, buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
io::Write::flush(&mut self.write)
|
||||
}
|
||||
}
|
||||
|
||||
fn local_version() -> Version {
|
||||
Version::V70001(V0 {
|
||||
version: 70001,
|
||||
services: 1u64.into(),
|
||||
timestamp: 0x4d1015e6,
|
||||
// address and port of remote
|
||||
// services set to 0, cause we know nothing about the node
|
||||
receiver: "00000000000000000000000000000000000000002f5a0808208d".into(),
|
||||
}, V106 {
|
||||
// our local address (not sure if it is valid, or if it is checked at all
|
||||
// services set to 0, because we support nothing
|
||||
from: "00000000000000000000000000000000000000007f000001208d".into(),
|
||||
nonce: 0x3c76a409eb48a227,
|
||||
user_agent: "pbtc".into(),
|
||||
start_height: 0,
|
||||
}, V70001 {
|
||||
relay: true,
|
||||
})
|
||||
}
|
||||
|
||||
fn remote_version() -> Version {
|
||||
Version::V70001(V0 {
|
||||
version: 70012,
|
||||
services: 1u64.into(),
|
||||
timestamp: 0x4d1015e6,
|
||||
// services set to 1, house receiver supports at least the network
|
||||
receiver: "010000000000000000000000000000000000ffffc2b5936adde9".into(),
|
||||
}, V106 {
|
||||
// remote address, port
|
||||
// and supported protocols
|
||||
from: "050000000000000000000000000000000000ffff2f5a0808208d".into(),
|
||||
nonce: 0x3c76a409eb48a227,
|
||||
user_agent: "/Satoshi:0.12.1/".into(),
|
||||
start_height: 0,
|
||||
}, V70001 {
|
||||
relay: true,
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_handshake() {
|
||||
let magic = Magic::Mainnet;
|
||||
let version = 70012;
|
||||
let local_version = local_version();
|
||||
let remote_version = remote_version();
|
||||
|
||||
let mut remote_stream = Stream::new();
|
||||
remote_stream.append_slice(Message::new(magic, version, &remote_version).unwrap().as_ref());
|
||||
remote_stream.append_slice(Message::new(magic, version, &Verack).unwrap().as_ref());
|
||||
|
||||
let expected = HandshakeResult {
|
||||
version: remote_version,
|
||||
negotiated_version: 70001,
|
||||
};
|
||||
|
||||
let mut expected_stream = Stream::new();
|
||||
expected_stream.append_slice(Message::new(magic, version, &local_version).unwrap().as_ref());
|
||||
|
||||
let test_io = TestIo {
|
||||
read: io::Cursor::new(remote_stream.out()),
|
||||
write: Bytes::default(),
|
||||
};
|
||||
|
||||
let hs = handshake(test_io, magic, local_version).wait().unwrap();
|
||||
assert_eq!(hs.0.write, expected_stream.out());
|
||||
assert_eq!(hs.1.unwrap(), expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_accept_handshake() {
|
||||
let magic = Magic::Mainnet;
|
||||
let version = 70012;
|
||||
let local_version = local_version();
|
||||
let remote_version = remote_version();
|
||||
|
||||
let mut remote_stream = Stream::new();
|
||||
remote_stream.append_slice(Message::new(magic, version, &remote_version).unwrap().as_ref());
|
||||
|
||||
let test_io = TestIo {
|
||||
read: io::Cursor::new(remote_stream.out()),
|
||||
write: Bytes::default(),
|
||||
};
|
||||
|
||||
let expected = HandshakeResult {
|
||||
version: remote_version,
|
||||
negotiated_version: 70001,
|
||||
};
|
||||
|
||||
let mut expected_stream = Stream::new();
|
||||
expected_stream.append_slice(Message::new(magic, version, &local_version).unwrap().as_ref());
|
||||
expected_stream.append_slice(Message::new(magic, version, &Verack).unwrap().as_ref());
|
||||
|
||||
let hs = accept_handshake(test_io, magic, local_version).wait().unwrap();
|
||||
assert_eq!(hs.0.write, expected_stream.out());
|
||||
assert_eq!(hs.1.unwrap(), expected);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ extern crate ns_dns_tokio;
|
|||
extern crate bitcrypto as crypto;
|
||||
extern crate message;
|
||||
extern crate primitives;
|
||||
extern crate serialization as ser;
|
||||
|
||||
mod io;
|
||||
mod net;
|
||||
|
|
|
@ -8,9 +8,20 @@ use util::nonce::{NonceGenerator, RandomNonce};
|
|||
use p2p::Context;
|
||||
use PeerId;
|
||||
|
||||
pub struct PingProtocol<T = RandomNonce> {
|
||||
pub trait PingContext: Send + Sync {
|
||||
fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where Self: Sized, T: Payload;
|
||||
}
|
||||
|
||||
impl PingContext for Context {
|
||||
fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where T: Payload {
|
||||
let send = Context::send_to_peer(context.clone(), peer, payload);
|
||||
context.spawn(send);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PingProtocol<T = RandomNonce, C = Context> {
|
||||
/// Context
|
||||
context: Arc<Context>,
|
||||
context: Arc<C>,
|
||||
/// Connected peer id.
|
||||
peer: PeerId,
|
||||
/// Nonce generator.
|
||||
|
@ -30,22 +41,20 @@ impl PingProtocol {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Protocol for PingProtocol<T> where T: NonceGenerator + Send {
|
||||
impl<T, C> Protocol for PingProtocol<T, C> where T: NonceGenerator + Send, C: PingContext {
|
||||
fn initialize(&mut self, _direction: Direction, _version: u32) {
|
||||
// bitcoind always sends ping, let's do the same
|
||||
let nonce = self.nonce_generator.get();
|
||||
self.last_ping_nonce = Some(nonce);
|
||||
let ping = Ping::new(nonce);
|
||||
let send = Context::send_to_peer(self.context.clone(), self.peer, &ping);
|
||||
self.context.spawn(send);
|
||||
PingContext::send_to_peer(self.context.clone(), self.peer, &ping);
|
||||
}
|
||||
|
||||
fn on_message(&mut self, command: &Command, payload: &Bytes, version: u32) -> Result<(), Error> {
|
||||
if command == &Ping::command() {
|
||||
let ping: Ping = try!(deserialize_payload(payload, version));
|
||||
let pong = Pong::new(ping.nonce);
|
||||
let send = Context::send_to_peer(self.context.clone(), self.peer, &pong);
|
||||
self.context.spawn(send);
|
||||
PingContext::send_to_peer(self.context.clone(), self.peer, &pong);
|
||||
} else if command == &Pong::command() {
|
||||
let pong: Pong = try!(deserialize_payload(payload, version));
|
||||
if Some(pong.nonce) != self.last_ping_nonce.take() {
|
||||
|
@ -57,3 +66,92 @@ impl<T> Protocol for PingProtocol<T> where T: NonceGenerator + Send {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use parking_lot::Mutex;
|
||||
use bytes::Bytes;
|
||||
use message::{Payload, serialize_payload};
|
||||
use message::types::{Ping, Pong};
|
||||
use util::nonce::StaticNonce;
|
||||
use protocol::{Protocol, Direction};
|
||||
use PeerId;
|
||||
use super::{PingProtocol, PingContext};
|
||||
|
||||
#[derive(Default)]
|
||||
struct TestPingContext {
|
||||
version: u32,
|
||||
messages: Mutex<Vec<(PeerId, Bytes)>>,
|
||||
}
|
||||
|
||||
impl PingContext for TestPingContext {
|
||||
fn send_to_peer<T>(context: Arc<Self>, peer: PeerId, payload: &T) where T: Payload {
|
||||
let value = (peer, serialize_payload(payload, context.version).unwrap());
|
||||
context.messages.lock().push(value);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ping_init() {
|
||||
let ping_context = Arc::new(TestPingContext::default());
|
||||
let peer = 99;
|
||||
let nonce = 1000;
|
||||
let expected_message = serialize_payload(&Ping::new(nonce), 0).unwrap();
|
||||
let mut ping_protocol = PingProtocol {
|
||||
context: ping_context.clone(),
|
||||
peer: peer,
|
||||
nonce_generator: StaticNonce::new(nonce),
|
||||
last_ping_nonce: None,
|
||||
};
|
||||
|
||||
ping_protocol.initialize(Direction::Inbound, 0);
|
||||
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
|
||||
assert_eq!(messages.len(), 1);
|
||||
assert_eq!(messages[0].0, peer);
|
||||
assert_eq!(messages[0].1, expected_message);
|
||||
assert_eq!(ping_protocol.last_ping_nonce, Some(nonce));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ping_on_message_ping() {
|
||||
let ping_context = Arc::new(TestPingContext::default());
|
||||
let peer = 99;
|
||||
let nonce = 1000;
|
||||
let command = "ping".into();
|
||||
let message = serialize_payload(&Ping::new(nonce), 0).unwrap();
|
||||
let expected_message = serialize_payload(&Pong::new(nonce), 0).unwrap();
|
||||
let mut ping_protocol = PingProtocol {
|
||||
context: ping_context.clone(),
|
||||
peer: peer,
|
||||
nonce_generator: StaticNonce::new(nonce),
|
||||
last_ping_nonce: None,
|
||||
};
|
||||
|
||||
assert!(ping_protocol.on_message(&command, &message, 0).is_ok());
|
||||
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
|
||||
assert_eq!(messages.len(), 1);
|
||||
assert_eq!(messages[0].0, peer);
|
||||
assert_eq!(messages[0].1, expected_message);
|
||||
assert_eq!(ping_protocol.last_ping_nonce, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ping_on_message_pong() {
|
||||
let ping_context = Arc::new(TestPingContext::default());
|
||||
let peer = 99;
|
||||
let nonce = 1000;
|
||||
let command = "pong".into();
|
||||
let message = serialize_payload(&Pong::new(nonce), 0).unwrap();
|
||||
let mut ping_protocol = PingProtocol {
|
||||
context: ping_context.clone(),
|
||||
peer: peer,
|
||||
nonce_generator: StaticNonce::new(nonce),
|
||||
last_ping_nonce: Some(nonce),
|
||||
};
|
||||
|
||||
assert!(ping_protocol.on_message(&command, &message, 0).is_ok());
|
||||
let messages: Vec<(PeerId, Bytes)> = ping_context.messages.lock().clone();
|
||||
assert_eq!(messages.len(), 0);
|
||||
assert_eq!(ping_protocol.last_ping_nonce, None);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,15 +13,15 @@ impl NonceGenerator for RandomNonce {
|
|||
}
|
||||
}
|
||||
|
||||
pub struct _StaticNonce(u64);
|
||||
pub struct StaticNonce(u64);
|
||||
|
||||
impl _StaticNonce {
|
||||
pub fn _new(nonce: u64) -> Self {
|
||||
_StaticNonce(nonce)
|
||||
impl StaticNonce {
|
||||
pub fn new(nonce: u64) -> Self {
|
||||
StaticNonce(nonce)
|
||||
}
|
||||
}
|
||||
|
||||
impl NonceGenerator for _StaticNonce {
|
||||
impl NonceGenerator for StaticNonce {
|
||||
fn get(&self) -> u64 {
|
||||
self.0
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ impl LocalNode {
|
|||
|
||||
// request inventory from peer
|
||||
self.executor.lock().execute(SynchronizationTask::RequestInventory(peer_index));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn on_peer_inventory(&self, peer_index: usize, message: types::Inv) {
|
||||
trace!(target: "sync", "Got `inventory` message from peer#{}. Inventory len: {}", peer_index, message.inventory.len());
|
||||
|
@ -78,7 +78,7 @@ impl LocalNode {
|
|||
let unknown_blocks: Vec<_> = {
|
||||
let chain = self.chain.read();
|
||||
message.inventory.iter()
|
||||
.filter(|item| InventoryType::from_u32(item.inv_type) == Some(InventoryType::MessageBlock))
|
||||
.filter(|item| item.inv_type == InventoryType::MessageBlock)
|
||||
.filter(|item| chain.block_state(&item.hash) == BlockState::Unknown)
|
||||
.map(|item| item.hash.clone())
|
||||
.collect()
|
||||
|
|
|
@ -40,7 +40,7 @@ impl SynchronizationTaskExecutor for LocalSynchronizationTaskExecutor {
|
|||
let getdata = types::GetData {
|
||||
inventory: blocks_hashes.into_iter()
|
||||
.map(|hash| InventoryVector {
|
||||
inv_type: InventoryType::MessageBlock.into(),
|
||||
inv_type: InventoryType::MessageBlock,
|
||||
hash: hash,
|
||||
}).collect()
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue