diff --git a/Cargo.lock b/Cargo.lock index 2f99b14f..744a3c26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,6 +4,7 @@ version = "0.1.0" dependencies = [ "clap 2.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "db 0.1.0", + "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "keys 0.1.0", "message 0.1.0", "miner 0.1.0", @@ -11,6 +12,14 @@ dependencies = [ "script 0.1.0", ] +[[package]] +name = "aho-corasick" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ansi_term" version = "0.9.0" @@ -63,6 +72,7 @@ name = "chain" version = "0.1.0" dependencies = [ "bitcrypto 0.1.0", + "heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "primitives 0.1.0", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "serialization 0.1.0", @@ -117,6 +127,15 @@ name = "elastic-array" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "env_logger" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "regex 0.1.77 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "eth-secp256k1" version = "0.5.6" @@ -162,6 +181,14 @@ dependencies = [ "rayon 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "heapsize" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -204,6 +231,14 @@ name = "log" version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "memchr" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "message" version = "0.1.0" @@ -220,6 +255,7 @@ name = "miner" version = "0.1.0" dependencies = [ "chain 0.1.0", + "heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "primitives 0.1.0", "serialization 0.1.0", ] @@ -317,11 +353,11 @@ dependencies = [ "bitcrypto 0.1.0", "futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "message 0.1.0", "parking_lot 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "primitives 0.1.0", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", - "serialization 0.1.0", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -351,6 +387,7 @@ dependencies = [ name = "primitives" version = "0.1.0" dependencies = [ + "heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -372,6 +409,23 @@ dependencies = [ "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "regex" +version = "0.1.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "regex-syntax 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", + "thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", + "utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "regex-syntax" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "rocksdb" version = "0.4.5" @@ -476,6 +530,23 @@ dependencies = [ "chain 0.1.0", ] +[[package]] +name = "thread-id" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "thread_local" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "time" version = "0.1.35" @@ -508,6 +579,11 @@ name = "unicode-width" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "utf8-ranges" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "vec_map" version = "0.6.0" @@ -543,6 +619,7 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" [metadata] +"checksum aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ca972c2ea5f742bfce5687b9aef75506a764f61d37f8f649047846a9686ddb66" "checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6" "checksum arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d89f1b0e242270b5b797778af0c8d182a1a2ccac5d8d6fadf414223cc0fab096" "checksum base58 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5024ee8015f02155eee35c711107ddd9a9bf3cb689cf2a9089c97e79b6e1ae83" @@ -554,15 +631,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97" "checksum deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1614659040e711785ed8ea24219140654da1729f3ec8a47a9719d041112fe7bf" "checksum elastic-array 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4bc9250a632e7c001b741eb0ec6cee93c9a5b6d5f1879696a4b94d62b012210a" +"checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f" "checksum eth-secp256k1 0.5.6 (git+https://github.com/ethcore/rust-secp256k1)" = "" "checksum futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0bd34f72c0fffc9d2f6c570fd392bf99b9c5cd1481d79809e1cc2320befc0af0" "checksum futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bb982bb25cd8fa5da6a8eb3a460354c984ff1113da82bcb4f0b0862b5795db82" "checksum gcc 0.3.35 (registry+https://github.com/rust-lang/crates.io-index)" = "91ecd03771effb0c968fd6950b37e89476a578aaf1c70297d8e92b6516ec3312" +"checksum heapsize 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8c80e194758495a9109566134dc06e42ea0423987d6ceca016edaa90381b3549" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "49247ec2a285bb3dcb23cbd9c35193c025e7251bfce77c1d5da97e6362dffe7f" "checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b" "checksum libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)" = "408014cace30ee0f767b1c4517980646a573ec61a57957aeeabcac8ac0a02e8d" "checksum log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ab83497bf8bf4ed2a74259c1c802351fcd67a65baa86394b6ba73c36f4838054" +"checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20" "checksum mio 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2dadd39d4b47343e10513ac2a731c979517a4761224ecb6bbd243602300c9537" "checksum miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d5bfc6782530ac8ace97af10a540054a37126b63b0702ddaaa243b73b5745b9a" "checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2" @@ -576,6 +656,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum parking_lot_core 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fb1b97670a2ffadce7c397fb80a3d687c4f3060140b885621ef1653d0e5d5068" "checksum rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "2791d88c6defac799c3f20d74f094ca33b9332612d9aef9078519c82e4fe04a5" "checksum rayon 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "655df67c314c30fa3055a365eae276eb88aa4f3413a352a1ab32c1320eda41ea" +"checksum regex 0.1.77 (registry+https://github.com/rust-lang/crates.io-index)" = "64b03446c466d35b42f2a8b203c8e03ed8b91c0f17b56e1f84f7210a257aa665" +"checksum regex-syntax 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "48f0573bcee95a48da786f8823465b5f2a1fae288a55407aca991e5b3e0eae11" "checksum rocksdb 0.4.5 (git+https://github.com/ethcore/rust-rocksdb)" = "" "checksum rocksdb-sys 0.3.0 (git+https://github.com/ethcore/rust-rocksdb)" = "" "checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a" @@ -587,10 +669,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum smallvec 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fcc8d19212aacecf95e4a7a2179b26f7aeb9732a915cf01f05b0d3e044865410" "checksum strsim 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "50c069df92e4b01425a8bf3576d5d417943a6a7272fbabaf5bd80b1aaa76442e" "checksum term_size 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3f7f5f3f71b0040cecc71af239414c23fd3c73570f5ff54cf50e03cef637f2a0" +"checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03" +"checksum thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8576dbbfcaef9641452d5cf0df9b0e7eeab7694956dd33bb61515fb8f18cfdd5" "checksum time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)" = "3c7ec6d62a20df54e07ab3b78b9a3932972f4b7981de295563686849eb3989af" "checksum tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "659cbae6c954dee37352853816c6a52180e47feb70be73bbfeec6d58c4da4a71" "checksum unicode-segmentation 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b905d0fc2a1f0befd86b0e72e31d1787944efef9d38b9358a9e92a69757f7e3b" "checksum unicode-width 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2d6722facc10989f63ee0e20a83cd4e1714a9ae11529403ac7e0afd069abc39e" +"checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f" "checksum vec_map 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cac5efe5cb0fa14ec2f84f83c701c562ee63f6dcc680861b21d65c682adfb05f" "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" diff --git a/Cargo.toml b/Cargo.toml index 4c6732e1..2333ecf9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ authors = ["Ethcore "] description = "Parity bitcoin client." [dependencies] +env_logger = "0.3" clap = { version = "2", features = ["yaml"] } keys = { path = "keys" } message = { path = "message" } diff --git a/chain/Cargo.toml b/chain/Cargo.toml index 57d0cc9e..ca16d59f 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -5,6 +5,7 @@ authors = ["debris "] [dependencies] rustc-serialize = "0.3" +heapsize = "0.3" bitcrypto = { path = "../crypto" } primitives = { path = "../primitives" } serialization = { path = "../serialization" } diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 9af799bf..6ea2d533 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -1,4 +1,5 @@ extern crate rustc_serialize; +extern crate heapsize; extern crate primitives; extern crate bitcrypto as crypto; extern crate serialization as ser; diff --git a/chain/src/transaction.rs b/chain/src/transaction.rs index 8912a62b..bd23713f 100644 --- a/chain/src/transaction.rs +++ b/chain/src/transaction.rs @@ -2,6 +2,7 @@ //! Bitcoin trainsaction. //! https://en.bitcoin.it/wiki/Protocol_documentation#tx +use heapsize::HeapSizeOf; use hex::FromHex; use bytes::Bytes; use ser::{ @@ -105,6 +106,12 @@ impl Deserializable for TransactionInput { } } +impl HeapSizeOf for TransactionInput { + fn heap_size_of_children(&self) -> usize { + self.script_sig.heap_size_of_children() + } +} + impl TransactionInput { pub fn previous_output(&self) -> &OutPoint { &self.previous_output @@ -159,6 +166,12 @@ impl Default for TransactionOutput { } } +impl HeapSizeOf for TransactionOutput { + fn heap_size_of_children(&self) -> usize { + self.script_pubkey.heap_size_of_children() + } +} + impl TransactionOutput { pub fn value(&self) -> u64 { self.value @@ -214,6 +227,12 @@ impl From<&'static str> for Transaction { } } +impl HeapSizeOf for Transaction { + fn heap_size_of_children(&self) -> usize { + self.inputs.heap_size_of_children() + self.outputs.heap_size_of_children() + } +} + impl Transaction { pub fn hash(&self) -> H256 { dhash256(&serialize(self)) diff --git a/message/src/common/address.rs b/message/src/common/address.rs index e9a67485..f5c1371d 100644 --- a/message/src/common/address.rs +++ b/message/src/common/address.rs @@ -3,11 +3,11 @@ use ser::{ Stream, Serializable, Reader, Deserializable, Error as ReaderError, deserialize, }; -use common::{Port, IpAddress, ServiceFlags}; +use common::{Port, IpAddress, Services}; #[derive(Debug, PartialEq)] pub struct NetAddress { - pub services: ServiceFlags, + pub services: Services, pub address: IpAddress, pub port: Port, } @@ -42,7 +42,7 @@ impl From<&'static str> for NetAddress { #[cfg(test)] mod tests { use ser::{serialize, deserialize}; - use common::ServiceFlags; + use common::Services; use super::NetAddress; #[test] @@ -54,7 +54,7 @@ mod tests { ].into(); let address = NetAddress { - services: ServiceFlags::default().with_network(true), + services: Services::default().with_network(true), address: "::ffff:a00:1".into(), port: 8333.into(), }; @@ -71,7 +71,7 @@ mod tests { ]; let expected = NetAddress { - services: ServiceFlags::default().with_network(true), + services: Services::default().with_network(true), address: "::ffff:a00:1".into(), port: 8333.into(), }; @@ -82,7 +82,7 @@ mod tests { #[test] fn test_net_address_from_static_str() { let expected = NetAddress { - services: ServiceFlags::default().with_network(true), + services: Services::default().with_network(true), address: "::ffff:a00:1".into(), port: 8333.into(), diff --git a/message/src/common/mod.rs b/message/src/common/mod.rs index a6164156..08bbc1e3 100644 --- a/message/src/common/mod.rs +++ b/message/src/common/mod.rs @@ -20,4 +20,4 @@ pub use self::ip::IpAddress; pub use self::magic::Magic; pub use self::port::Port; pub use self::prefilled_transaction::PrefilledTransaction; -pub use self::service::ServiceFlags; +pub use self::service::Services; diff --git a/message/src/common/service.rs b/message/src/common/service.rs index 209ef35c..c3699218 100644 --- a/message/src/common/service.rs +++ b/message/src/common/service.rs @@ -3,23 +3,22 @@ use ser::{ Deserializable, Reader, Error as ReaderError }; -#[derive(Debug, Default, PartialEq, Clone, Copy)] -pub struct ServiceFlags(u64); +#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] +pub struct Services(u64); -impl From for u64 { - fn from(s: ServiceFlags) -> Self { +impl From for u64 { + fn from(s: Services) -> Self { s.0 } } -impl From for ServiceFlags { +impl From for Services { fn from(v: u64) -> Self { - ServiceFlags(v) + Services(v) } } - -impl ServiceFlags { +impl Services { pub fn network(&self) -> bool { self.bit_at(0) } @@ -63,7 +62,11 @@ impl ServiceFlags { pub fn with_xthin(mut self, v: bool) -> Self { self.set_bit(4, v); self - } + } + + pub fn includes(&self, other: &Self) -> bool { + self.0 & other.0 == other.0 + } fn set_bit(&mut self, bit: usize, bit_value: bool) { if bit_value { @@ -78,14 +81,35 @@ impl ServiceFlags { } } -impl Serializable for ServiceFlags { +impl Serializable for Services { fn serialize(&self, stream: &mut Stream) { stream.append(&self.0); } } -impl Deserializable for ServiceFlags { +impl Deserializable for Services { fn deserialize(reader: &mut Reader) -> Result where Self: Sized { - reader.read().map(ServiceFlags) + reader.read().map(Services) + } +} + +#[cfg(test)] +mod test { + use super::Services; + + #[test] + fn test_serivces_includes() { + let s1 = Services::default() + .with_witness(true) + .with_xthin(true); + let s2 = Services::default() + .with_witness(true); + + assert!(s1.witness()); + assert!(s1.xthin()); + assert!(s2.witness()); + assert!(!s2.xthin()); + assert!(s1.includes(&s2)); + assert!(!s2.includes(&s1)); } } diff --git a/message/src/serialization/stream.rs b/message/src/serialization/stream.rs index 3b1f52e3..cd4effd8 100644 --- a/message/src/serialization/stream.rs +++ b/message/src/serialization/stream.rs @@ -29,10 +29,6 @@ impl PayloadStream { t.serialize_payload(&mut self.stream, self.version) } - pub fn raw_stream(&mut self) -> &mut Stream { - &mut self.stream - } - pub fn out(self) -> Bytes { self.stream.out() } diff --git a/message/src/types/ping.rs b/message/src/types/ping.rs index 9d8e00f4..6aea240f 100644 --- a/message/src/types/ping.rs +++ b/message/src/types/ping.rs @@ -6,6 +6,14 @@ pub struct Ping { pub nonce: u64, } +impl Ping { + pub fn new(nonce: u64) -> Self { + Ping { + nonce: nonce, + } + } +} + impl Payload for Ping { fn version() -> u32 { 0 diff --git a/message/src/types/pong.rs b/message/src/types/pong.rs index f695d954..fd2419de 100644 --- a/message/src/types/pong.rs +++ b/message/src/types/pong.rs @@ -6,6 +6,14 @@ pub struct Pong { pub nonce: u64, } +impl Pong { + pub fn new(nonce: u64) -> Self { + Pong { + nonce: nonce, + } + } +} + impl Payload for Pong { fn version() -> u32 { 0 diff --git a/message/src/types/version.rs b/message/src/types/version.rs index f4d019b5..f66a8bfb 100644 --- a/message/src/types/version.rs +++ b/message/src/types/version.rs @@ -3,7 +3,7 @@ use ser::{ Serializable, Stream, Deserializable, Reader, Error as ReaderError, }; -use common::{NetAddress, ServiceFlags}; +use common::{NetAddress, Services}; use {Payload, MessageResult}; use serialization::deserialize_payload; @@ -69,12 +69,20 @@ impl Version { Version::V70001(ref s, _, _) => s.version, } } + + pub fn services(&self) -> Services { + match *self { + Version::V0(ref s) => s.services, + Version::V106(ref s, _) => s.services, + Version::V70001(ref s, _, _) => s.services, + } + } } #[derive(Debug, PartialEq)] pub struct V0 { pub version: u32, - pub services: ServiceFlags, + pub services: Services, pub timestamp: i64, pub receiver: NetAddress, } diff --git a/miner/Cargo.toml b/miner/Cargo.toml index c909881b..41bb19a6 100644 --- a/miner/Cargo.toml +++ b/miner/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" authors = ["Ethcore "] [dependencies] +heapsize = "0.3" chain = { path = "../chain" } primitives = { path = "../primitives" } serialization = { path = "../serialization" } diff --git a/miner/benches/mod.rs b/miner/benches/mod.rs new file mode 100644 index 00000000..0826871e --- /dev/null +++ b/miner/benches/mod.rs @@ -0,0 +1,119 @@ +#![feature(test)] + +extern crate test; +extern crate miner; +extern crate chain; +extern crate primitives; +extern crate serialization as ser; + +#[cfg(test)] +mod benchmarks { + use std::collections::VecDeque; + use super::chain::{Transaction, TransactionInput, OutPoint}; + use super::primitives::bytes::Bytes; + use super::test::Bencher; + use super::miner::{MemoryPool, MemoryPoolOrderingStrategy}; + + fn prepare_independent_transactions(n: usize) -> VecDeque { + (0..n).map(|nonce| Transaction { + version: nonce as i32, + inputs: vec![], + outputs: vec![], + lock_time: 0, + }).collect() + } + + fn prepare_dependent_transactions(n: usize) -> VecDeque { + let previous_transaction: Transaction = "00000000000164000000000000000000000000".into(); + let mut previous_transaction_hash = previous_transaction.hash(); + let mut result = VecDeque::new(); + result.push_back(previous_transaction); + result.extend((0..n).map(|_nonce| { + let transaction = Transaction { + version: 0, + inputs: vec![ + TransactionInput { + previous_output: OutPoint { + hash: previous_transaction_hash.clone(), + index: 0, + }, + script_sig: Bytes::new_with_len(0), + sequence: 0, + }, + ], + outputs: vec![], + lock_time: 0, + }; + previous_transaction_hash = transaction.hash(); + transaction + })); + result + } + + #[bench] + // test benchmarks::memory_pool_insert_independent_transactions ... bench: 1,455 ns/iter (+/- 12) + fn memory_pool_insert_independent_transactions(b: &mut Bencher) { + let iterations = 100; + let mut pool = MemoryPool::new(); + let mut transactions = prepare_independent_transactions(iterations); + b.bench_n(iterations as u64, |b| b.iter(|| { + pool.insert_verified(transactions.pop_front().unwrap()) + })); + } + + #[bench] + // test benchmarks::memory_pool_insert_descendant_transaction ... bench: 7,834 ns/iter (+/- 288) + fn memory_pool_insert_descendant_transaction(b: &mut Bencher) { + let iterations = 100usize; + let mut pool = MemoryPool::new(); + let mut transactions = prepare_dependent_transactions(iterations); + pool.insert_verified(transactions.pop_front().unwrap()); + + b.bench_n(iterations as u64, |b| b.iter(|| { + pool.insert_verified(transactions.pop_front().unwrap()) + })); + } + + #[bench] + // test benchmarks::memory_pool_insert_ancestor_transaction ... bench: 376,067 ns/iter (+/- 11,249) + // very slow due to weird usage scenario: + // (1) transactions inserted to memorypool are verified + // (2) verified => ancestors already verified + // (3) ancestors verified => they are already in memorypool (not this case) or in database (why inserting to memorypool then) + fn memory_pool_insert_ancestor_transaction(b: &mut Bencher) { + let iterations = 100usize; + let mut pool = MemoryPool::new(); + let mut transactions = prepare_dependent_transactions(iterations); + pool.insert_verified(transactions.pop_front().unwrap()); + + b.bench_n(iterations as u64, |b| b.iter(|| { + pool.insert_verified(transactions.pop_back().unwrap()) + })); + } + + #[bench] + // test benchmarks::memory_pool_remove_independent_in_order ... bench: 460 ns/iter (+/- 47) + fn memory_pool_remove_independent_in_order(b: &mut Bencher) { + let iterations = 100; + let mut pool = MemoryPool::new(); + for transaction in prepare_independent_transactions(iterations) { + pool.insert_verified(transaction) + } + b.bench_n(iterations as u64, |b| b.iter(|| { + pool.remove_with_strategy(MemoryPoolOrderingStrategy::ByTimestamp) + })); + } + + #[bench] + // test benchmarks::memory_pool_remove_dependent_in_order ... bench: 754 ns/iter (+/- 111) + fn memory_pool_remove_dependent_in_order(b: &mut Bencher) { + let iterations = 100; + let mut pool = MemoryPool::new(); + for transaction in prepare_dependent_transactions(iterations) { + pool.insert_verified(transaction) + } + b.bench_n(iterations as u64, |b| b.iter(|| { + pool.remove_with_strategy(MemoryPoolOrderingStrategy::ByTimestamp) + })); + } +} diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 2462ef2b..c0121298 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -1,4 +1,5 @@ extern crate chain; +extern crate heapsize; extern crate primitives; extern crate serialization as ser; @@ -6,4 +7,4 @@ pub mod memory_pool; pub use primitives::{hash}; -pub use self::memory_pool::{MemoryPool, Information as MemoryPoolInformation}; \ No newline at end of file +pub use self::memory_pool::{MemoryPool, Information as MemoryPoolInformation, OrderingStrategy as MemoryPoolOrderingStrategy}; \ No newline at end of file diff --git a/miner/src/memory_pool.rs b/miner/src/memory_pool.rs index e8989b54..05693f6d 100644 --- a/miner/src/memory_pool.rs +++ b/miner/src/memory_pool.rs @@ -10,7 +10,9 @@ use chain::Transaction; use std::cmp::Ordering; use std::collections::HashMap; use std::collections::HashSet; +use std::collections::BTreeSet; use ser::Serializable; +use heapsize::HeapSizeOf; /// Transactions ordering strategy #[derive(Debug, Clone, Copy)] @@ -26,9 +28,9 @@ pub enum OrderingStrategy { /// Information on current `MemoryPool` state #[derive(Debug)] pub struct Information { - /// The number of transactions currently in the `MemoryPool` + /// Number of transactions currently in the `MemoryPool` pub transactions_count: usize, - /// The total number of bytes in the transactions in the `MemoryPool` + /// Total number of bytes occupied by transactions from the `MemoryPool` pub transactions_size_in_bytes: usize, } @@ -77,134 +79,152 @@ struct Storage { references: ReferenceStorage, } +/// Multi-index storage which holds references to entries from Storage::by_hash #[derive(Debug, Clone)] struct ReferenceStorage { /// By-input storage by_input: HashMap>, /// Pending entries pending: HashSet, - /// By-entry-time storage - by_storage_index: storage_index_strategy::Storage, - /// By-score storage - by_transaction_score: transaction_score_strategy::Storage, - /// By-package-score strategy - by_package_score: package_score_strategy::Storage, + /// Ordered storage + ordered: OrderedReferenceStorage, } -macro_rules! ordering_strategy { - ($strategy: ident; $($member: ident: $member_type: ty), *; $comparer: expr) => { - mod $strategy { - use std::cmp::Ordering; - use hash::H256; - use std::collections::BTreeSet; - use super::Entry; +/// Multi-index orderings storage which holds ordered references to entries from Storage::by_hash +#[derive(Debug, Clone)] +struct OrderedReferenceStorage { + /// By-entry-time storage + by_storage_index: BTreeSet, + /// By-score storage + by_transaction_score: BTreeSet, + /// By-package-score strategy + by_package_score: BTreeSet, +} - /// Lightweight struct maintain transactions ordering - #[derive(Debug, Eq, PartialEq, Clone)] - pub struct OrderedEntry { - /// Transaction hash - hash: H256, - /// Transaction data - $($member: $member_type), * - } +#[derive(Debug, Clone, PartialEq, Eq)] +struct ByTimestampOrderedEntry { + /// Transaction hash + hash: H256, + /// Throughout index of this transaction in memory pool (non persistent) + storage_index: u64, +} - impl OrderedEntry { - pub fn for_entry(entry: &Entry) -> OrderedEntry { - OrderedEntry { - hash: entry.hash.clone(), - $($member: entry.$member.clone()), * - } - } - } +#[derive(Debug, Eq, PartialEq, Clone)] +struct ByTransactionScoreOrderedEntry { + /// Transaction hash + hash: H256, + /// Transaction size + size: usize, + /// Transaction fee + miner_fee: i64, + /// Virtual transaction fee + miner_virtual_fee: i64, +} - impl PartialOrd for OrderedEntry { - fn partial_cmp(&self, other: &OrderedEntry) -> Option { - Some(self.cmp(other)) - } - } +#[derive(Debug, Eq, PartialEq, Clone)] +struct ByPackageScoreOrderedEntry { + /// Transaction hash + hash: H256, + /// size + Sum(size) for all in-pool descendants + package_size: usize, + /// miner_fee + Sum(miner_fee) for all in-pool descendants + package_miner_fee: i64, + /// miner_virtual_fee + Sum(miner_virtual_fee) for all in-pool descendants + package_miner_virtual_fee: i64, +} - impl Ord for OrderedEntry { - fn cmp(&self, other: &Self) -> Ordering { - let order = $comparer(&self, other); - if order != Ordering::Equal { - return order - } - - self.hash.cmp(&other.hash) - } - } - - /// Ordering storage - #[derive(Debug, Clone)] - pub struct Storage { - data: BTreeSet, - } - - impl Storage { - pub fn new() -> Self { - Storage { - data: BTreeSet::new() - } - } - - /// Insert entry to storage - pub fn insert(&mut self, entry: &Entry) { - self.data.replace(OrderedEntry::for_entry(entry)); - } - - /// Remove entry from storage - pub fn remove(&mut self, entry: &Entry) -> bool { - self.data.remove(&OrderedEntry::for_entry(entry)) - } - - /// Returns hash of the top entry - pub fn top(&self) -> Option { - self.data.iter().map(|ref entry| entry.hash.clone()).nth(0) - } - } +impl<'a> From<&'a Entry> for ByTimestampOrderedEntry { + fn from(entry: &'a Entry) -> Self { + ByTimestampOrderedEntry { + hash: entry.hash.clone(), + storage_index: entry.storage_index, } } } -// Ordering strategies declaration - -ordering_strategy!(storage_index_strategy; - storage_index: u64; - |me: &Self, other: &Self| - me.storage_index.cmp(&other.storage_index)); -ordering_strategy!(transaction_score_strategy; - size: usize, miner_fee: i64, miner_virtual_fee: i64; - |me: &Self, other: &Self| { - // lesser miner score means later removal - let left = (me.miner_fee + me.miner_virtual_fee) * (other.size as i64); - let right = (other.miner_fee + other.miner_virtual_fee) * (me.size as i64); - right.cmp(&left) - }); -ordering_strategy!(package_score_strategy; - package_size: usize, package_miner_fee: i64, package_miner_virtual_fee: i64; - |me: &Self, other: &Self| { - // lesser miner score means later removal - let left = (me.package_miner_fee + me.package_miner_virtual_fee) * (other.package_size as i64); - let right = (other.package_miner_fee + other.package_miner_virtual_fee) * (me.package_size as i64); - right.cmp(&left) - }); - -// Macro to use instead of member functions (to deal with double references) - -macro_rules! insert_to_orderings { - ($me: expr, $entry: expr) => ( - $me.by_storage_index.insert(&$entry); - $me.by_transaction_score.insert(&$entry); - $me.by_package_score.insert(&$entry); - ) +impl<'a> From<&'a Entry> for ByTransactionScoreOrderedEntry { + fn from(entry: &'a Entry) -> Self { + ByTransactionScoreOrderedEntry { + hash: entry.hash.clone(), + size: entry.size, + miner_fee: entry.miner_fee, + miner_virtual_fee: entry.miner_virtual_fee, + } + } } -macro_rules! remove_from_orderings { - ($me: expr, $entry: expr) => ( - $me.by_storage_index.remove(&$entry); - $me.by_transaction_score.remove(&$entry); - $me.by_package_score.remove(&$entry); - ) +impl<'a> From<&'a Entry> for ByPackageScoreOrderedEntry { + fn from(entry: &'a Entry) -> Self { + ByPackageScoreOrderedEntry { + hash: entry.hash.clone(), + package_size: entry.package_size, + package_miner_fee: entry.package_miner_fee, + package_miner_virtual_fee: entry.package_miner_virtual_fee, + } + } +} + +impl PartialOrd for ByTimestampOrderedEntry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ByTimestampOrderedEntry { + fn cmp(&self, other: &Self) -> Ordering { + let order = self.storage_index.cmp(&other.storage_index); + if order != Ordering::Equal { + return order + } + + self.hash.cmp(&other.hash) + } +} + +impl PartialOrd for ByTransactionScoreOrderedEntry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ByTransactionScoreOrderedEntry { + fn cmp(&self, other: &Self) -> Ordering { + // lesser miner score means later removal + let left = (self.miner_fee + self.miner_virtual_fee) * (other.size as i64); + let right = (other.miner_fee + other.miner_virtual_fee) * (self.size as i64); + let order = right.cmp(&left); + if order != Ordering::Equal { + return order + } + + self.hash.cmp(&other.hash) + } +} + +impl PartialOrd for ByPackageScoreOrderedEntry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ByPackageScoreOrderedEntry { + fn cmp(&self, other: &Self) -> Ordering { + // lesser miner score means later removal + let left = (self.package_miner_fee + self.package_miner_virtual_fee) * (other.package_size as i64); + let right = (other.package_miner_fee + other.package_miner_virtual_fee) * (self.package_size as i64); + let order = right.cmp(&left); + if order != Ordering::Equal { + return order + } + + self.hash.cmp(&other.hash) + } +} + +impl HeapSizeOf for Entry { + fn heap_size_of_children(&self) -> usize { + self.transaction.heap_size_of_children() + self.ancestors.heap_size_of_children() + } } impl Storage { @@ -216,9 +236,11 @@ impl Storage { references: ReferenceStorage { by_input: HashMap::new(), pending: HashSet::new(), - by_storage_index: storage_index_strategy::Storage::new(), - by_transaction_score: transaction_score_strategy::Storage::new(), - by_package_score: package_score_strategy::Storage::new(), + ordered: OrderedReferenceStorage { + by_storage_index: BTreeSet::new(), + by_transaction_score: BTreeSet::new(), + by_package_score: BTreeSet::new(), + }, }, } } @@ -234,25 +256,25 @@ impl Storage { // update score of all packages this transaction is in for ancestor_hash in entry.ancestors.iter() { - if let Some(ref mut ancestor_entry) = self.by_hash.get_mut(ancestor_hash) { - let removed = self.references.by_package_score.remove(ancestor_entry); + if let Some(mut ancestor_entry) = self.by_hash.get_mut(ancestor_hash) { + let removed = self.references.ordered.by_package_score.remove(&(ancestor_entry as &Entry).into()); ancestor_entry.package_size += entry.size; ancestor_entry.package_miner_fee += entry.package_miner_fee; ancestor_entry.package_miner_virtual_fee += entry.package_miner_virtual_fee; if removed { - self.references.by_package_score.insert(ancestor_entry); + self.references.ordered.by_package_score.insert((ancestor_entry as &Entry).into()); } } } // insert either to pending queue or to orderings - if Storage::has_in_pool_ancestors(None, &self.by_hash, &entry.transaction) { + if self.references.has_in_pool_ancestors(None, &self.by_hash, &entry.transaction) { self.references.pending.insert(entry.hash.clone()); } else { - insert_to_orderings!(self.references, entry); + self.references.ordered.insert_to_orderings(&entry); } // add to by_hash storage @@ -273,9 +295,9 @@ impl Storage { let mut ancestors: Option> = None; // modify the entry itself - if let Some(ref mut entry) = self.by_hash.get_mut(h) { - let insert_to_package_score = self.references.by_package_score.remove(&entry); - let insert_to_transaction_score = self.references.by_transaction_score.remove(&entry); + if let Some(mut entry) = self.by_hash.get_mut(h) { + let insert_to_package_score = self.references.ordered.by_package_score.remove(&(entry as &Entry).into()); + let insert_to_transaction_score = self.references.ordered.by_transaction_score.remove(&(entry as &Entry).into()); miner_virtual_fee_change = virtual_fee - entry.miner_virtual_fee; if !entry.ancestors.is_empty() { @@ -285,10 +307,10 @@ impl Storage { entry.miner_virtual_fee = virtual_fee; if insert_to_transaction_score { - self.references.by_transaction_score.insert(&entry); + self.references.ordered.by_transaction_score.insert((entry as &Entry).into()); } if insert_to_package_score { - self.references.by_package_score.insert(&entry); + self.references.ordered.by_package_score.insert((entry as &Entry).into()); } } @@ -296,11 +318,11 @@ impl Storage { if miner_virtual_fee_change != 0 { ancestors.map(|ancestors| { for ancestor_hash in ancestors { - if let Some(ref mut ancestor_entry) = self.by_hash.get_mut(&ancestor_hash) { - let insert_to_package_score = self.references.by_package_score.remove(ancestor_entry); + if let Some(mut ancestor_entry) = self.by_hash.get_mut(&ancestor_hash) { + let insert_to_package_score = self.references.ordered.by_package_score.remove(&(ancestor_entry as &Entry).into()); ancestor_entry.package_miner_virtual_fee += miner_virtual_fee_change; if insert_to_package_score { - self.references.by_package_score.insert(ancestor_entry); + self.references.ordered.by_package_score.insert((ancestor_entry as &Entry).into()); } } } @@ -310,9 +332,9 @@ impl Storage { pub fn read_with_strategy(&self, strategy: OrderingStrategy) -> Option { match strategy { - OrderingStrategy::ByTimestamp => self.references.by_storage_index.top(), - OrderingStrategy::ByTransactionScore => self.references.by_transaction_score.top(), - OrderingStrategy::ByPackageScore => self.references.by_package_score.top(), + OrderingStrategy::ByTimestamp => self.references.ordered.by_storage_index.iter().map(|entry| entry.hash.clone()).nth(0), + OrderingStrategy::ByTransactionScore => self.references.ordered.by_transaction_score.iter().map(|entry| entry.hash.clone()).nth(0), + OrderingStrategy::ByPackageScore => self.references.ordered.by_package_score.iter().map(|entry| entry.hash.clone()).nth(0), } } @@ -336,9 +358,9 @@ impl Storage { n -= 1; let top_hash = match strategy { - OrderingStrategy::ByTimestamp => references.by_storage_index.top(), - OrderingStrategy::ByTransactionScore => references.by_transaction_score.top(), - OrderingStrategy::ByPackageScore => references.by_package_score.top(), + OrderingStrategy::ByTimestamp => references.ordered.by_storage_index.iter().map(|entry| entry.hash.clone()).nth(0), + OrderingStrategy::ByTransactionScore => references.ordered.by_transaction_score.iter().map(|entry| entry.hash.clone()).nth(0), + OrderingStrategy::ByPackageScore => references.ordered.by_package_score.iter().map(|entry| entry.hash.clone()).nth(0), }; match top_hash { None => break, @@ -346,7 +368,7 @@ impl Storage { self.by_hash.get(&top_hash).map(|entry| { // simulate removal removed.insert(top_hash.clone()); - Storage::remove(Some(&removed), &self.by_hash, &mut references, &entry); + references.remove(Some(&removed), &self.by_hash, &entry); // return this entry result.push(top_hash); @@ -364,7 +386,7 @@ impl Storage { self.transactions_size_in_bytes -= entry.size; // remove from storage - Storage::remove(None, &self.by_hash, &mut self.references, &entry); + self.references.remove(None, &self.by_hash, &entry); entry }) @@ -415,9 +437,9 @@ impl Storage { pub fn remove_with_strategy(&mut self, strategy: OrderingStrategy) -> Option { let top_hash = match strategy { - OrderingStrategy::ByTimestamp => self.references.by_storage_index.top(), - OrderingStrategy::ByTransactionScore => self.references.by_transaction_score.top(), - OrderingStrategy::ByPackageScore => self.references.by_package_score.top(), + OrderingStrategy::ByTimestamp => self.references.ordered.by_storage_index.iter().map(|entry| entry.hash.clone()).nth(0), + OrderingStrategy::ByTransactionScore => self.references.ordered.by_transaction_score.iter().map(|entry| entry.hash.clone()).nth(0), + OrderingStrategy::ByPackageScore => self.references.ordered.by_package_score.iter().map(|entry| entry.hash.clone()).nth(0), }; top_hash.map(|hash| self.remove_by_hash(&hash) .expect("`hash` is read from `references`; entries in `references` have corresponging entries in `by_hash`; `remove_by_hash` removes entry from `by_hash`; qed") @@ -443,36 +465,77 @@ impl Storage { pub fn get_transactions_ids(&self) -> Vec { self.by_hash.keys().map(|h| h.clone()).collect() } +} - fn remove(removed: Option<&HashSet>, by_hash: &HashMap, references: &mut ReferenceStorage, entry: &Entry) { +impl ReferenceStorage { + pub fn has_in_pool_ancestors(&self, removed: Option<&HashSet>, by_hash: &HashMap, transaction: &Transaction) -> bool { + transaction.inputs.iter() + .any(|input| by_hash.contains_key(&input.previous_output.hash) + && !removed.map_or(false, |r| r.contains(&input.previous_output.hash))) + } + + pub fn remove(&mut self, removed: Option<&HashSet>, by_hash: &HashMap, entry: &Entry) { // for each pending descendant transaction - if let Some(descendants) = references.by_input.get(&entry.hash) { + if let Some(descendants) = self.by_input.get(&entry.hash) { let descendants = descendants.iter().filter_map(|hash| by_hash.get(&hash)); for descendant in descendants { // if there are no more ancestors of this transaction in the pool // => can move from pending to orderings - if !Storage::has_in_pool_ancestors(removed, by_hash, &descendant.transaction) { - references.pending.remove(&descendant.hash); + if !self.has_in_pool_ancestors(removed, by_hash, &descendant.transaction) { + self.pending.remove(&descendant.hash); if let Some(descendant_entry) = by_hash.get(&descendant.hash) { - insert_to_orderings!(references, &descendant_entry); + self.ordered.insert_to_orderings(&descendant_entry); } } } } - references.by_input.remove(&entry.hash); + self.by_input.remove(&entry.hash); // remove from pending - references.pending.remove(&entry.hash); + self.pending.remove(&entry.hash); // remove from orderings - remove_from_orderings!(references, entry); + self.ordered.remove_from_orderings(entry); + } +} + +impl OrderedReferenceStorage { + pub fn insert_to_orderings(&mut self, entry: &Entry) { + self.by_storage_index.insert(entry.into()); + self.by_transaction_score.insert(entry.into()); + self.by_package_score.insert(entry.into()); } - fn has_in_pool_ancestors(removed: Option<&HashSet>, by_hash: &HashMap, transaction: &Transaction) -> bool { - transaction.inputs.iter() - .any(|input| by_hash.contains_key(&input.previous_output.hash) - && !removed.map_or(false, |r| r.contains(&input.previous_output.hash))) + pub fn remove_from_orderings(&mut self, entry: &Entry) { + self.by_storage_index.remove(&entry.into()); + self.by_transaction_score.remove(&entry.into()); + self.by_package_score.remove(&entry.into()); + } +} + +impl HeapSizeOf for Storage { + fn heap_size_of_children(&self) -> usize { + self.by_hash.heap_size_of_children() + self.references.heap_size_of_children() + } +} + +impl HeapSizeOf for ReferenceStorage { + fn heap_size_of_children(&self) -> usize { + self.by_input.heap_size_of_children() + + self.pending.heap_size_of_children() + + self.ordered.heap_size_of_children() + } +} + +impl HeapSizeOf for OrderedReferenceStorage { + fn heap_size_of_children(&self) -> usize { + // HeapSizeOf is not implemented for BTreeSet => rough estimation here + use std::mem::size_of; + let len = self.by_storage_index.len(); + len * (size_of::() + + size_of::() + + size_of::()) } } @@ -511,6 +574,7 @@ impl MemoryPool { /// Reads hashes of up to n transactions from the `MemoryPool`, using selected strategy. /// Ancestors are always returned before descendant transactions. + /// Use this function with care, only if really needed (heavy memory usage) pub fn read_n_with_strategy(&mut self, n: usize, strategy: OrderingStrategy) -> Vec { self.storage.read_n_with_strategy(n, strategy) } @@ -547,7 +611,7 @@ impl MemoryPool { pub fn information(&self) -> Information { Information { transactions_count: self.storage.by_hash.len(), - transactions_size_in_bytes: self.storage.transactions_size_in_bytes + transactions_size_in_bytes: self.storage.transactions_size_in_bytes, } } @@ -613,11 +677,18 @@ impl MemoryPool { } } +impl HeapSizeOf for MemoryPool { + fn heap_size_of_children(&self) -> usize { + self.storage.heap_size_of_children() + } +} + #[cfg(test)] mod tests { use std::cmp::Ordering; use hash::H256; use chain::Transaction; + use heapsize::HeapSizeOf; use super::{MemoryPool, OrderingStrategy}; // output_value = 898126612, size = 225, miner_score ~ 3991673.83 @@ -657,6 +728,21 @@ mod tests { pool } + #[test] + fn test_memory_pool_heap_size() { + let mut pool = MemoryPool::new(); + + let size1 = pool.heap_size_of_children(); + + pool.insert_verified(RAW_TRANSACTION1.into()); + let size2 = pool.heap_size_of_children(); + assert!(size2 > size1); + + pool.insert_verified(RAW_TRANSACTION2.into()); + let size3 = pool.heap_size_of_children(); + assert!(size3 > size2); + } + #[test] fn test_memory_pool_insert_same_transaction() { let mut pool = MemoryPool::new(); diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 8300acec..ac61c3c4 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -10,8 +10,8 @@ futures = "0.1" futures-cpupool = "0.1" time = "0.1" rand = "0.3" +log = "0.3" primitives = { path = "../primitives" } bitcrypto = { path = "../crypto" } -serialization = { path = "../serialization" } message = { path = "../message" } diff --git a/p2p/src/config.rs b/p2p/src/config.rs index 3b0b96e1..5cd0c5d8 100644 --- a/p2p/src/config.rs +++ b/p2p/src/config.rs @@ -3,10 +3,20 @@ use net::Config as NetConfig; #[derive(Debug)] pub struct Config { + /// Number of threads used by p2p thread pool. + pub threads: usize, + /// Lowest supported protocol version. + pub protocol_minimum: u32, + /// Highest supported protocol version. + pub protocol_maximum: u32, + /// Number of inbound connections. + pub inbound_connections: usize, + /// Number of outbound connections. + pub outbound_connections: usize, /// Configuration for every connection. pub connection: NetConfig, - /// Connect to these nodes to retrieve peer addresses, and disconnect. - pub seednodes: Vec, /// Connect only ot these nodes. - pub limited_connect: Option>, + pub peers: Vec, + /// Connect to these nodes to retrieve peer addresses, and disconnect. + pub seeds: Vec, } diff --git a/p2p/src/io/handshake.rs b/p2p/src/io/handshake.rs index dfba578a..8872263c 100644 --- a/p2p/src/io/handshake.rs +++ b/p2p/src/io/handshake.rs @@ -1,6 +1,6 @@ use std::{io, cmp}; use futures::{Future, Poll, Async}; -use message::{Message, Error}; +use message::{Message, MessageResult}; use message::types::{Version, Verack}; use message::common::Magic; use io::{write_message, WriteMessage, ReadMessage, read_message}; @@ -81,7 +81,7 @@ pub struct AcceptHandshake { } impl Future for Handshake where A: io::Read + io::Write { - type Item = (A, Result); + type Item = (A, MessageResult); type Error = io::Error; fn poll(&mut self) -> Poll { @@ -128,7 +128,7 @@ impl Future for Handshake where A: io::Read + io::Write { } impl Future for AcceptHandshake where A: io::Read + io::Write { - type Item = (A, Result); + type Item = (A, MessageResult); type Error = io::Error; fn poll(&mut self) -> Poll { diff --git a/p2p/src/io/mod.rs b/p2p/src/io/mod.rs index 5bffdd4c..6c7ffd62 100644 --- a/p2p/src/io/mod.rs +++ b/p2p/src/io/mod.rs @@ -1,7 +1,7 @@ mod handshake; mod read_header; mod read_message; -mod read_message_stream; +mod read_any_message; mod read_payload; mod sharedtcpstream; mod write_message; @@ -12,6 +12,6 @@ pub use self::handshake::{ pub use self::read_header::{read_header, ReadHeader}; pub use self::read_payload::{read_payload, ReadPayload}; pub use self::read_message::{read_message, ReadMessage}; -pub use self::read_message_stream::{read_message_stream, ReadMessageStream}; +pub use self::read_any_message::{read_any_message, ReadAnyMessage}; pub use self::sharedtcpstream::SharedTcpStream; pub use self::write_message::{write_message, WriteMessage}; diff --git a/p2p/src/io/read_any_message.rs b/p2p/src/io/read_any_message.rs new file mode 100644 index 00000000..06470cdf --- /dev/null +++ b/p2p/src/io/read_any_message.rs @@ -0,0 +1,65 @@ +use std::io; +use futures::{Future, Poll, Async}; +use tokio_core::io::{read_exact, ReadExact}; +use crypto::checksum; +use message::{Error, MessageHeader, MessageResult, Magic, Command}; +use bytes::Bytes; +use io::{read_header, ReadHeader}; + +pub fn read_any_message(a: A, magic: Magic) -> ReadAnyMessage where A: io::Read { + ReadAnyMessage { + state: ReadAnyMessageState::ReadHeader(read_header(a, magic)), + } +} + +pub enum ReadAnyMessageState { + ReadHeader(ReadHeader), + ReadPayload { + header: MessageHeader, + future: ReadExact + }, + Finished, +} + +pub struct ReadAnyMessage { + state: ReadAnyMessageState, +} + +impl Future for ReadAnyMessage where A: io::Read { + type Item = MessageResult<(Command, Bytes)>; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + let (next, result) = match self.state { + ReadAnyMessageState::ReadHeader(ref mut header) => { + let (stream, header) = try_ready!(header.poll()); + let header = match header { + Ok(header) => header, + Err(err) => return Ok(Err(err).into()), + }; + let future = read_exact(stream, Bytes::new_with_len(header.len as usize)); + let next = ReadAnyMessageState::ReadPayload { + header: header, + future: future, + }; + (next, Async::NotReady) + }, + ReadAnyMessageState::ReadPayload { ref mut header, ref mut future } => { + let (_stream, bytes) = try_ready!(future.poll()); + if checksum(&bytes) != header.checksum { + return Ok(Err(Error::InvalidChecksum).into()); + } + let next = ReadAnyMessageState::Finished; + (next, Ok((header.command.clone(), bytes)).into()) + }, + ReadAnyMessageState::Finished => panic!("poll ReadAnyMessage after it's done"), + }; + + self.state = next; + match result { + // by polling again, we register new future + Async::NotReady => self.poll(), + result => Ok(result) + } + } +} diff --git a/p2p/src/io/read_message_stream.rs b/p2p/src/io/read_message_stream.rs deleted file mode 100644 index d8281d01..00000000 --- a/p2p/src/io/read_message_stream.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::io; -use futures::{Future, Poll, Async}; -use futures::stream::Stream; -use tokio_core::io::{read_exact, ReadExact}; -use crypto::checksum; -use message::{Error, MessageHeader, MessageResult, Magic, Command}; -use bytes::Bytes; -use io::{read_header, ReadHeader}; - -pub fn read_message_stream(a: A, magic: Magic) -> ReadMessageStream where A: io::Read { - ReadMessageStream { - state: ReadMessageStreamState::ReadHeader(read_header(a, magic)), - magic: magic, - } -} - -pub enum ReadMessageStreamState { - ReadHeader(ReadHeader), - ReadPayload { - header: MessageHeader, - future: ReadExact - }, -} - -pub struct ReadMessageStream { - state: ReadMessageStreamState, - magic: Magic, -} - -impl Stream for ReadMessageStream where A: io::Read { - type Item = MessageResult<(Command, Bytes)>; - type Error = io::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - let (next, result) = match self.state { - ReadMessageStreamState::ReadHeader(ref mut header) => { - let (stream, header) = try_ready!(header.poll()); - let header = match header { - Ok(header) => header, - Err(err) => return Ok(Some(Err(err)).into()), - }; - let future = read_exact(stream, Bytes::new_with_len(header.len as usize)); - let next = ReadMessageStreamState::ReadPayload { - header: header, - future: future, - }; - (next, Async::NotReady) - }, - ReadMessageStreamState::ReadPayload { ref mut header, ref mut future } => { - let (stream, bytes) = try_ready!(future.poll()); - if checksum(&bytes) != header.checksum { - return Ok(Some(Err(Error::InvalidChecksum)).into()); - } - let future = read_header(stream, self.magic); - let next = ReadMessageStreamState::ReadHeader(future); - (next, Some(Ok((header.command.clone(), bytes))).into()) - }, - }; - - self.state = next; - Ok(result) - } -} diff --git a/p2p/src/io/sharedtcpstream.rs b/p2p/src/io/sharedtcpstream.rs index 752e5379..2e797d3e 100644 --- a/p2p/src/io/sharedtcpstream.rs +++ b/p2p/src/io/sharedtcpstream.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::net::Shutdown; use std::io::{Read, Write, Error}; use tokio_core::net::TcpStream; @@ -12,6 +13,11 @@ impl SharedTcpStream { io: a, } } + + pub fn shutdown(&self) { + // error is irrelevant here, the connection is dropped anyway + let _ = self.io.shutdown(Shutdown::Both); + } } impl From for SharedTcpStream { diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index d4ec4351..d1cf5f5e 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -5,14 +5,17 @@ extern crate rand; extern crate time; extern crate tokio_core; extern crate parking_lot; +#[macro_use] +extern crate log; extern crate bitcrypto as crypto; extern crate message; extern crate primitives; -extern crate serialization as ser; pub mod io; pub mod net; +pub mod protocol; +pub mod session; pub mod util; mod config; mod event_loop; @@ -26,6 +29,5 @@ pub use primitives::{hash, bytes}; pub use config::Config; pub use event_loop::{event_loop, forever}; pub use p2p::P2P; - -pub type PeerId = usize; +pub use util::{PeerId, PeerInfo}; diff --git a/p2p/src/net/channel.rs b/p2p/src/net/channel.rs index b5d6b620..29ad9800 100644 --- a/p2p/src/net/channel.rs +++ b/p2p/src/net/channel.rs @@ -1,35 +1,54 @@ -use std::io; -use futures::Poll; -use futures::stream::Stream; -use parking_lot::Mutex; -use bytes::Bytes; -use message::{MessageResult, Payload, Command}; +use message::{Payload, Magic, Message}; use net::Connection; -use io::{read_message_stream, ReadMessageStream, SharedTcpStream, WriteMessage}; +use session::Session; +use io::{SharedTcpStream, WriteMessage, write_message, read_any_message, ReadAnyMessage}; +use {PeerId, PeerInfo}; pub struct Channel { - connection: Connection, - message_stream: Mutex>, + version: u32, + magic: Magic, + peer_info: PeerInfo, + session: Session, + stream: SharedTcpStream, } impl Channel { - pub fn new(connection: Connection) -> Self { - let stream = read_message_stream(connection.stream.clone(), connection.magic); + pub fn new(connection: Connection, peer_id: PeerId, session: Session) -> Self { Channel { - connection: connection, - message_stream: Mutex::new(stream), + version: connection.version, + magic: connection.magic, + peer_info: PeerInfo { + address: connection.address, + id: peer_id, + }, + session: session, + stream: connection.stream, } } pub fn write_message(&self, payload: &T) -> WriteMessage where T: Payload { - self.connection.write_message(payload) + // TODO: some tracing here + let message = Message::new(self.magic, self.version, payload).expect("failed to create outgoing message"); + write_message(self.stream.clone(), message) } - pub fn poll_message(&self) -> Poll)>, io::Error> { - self.message_stream.lock().poll() + pub fn read_message(&self) -> ReadAnyMessage { + read_any_message(self.stream.clone(), self.magic) + } + + pub fn shutdown(&self) { + self.stream.shutdown(); } pub fn version(&self) -> u32 { - self.connection.version + self.version + } + + pub fn peer_info(&self) -> PeerInfo { + self.peer_info + } + + pub fn session(&self) -> &Session { + &self.session } } diff --git a/p2p/src/net/config.rs b/p2p/src/net/config.rs index 6bef7b66..5f620abb 100644 --- a/p2p/src/net/config.rs +++ b/p2p/src/net/config.rs @@ -1,5 +1,5 @@ use std::net::SocketAddr; -use message::common::{Magic, ServiceFlags, NetAddress}; +use message::common::{Magic, Services, NetAddress}; use message::types::version::{Version, V0, V106, V70001}; use util::time::{Time, RealTime}; use util::nonce::{NonceGenerator, RandomNonce}; @@ -9,7 +9,7 @@ use VERSION; pub struct Config { pub magic: Magic, pub local_address: SocketAddr, - pub services: ServiceFlags, + pub services: Services, pub user_agent: String, pub start_height: i32, pub relay: bool, diff --git a/p2p/src/net/connect.rs b/p2p/src/net/connect.rs index 5c850910..3667fd92 100644 --- a/p2p/src/net/connect.rs +++ b/p2p/src/net/connect.rs @@ -57,6 +57,7 @@ impl Future for Connect { stream: stream.into(), version: result.negotiated_version, magic: self.magic, + services: result.version.services(), address: self.address, }; (ConnectState::Connected, Async::Ready(Ok(connection))) diff --git a/p2p/src/net/connection.rs b/p2p/src/net/connection.rs index ac20b616..603d58d6 100644 --- a/p2p/src/net/connection.rs +++ b/p2p/src/net/connection.rs @@ -1,23 +1,12 @@ use std::net; -use message::{Message, Payload, Magic}; -use io::{write_message, WriteMessage, SharedTcpStream}; +use message::Magic; +use message::common::Services; +use io::SharedTcpStream; pub struct Connection { pub stream: SharedTcpStream, pub version: u32, pub magic: Magic, + pub services: Services, pub address: net::SocketAddr, } - -impl Connection { - pub fn write_message(&self, payload: &T) -> WriteMessage where T: Payload { - let message = match Message::new(self.magic, self.version, payload) { - Ok(message) => message, - Err(_err) => { - // trace here! outgoing messages should always be written properly - panic!(); - } - }; - write_message(self.stream.clone(), message) - } -} diff --git a/p2p/src/net/connections.rs b/p2p/src/net/connections.rs index 0390844c..86c02cd6 100644 --- a/p2p/src/net/connections.rs +++ b/p2p/src/net/connections.rs @@ -2,46 +2,21 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::collections::HashMap; use parking_lot::RwLock; -use futures::{finished, Future}; -use futures_cpupool::CpuPool; -use tokio_core::reactor::Handle; -use message::Payload; use net::{Connection, Channel}; +use session::Session; use PeerId; +#[derive(Default)] pub struct Connections { + /// Incremental peer counter. peer_counter: AtomicUsize, + /// All open connections. channels: RwLock>>, } impl Connections { pub fn new() -> Self { - Connections { - peer_counter: AtomicUsize::default(), - channels: RwLock::default(), - } - } - - /// Broadcast messages to the network. - /// Returned future completes of first confirmed receive. - pub fn broadcast(connections: &Arc, handle: &Handle, pool: &CpuPool, payload: T) where T: Payload { - let channels = connections.channels(); - for (id, channel) in channels.into_iter() { - let write = channel.write_message(&payload); - let cs = connections.clone(); - let pool_work = pool.spawn(write).then(move |x| { - match x { - Ok(_) => { - // successfully sent message - }, - Err(_) => { - cs.remove(id); - } - } - finished(()) - }); - handle.spawn(pool_work); - } + Connections::default() } /// Returns safe (nonblocking) copy of channels. @@ -55,13 +30,16 @@ impl Connections { } /// Stores new channel. - pub fn store(&self, connection: Connection) { + /// Returnes a shared pointer to it. + pub fn store(&self, connection: Connection, session: Session) -> Arc { let id = self.peer_counter.fetch_add(1, Ordering::AcqRel); - self.channels.write().insert(id, Arc::new(Channel::new(connection))); + let channel = Arc::new(Channel::new(connection, id, session)); + self.channels.write().insert(id, channel.clone()); + channel } /// Removes channel with given id. - pub fn remove(&self, id: PeerId) { - self.channels.write().remove(&id); + pub fn remove(&self, id: PeerId) -> Option> { + self.channels.write().remove(&id) } } diff --git a/p2p/src/net/listen.rs b/p2p/src/net/listen.rs index d5a89ef5..dc4fdc9c 100644 --- a/p2p/src/net/listen.rs +++ b/p2p/src/net/listen.rs @@ -4,8 +4,7 @@ use futures::stream::Stream; use tokio_core::reactor::Handle; use tokio_core::net::{TcpStream, TcpListener}; use tokio_core::io::IoStream; -use message::Error; -use message::common::Magic; +use message::{MessageResult, Magic}; use io::{accept_handshake, AcceptHandshake}; use net::{Config, Connection}; @@ -21,7 +20,7 @@ pub fn listen(handle: &Handle, config: Config) -> Result { pub struct Listen { - inner: IoStream>, + inner: IoStream>, } fn accept_connection(stream: TcpStream, config: &Config, address: net::SocketAddr) -> AcceptConnection { @@ -39,7 +38,7 @@ struct AcceptConnection { } impl Future for AcceptConnection { - type Item = Result; + type Item = MessageResult; type Error = io::Error; fn poll(&mut self) -> Poll { @@ -52,6 +51,7 @@ impl Future for AcceptConnection { stream: stream.into(), version: result.negotiated_version, magic: self.magic, + services: result.version.services(), address: self.address, }; Ok(Ok(connection).into()) @@ -59,7 +59,7 @@ impl Future for AcceptConnection { } impl Stream for Listen { - type Item = Result; + type Item = MessageResult; type Error = io::Error; fn poll(&mut self) -> Poll, Self::Error> { diff --git a/p2p/src/net/messages.rs b/p2p/src/net/messages.rs deleted file mode 100644 index 1c654f27..00000000 --- a/p2p/src/net/messages.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::io; -use std::sync::Weak; -use bytes::Bytes; -use futures::{Poll, Async}; -use futures::stream::Stream; -use message::common::Command; -use net::Connections; -use PeerId; - -pub struct MessagesHandler { - last_polled: usize, - connections: Weak, -} - -fn next_to_poll(channels: usize, last_polled: usize) -> usize { - // it's irrelevant if we sometimes poll the same peer - if channels > last_polled + 1 { - // let's poll the next peer - last_polled + 1 - } else { - // let's move to the first channel - 0 - } -} - -impl MessagesHandler { - pub fn new(connections: Weak) -> Self { - MessagesHandler { - last_polled: usize::max_value(), - connections: connections, - } - } -} - -impl Stream for MessagesHandler { - type Item = (Command, Bytes, u32, PeerId); - type Error = io::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - let connections = match self.connections.upgrade() { - Some(c) => c, - // application is about to shutdown - None => return Ok(None.into()) - }; - let channels = connections.channels(); - if channels.len() == 0 { - // let's wait for some connections - return Ok(Async::NotReady); - } - - let mut to_poll = next_to_poll(channels.len(), self.last_polled); - let mut result = None; - - while result.is_none() && to_poll != self.last_polled { - let (id, channel) = channels.iter().nth(to_poll).expect("to_poll < channels.len()"); - let status = channel.poll_message(); - - match status { - Ok(Async::Ready(Some(Ok((command, message))))) => { - result = Some((command, message, channel.version(), *id)); - }, - Ok(Async::NotReady) => { - // no messages yet, try next channel - to_poll = next_to_poll(channels.len(), to_poll); - }, - _ => { - // channel has been closed or there was error - connections.remove(*id); - to_poll = next_to_poll(channels.len(), to_poll); - }, - } - } - - self.last_polled = to_poll; - match result.is_some() { - true => Ok(Async::Ready(result)), - false => Ok(Async::NotReady), - } - } -} - - diff --git a/p2p/src/net/mod.rs b/p2p/src/net/mod.rs index beb49bff..da1b4673 100644 --- a/p2p/src/net/mod.rs +++ b/p2p/src/net/mod.rs @@ -3,15 +3,11 @@ mod config; mod connect; mod connection; mod connections; -mod messages; mod listen; -mod subscriber; pub use self::channel::Channel; pub use self::config::Config; pub use self::connect::{Connect, connect}; pub use self::connection::Connection; pub use self::connections::Connections; -pub use self::messages::MessagesHandler; pub use self::listen::{Listen, listen}; -pub use self::subscriber::Subscriber; diff --git a/p2p/src/net/subscriber.rs b/p2p/src/net/subscriber.rs deleted file mode 100644 index 7d234320..00000000 --- a/p2p/src/net/subscriber.rs +++ /dev/null @@ -1,78 +0,0 @@ -use std::sync::mpsc::{Sender, Receiver, channel}; -use std::mem; -use parking_lot::Mutex; -use message::{Error, Payload, Command, deserialize_payload}; -use message::types::{Addr, GetAddr}; -use PeerId; - -struct Handler { - sender: Mutex>>, -} - -impl Default for Handler { - fn default() -> Self { - Handler { - sender: Mutex::default(), - } - } -} - -impl Handler where S: Payload { - fn command(&self) -> Command { - S::command().into() - } - - fn handle(&self, payload: &[u8], version: u32, peerid: PeerId) -> Result<(), Error> { - let payload: S = try!(deserialize_payload(payload, version)); - if let Some(sender) = self.sender() { - if let Err(_err) = sender.send((payload, peerid)) { - // TODO: unsubscribe channel? - // TODO: trace - } - } - Ok(()) - } - - fn sender(&self) -> Option> { - self.sender.lock().clone() - } - - fn store(&self, sender: Sender<(S, PeerId)>) { - mem::replace(&mut *self.sender.lock(), Some(sender)); - } -} - -#[derive(Default)] -pub struct Subscriber { - addr: Handler, - getaddr: Handler, -} - -macro_rules! define_subscribe { - ($name: ident, $result: ident, $sub: ident) => { - pub fn $name(&self) -> Receiver<($result, PeerId)> { - let (sender, receiver) = channel(); - self.$sub.store(sender); - receiver - } - } -} - -macro_rules! maybe_handle { - ($command: expr, $sub: expr, $payload: expr, $version: expr, $peerid: expr) => { - if $command == $sub.command() { - return $sub.handle($payload, $version, $peerid); - } - } -} - -impl Subscriber { - define_subscribe!(subscribe_addr, Addr, addr); - define_subscribe!(subscribe_getaddr, GetAddr, getaddr); - - pub fn try_handle(&self, payload: &[u8], version: u32, command: Command, peerid: PeerId) -> Result<(), Error> { - maybe_handle!(command, self.addr, payload, version, peerid); - maybe_handle!(command, self.getaddr, payload, version, peerid); - Err(Error::InvalidCommand) - } -} diff --git a/p2p/src/p2p.rs b/p2p/src/p2p.rs index 26966950..b529a961 100644 --- a/p2p/src/p2p.rs +++ b/p2p/src/p2p.rs @@ -1,12 +1,151 @@ use std::{io, net}; use std::sync::Arc; -use futures::{Future, finished}; +use parking_lot::RwLock; +use futures::{Future, finished, failed, BoxFuture}; use futures::stream::Stream; use futures_cpupool::CpuPool; +use tokio_core::io::IoFuture; use tokio_core::reactor::Handle; use message::Payload; -use net::{connect, listen, Connections, Subscriber, MessagesHandler}; -use Config; +use session::Session; +use io::{ReadAnyMessage, SharedTcpStream}; +use net::{connect, listen, Connections, Channel, Config as NetConfig}; +use util::NodeTable; +use {Config, PeerInfo}; + +pub type BoxedMessageFuture = BoxFuture< as Future>::Item, as Future>::Error>; +pub type BoxedEmptyFuture = BoxFuture<(), ()>; + +/// Network context. +#[derive(Default)] +pub struct Context { + /// Connections. + connections: Connections, + /// Node Table. + node_table: RwLock, +} + +impl Context { + pub fn connect(context: Arc, socket: net::SocketAddr, handle: &Handle, config: &NetConfig) -> BoxedEmptyFuture { + trace!("Trying to connect to: {}", socket); + let connection = connect(&socket, handle, config); + connection.then(move |result| { + match result { + Ok(Ok(connection)) => { + // successfull hanshake + trace!("Connected to {}", connection.address); + context.node_table.write().insert(connection.address, connection.services); + let session = Session::new(); + let channel = context.connections.store(connection, session); + + // initialize session and then start reading messages + channel.session().initialize(context.clone(), channel.clone()) + .and_then(move |_| Context::on_message(context, channel)) + .boxed() + }, + Ok(Err(err)) => { + // protocol error + trace!("Handshake with {} failed", socket); + // TODO: close socket + finished(Err(err)).boxed() + }, + Err(err) => { + // network error + trace!("Unable to connect to {}", socket); + failed(err).boxed() + } + } + }) + .then(|_| finished(())) + .boxed() + } + + pub fn listen(context: Arc, handle: &Handle, config: NetConfig) -> Result { + trace!("Starting tcp server"); + let listen = try!(listen(&handle, config)); + let server = listen.then(move |result| { + match result { + Ok(Ok(connection)) => { + // successfull hanshake + trace!("Accepted connection from {}", connection.address); + context.node_table.write().insert(connection.address, connection.services); + let session = Session::new(); + let channel = context.connections.store(connection, session); + + // initialize session and then start reading messages + let cloned_context = context.clone(); + channel.session().initialize(context.clone(), channel.clone()) + .and_then(|_| Context::on_message(cloned_context, channel)) + .boxed() + }, + Ok(Err(err)) => { + // protocol error + // TODO: close socket + finished(Err(err)).boxed() + }, + Err(err) => { + // network error + failed(err).boxed() + } + } + }) + .for_each(|_| Ok(())) + .then(|_| finished(())) + .boxed(); + Ok(server) + } + + pub fn on_message(context: Arc, channel: Arc) -> BoxedMessageFuture { + channel.read_message().then(move |result| { + match result { + Ok(Ok((command, payload))) => { + // successful read + trace!("Received {} message from {}", command, channel.peer_info().address); + // handle message and read the next one + channel.session().on_message(context.clone(), channel.clone(), command, payload) + .and_then(move |_| Context::on_message(context, channel)) + .boxed() + }, + Ok(Err(err)) => { + // protocol error + context.close_connection(channel.peer_info()); + finished(Err(err)).boxed() + }, + Err(err) => { + // network error + context.close_connection(channel.peer_info()); + failed(err).boxed() + } + } + }).boxed() + } + + pub fn send(context: Arc, channel: Arc, payload: &T) -> IoFuture<()> where T: Payload { + trace!("Sending {} message to {}", T::command(), channel.peer_info().address); + channel.write_message(payload).then(move |result| { + match result { + Ok(_) => { + // successful send + trace!("Sent {} message to {}", T::command(), channel.peer_info().address); + finished(()).boxed() + }, + Err(err) => { + // network error + context.close_connection(channel.peer_info()); + failed(err).boxed() + }, + } + }).boxed() + } + + pub fn close_connection(&self, peer_info: PeerInfo) { + if let Some(channel) = self.connections.remove(peer_info.id) { + trace!("Disconnecting from {}", peer_info.address); + channel.shutdown(); + self.node_table.write().note_failure(&peer_info.address); + } + } +} pub struct P2P { /// Global event loop handle. @@ -15,82 +154,78 @@ pub struct P2P { pool: CpuPool, /// P2P config. config: Config, - /// Connections. - connections: Arc, - /// Message subscriber. - subscriber: Arc, + /// Network context. + context: Arc, } impl P2P { pub fn new(config: Config, handle: Handle) -> Self { - let pool = CpuPool::new(4); + let pool = CpuPool::new(config.threads); P2P { event_loop_handle: handle.clone(), pool: pool.clone(), config: config, - connections: Arc::new(Connections::new()), - subscriber: Arc::new(Subscriber::default()), + context: Arc::default(), } } pub fn run(&self) -> Result<(), io::Error> { - for seednode in self.config.seednodes.iter() { - self.connect(*seednode) + for peer in self.config.peers.iter() { + self.connect(*peer) } try!(self.listen()); - self.handle_messages(); Ok(()) } pub fn connect(&self, ip: net::IpAddr) { let socket = net::SocketAddr::new(ip, self.config.connection.magic.port()); - let connections = self.connections.clone(); - let connection = connect(&socket, &self.event_loop_handle, &self.config.connection); - let pool_work = self.pool.spawn(connection).then(move |x| { - if let Ok(Ok(con)) = x { - connections.store(con); - } - finished(()) - }); + let connection = Context::connect(self.context.clone(), socket, &self.event_loop_handle, &self.config.connection); + let pool_work = self.pool.spawn(connection); self.event_loop_handle.spawn(pool_work); } fn listen(&self) -> Result<(), io::Error> { - let listen = try!(listen(&self.event_loop_handle, self.config.connection.clone())); - let connections = self.connections.clone(); - let server = listen.for_each(move |x| { - if let Ok(con) = x { - connections.store(con); - } - Ok(()) - }).then(|_| { - finished(()) - }); + let server = try!(Context::listen(self.context.clone(), &self.event_loop_handle, self.config.connection.clone())); let pool_work = self.pool.spawn(server); self.event_loop_handle.spawn(pool_work); Ok(()) } - fn handle_messages(&self) { - let incoming = MessagesHandler::new(Arc::downgrade(&self.connections)); - let subscriber = self.subscriber.clone(); - let connections = self.connections.clone(); - let incoming_future = incoming.for_each(move |result| { - let (command, payload, version, peerid) = result; - if let Err(_err) = subscriber.try_handle(&payload, version, command, peerid) { - connections.remove(peerid); - } - Ok(()) - }).then(|_| { - finished(()) - }); - let pool_work = self.pool.spawn(incoming_future); - self.event_loop_handle.spawn(pool_work); + /* + pub fn broadcast(&self, payload: T) where T: Payload { + let channels = self.connections.channels(); + for (_id, channel) in channels.into_iter() { + self.send_to_channel(&payload, &channel); + } } - pub fn broadcast(&self, payload: T) where T: Payload { - Connections::broadcast(&self.connections, &self.event_loop_handle, &self.pool, payload) + pub fn send(&self, payload: T, peer: PeerId) where T: Payload { + let channels = self.connections.channels(); + if let Some(channel) = channels.get(&peer) { + self.send_to_channel(&payload, channel); + } } + + fn send_to_channel(&self, payload: &T, channel: &Arc) where T: Payload { + let connections = self.connections.clone(); + let node_table = self.node_table.clone(); + let peer_info = channel.peer_info(); + let write = channel.write_message(payload); + let pool_work = self.pool.spawn(write).then(move |result| { + match result { + Ok(_) => { + node_table.write().note_used(&peer_info.address); + }, + Err(_err) => { + node_table.write().note_failure(&peer_info.address); + connections.remove(peer_info.id); + } + } + finished(()) + }); + self.event_loop_handle.spawn(pool_work); + } + */ } diff --git a/p2p/src/protocol/mod.rs b/p2p/src/protocol/mod.rs new file mode 100644 index 00000000..2e510123 --- /dev/null +++ b/p2p/src/protocol/mod.rs @@ -0,0 +1,28 @@ +mod ping; + +use bytes::Bytes; +use message::Error; +use message::common::Command; + +pub use self::ping::PingProtocol; + +pub enum Direction { + Inbound, + Outbound, +} + +pub enum ProtocolAction { + Reply(Bytes), + None, + Disconnect, +} + +pub trait Protocol: Send { + /// Initialize the protocol. + fn initialize(&mut self, _direction: Direction, _version: u32) -> Result { + Ok(ProtocolAction::None) + } + + /// Handle the message. + fn on_message(&self, command: &Command, payload: &Bytes, version: u32) -> Result; +} diff --git a/p2p/src/protocol/ping.rs b/p2p/src/protocol/ping.rs new file mode 100644 index 00000000..fd907f60 --- /dev/null +++ b/p2p/src/protocol/ping.rs @@ -0,0 +1,56 @@ +use bytes::Bytes; +use message::{Error, Payload, deserialize_payload, serialize_payload}; +use message::types::{Ping, Pong}; +use message::common::Command; +use protocol::{Protocol, ProtocolAction, Direction}; +use util::nonce::{NonceGenerator, RandomNonce}; + +pub struct PingProtocol { + /// Nonce generator + nonce_generator: T, + /// Last nonce sent in a ping message. + last_ping_nonce: u64, +} + +impl PingProtocol { + pub fn new() -> Self { + PingProtocol { + nonce_generator: RandomNonce::default(), + last_ping_nonce: 0, + } + } +} + +impl Protocol for PingProtocol where T: NonceGenerator + Send { + fn initialize(&mut self, direction: Direction, version: u32) -> Result { + match direction { + Direction::Outbound => Ok(ProtocolAction::None), + Direction::Inbound => { + let nonce = self.nonce_generator.get(); + self.last_ping_nonce = nonce; + let ping = Ping::new(nonce); + let serialized = try!(serialize_payload(&ping, version)); + Ok(ProtocolAction::Reply(serialized)) + }, + } + } + + fn on_message(&self, command: &Command, payload: &Bytes, version: u32) -> Result { + if command == &Ping::command().into() { + let ping: Ping = try!(deserialize_payload(payload, version)); + let pong = Pong::new(ping.nonce); + let serialized = try!(serialize_payload(&pong, version)); + Ok(ProtocolAction::Reply(serialized)) + } else if command == &Pong::command().into() { + let pong: Pong = try!(deserialize_payload(payload, version)); + if pong.nonce != self.last_ping_nonce { + Err(Error::InvalidCommand) + } else { + Ok(ProtocolAction::None) + } + } else { + Ok(ProtocolAction::None) + } + } +} + diff --git a/p2p/src/session.rs b/p2p/src/session.rs new file mode 100644 index 00000000..122d565c --- /dev/null +++ b/p2p/src/session.rs @@ -0,0 +1,39 @@ +use std::sync::Arc; +use parking_lot::Mutex; +use tokio_core::io::IoFuture; +use bytes::Bytes; +use message::Command; +use p2p::Context; +use net::Channel; +use protocol::{Protocol, PingProtocol}; + +pub struct Session { + protocols: Vec>>>, +} + +impl Session { + pub fn new() -> Self { + let ping = PingProtocol::new(); + Session::new_with_protocols(vec![Box::new(ping)]) + } + + pub fn new_seednode() -> Self { + let ping = PingProtocol::new(); + Session::new_with_protocols(vec![Box::new(ping)]) + } + + pub fn new_with_protocols(protocols: Vec>) -> Self { + Session { + protocols: protocols.into_iter().map(Mutex::new).map(Arc::new).collect(), + } + } + + pub fn initialize(&self, _context: Arc, _channel: Arc) -> IoFuture<()> { + unimplemented!(); + } + + pub fn on_message(&self, _context: Arc, _channel: Arc, _command: Command, _payload: Bytes) -> IoFuture<()> { + unimplemented!(); + } +} + diff --git a/p2p/src/util/mod.rs b/p2p/src/util/mod.rs index 1154bb2d..5366572a 100644 --- a/p2p/src/util/mod.rs +++ b/p2p/src/util/mod.rs @@ -1,2 +1,7 @@ pub mod nonce; pub mod time; +mod node_table; +mod peer; + +pub use self::node_table::{NodeTable, Node}; +pub use self::peer::{PeerId, PeerInfo}; diff --git a/p2p/src/util/node_table.rs b/p2p/src/util/node_table.rs new file mode 100644 index 00000000..5cf296a2 --- /dev/null +++ b/p2p/src/util/node_table.rs @@ -0,0 +1,161 @@ +use std::collections::{HashMap, BTreeSet}; +use std::net::SocketAddr; +use std::cmp::{PartialOrd, Ord, Ordering}; +use message::common::Services; +use util::time::{Time, RealTime}; + +#[derive(PartialEq, Eq, Clone)] +pub struct Node { + /// Node address. + addr: SocketAddr, + /// Timestamp of last interaction with a node. + time: i64, + /// Services supported by the node. + services: Services, + /// Node failures counter. + failures: u32, +} + +impl PartialOrd for Node { + fn partial_cmp(&self, other: &Self) -> Option { + if self.failures == other.failures { + self.time.partial_cmp(&other.time) + } else { + other.failures.partial_cmp(&self.failures) + } + } +} + +impl Ord for Node { + fn cmp(&self, other: &Self) -> Ordering { + if self.failures == other.failures { + self.time.cmp(&other.time) + } else { + other.failures.cmp(&self.failures) + } + } +} + +#[derive(Default)] +pub struct NodeTable where T: Time { + /// Time source. + time: T, + /// Nodes by socket address. + by_addr: HashMap, + /// Nodes sorted by score. + by_score: BTreeSet, +} + +impl NodeTable where T: Time { + /// Inserts new address and services pair into NodeTable. + pub fn insert(&mut self, addr: SocketAddr, services: Services) { + let failures = self.by_addr.get(&addr).map_or(0, |ref node| node.failures); + + let node = Node { + addr: addr, + time: self.time.get().sec, + services: services, + failures: failures, + }; + + self.by_addr.insert(addr, node.clone()); + self.by_score.insert(node); + } + + /// Returnes most reliable nodes with desired services. + pub fn nodes_with_services(&self, services: &Services, limit: usize) -> Vec { + self.by_score.iter() + .rev() + .filter(|s| s.services.includes(services)) + .map(Clone::clone) + .take(limit) + .collect() + } + + /// Marks address as recently used. + pub fn note_used(&mut self, addr: &SocketAddr) { + if let Some(ref mut node) = self.by_addr.get_mut(addr) { + assert!(self.by_score.remove(node)); + node.time = self.time.get().sec; + self.by_score.insert(node.clone()); + } + } + + /// Notes failure. + pub fn note_failure(&mut self, addr: &SocketAddr) { + if let Some(ref mut node) = self.by_addr.get_mut(addr) { + assert!(self.by_score.remove(node)); + node.failures += 1; + self.by_score.insert(node.clone()); + } + } +} + +#[cfg(test)] +mod tests { + use std::net::SocketAddr; + use message::common::Services; + use util::time::IncrementalTime; + use super::NodeTable; + + #[test] + fn test_node_table_insert() { + let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap(); + let s1: SocketAddr = "127.0.0.1:8001".parse().unwrap(); + let s2: SocketAddr = "127.0.0.1:8002".parse().unwrap(); + let mut table = NodeTable::::default(); + table.insert(s0, Services::default()); + table.insert(s1, Services::default()); + table.insert(s2, Services::default()); + let nodes = table.nodes_with_services(&Services::default(), 2); + assert_eq!(nodes.len(), 2); + assert_eq!(nodes[0].addr, s2); + assert_eq!(nodes[0].time, 2); + assert_eq!(nodes[0].failures, 0); + assert_eq!(nodes[1].addr, s1); + assert_eq!(nodes[1].time, 1); + assert_eq!(nodes[1].failures, 0); + } + + #[test] + fn test_node_table_note() { + let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap(); + let s1: SocketAddr = "127.0.0.1:8001".parse().unwrap(); + let s2: SocketAddr = "127.0.0.1:8002".parse().unwrap(); + let s3: SocketAddr = "127.0.0.1:8003".parse().unwrap(); + let s4: SocketAddr = "127.0.0.1:8004".parse().unwrap(); + let mut table = NodeTable::::default(); + table.insert(s0, Services::default()); + table.insert(s1, Services::default()); + table.insert(s2, Services::default()); + table.insert(s3, Services::default()); + table.insert(s4, Services::default()); + table.note_used(&s2); + table.note_used(&s4); + table.note_used(&s1); + table.note_failure(&s2); + table.note_failure(&s3); + let nodes = table.nodes_with_services(&Services::default(), 10); + assert_eq!(nodes.len(), 5); + + assert_eq!(nodes[0].addr, s1); + assert_eq!(nodes[0].time, 7); + assert_eq!(nodes[0].failures, 0); + + assert_eq!(nodes[1].addr, s4); + assert_eq!(nodes[1].time, 6); + assert_eq!(nodes[1].failures, 0); + + assert_eq!(nodes[2].addr, s0); + assert_eq!(nodes[2].time, 0); + assert_eq!(nodes[2].failures, 0); + + assert_eq!(nodes[3].addr, s2); + assert_eq!(nodes[3].time, 5); + assert_eq!(nodes[3].failures, 1); + + assert_eq!(nodes[4].addr, s3); + assert_eq!(nodes[4].time, 3); + assert_eq!(nodes[4].failures, 1); + } +} diff --git a/p2p/src/util/peer.rs b/p2p/src/util/peer.rs new file mode 100644 index 00000000..1d0dbc6e --- /dev/null +++ b/p2p/src/util/peer.rs @@ -0,0 +1,10 @@ +use std::net::SocketAddr; + +pub type PeerId = usize; + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub struct PeerInfo { + pub id: PeerId, + pub address: SocketAddr, +} + diff --git a/p2p/src/util/time.rs b/p2p/src/util/time.rs index cca37f45..58b6c8af 100644 --- a/p2p/src/util/time.rs +++ b/p2p/src/util/time.rs @@ -1,3 +1,4 @@ +use std::cell::Cell; use time; pub trait Time { @@ -26,3 +27,17 @@ impl Time for StaticTime { self.0 } } + +#[derive(Default)] +pub struct IncrementalTime { + counter: Cell, +} + +impl Time for IncrementalTime { + fn get(&self) -> time::Timespec { + let c = self.counter.get(); + let result = time::Timespec::new(c, 0); + self.counter.set(c + 1); + result + } +} diff --git a/pbtc/main.rs b/pbtc/main.rs index c060e6a9..19ccdba8 100644 --- a/pbtc/main.rs +++ b/pbtc/main.rs @@ -2,6 +2,7 @@ #[macro_use] extern crate clap; +extern crate env_logger; extern crate keys; extern crate script; @@ -14,6 +15,7 @@ use std::net::SocketAddr; use p2p::{P2P, event_loop, forever, net}; fn main() { + env_logger::init().unwrap(); match run() { Err(err) => println!("{}", err), Ok(_) => (), @@ -28,6 +30,11 @@ fn run() -> Result<(), String> { let mut el = event_loop(); let p2p_cfg = p2p::Config { + threads: 4, + protocol_minimum: 70001, + protocol_maximum: 70017, + inbound_connections: 10, + outbound_connections: 10, connection: net::Config { magic: cfg.magic, local_address: SocketAddr::new("127.0.0.1".parse().unwrap(), cfg.port), @@ -36,8 +43,8 @@ fn run() -> Result<(), String> { start_height: 0, relay: false, }, - seednodes: cfg.seednode.map_or_else(|| vec![], |x| vec![x]), - limited_connect: cfg.connect.map_or(None, |x| Some(vec![x])), + peers: cfg.connect.map_or_else(|| vec![], |x| vec![x]), + seeds: cfg.seednode.map_or_else(|| vec![], |x| vec![x]), }; let p2p = P2P::new(p2p_cfg, el.handle()); diff --git a/primitives/Cargo.toml b/primitives/Cargo.toml index df32ffe9..0a23c670 100644 --- a/primitives/Cargo.toml +++ b/primitives/Cargo.toml @@ -4,4 +4,5 @@ version = "0.1.0" authors = ["debris "] [dependencies] +heapsize = "0.3" rustc-serialize = "0.3" diff --git a/primitives/src/hash.rs b/primitives/src/hash.rs index f2795fa1..a656db31 100644 --- a/primitives/src/hash.rs +++ b/primitives/src/hash.rs @@ -137,6 +137,8 @@ impl_hash!(H264, 33); impl_hash!(H512, 64); impl_hash!(H520, 65); +known_heap_size!(0, H32, H48, H96, H160, H256, H264, H512, H520); + impl H256 { #[inline] pub fn from_reversed_str(s: &'static str) -> Self { diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index 477108c9..1f45ff3b 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -1,4 +1,5 @@ extern crate rustc_serialize; +#[macro_use] extern crate heapsize; pub mod bytes; pub mod hash; diff --git a/tools/graph.dot b/tools/graph.dot index b748cf12..65f8ef8d 100644 --- a/tools/graph.dot +++ b/tools/graph.dot @@ -2,69 +2,78 @@ digraph dependencies { N0[label="pbtc",shape=box]; N1[label="clap",shape=box]; N2[label="db",shape=box]; - N3[label="keys",shape=box]; - N4[label="message",shape=box]; - N5[label="miner",shape=box]; - N6[label="p2p",shape=box]; - N7[label="script",shape=box]; - N8[label="ansi_term",shape=box]; - N9[label="arrayvec",shape=box]; - N10[label="nodrop",shape=box]; - N11[label="odds",shape=box]; - N12[label="base58",shape=box]; - N13[label="bitcrypto",shape=box]; - N14[label="primitives",shape=box]; - N15[label="rust-crypto",shape=box]; - N16[label="bitflags v0.4.0",shape=box]; - N17[label="bitflags v0.7.0",shape=box]; - N18[label="byteorder",shape=box]; - N19[label="cfg-if",shape=box]; - N20[label="chain",shape=box]; - N21[label="rustc-serialize",shape=box]; - N22[label="serialization",shape=box]; - N23[label="libc",shape=box]; - N24[label="strsim",shape=box]; - N25[label="term_size",shape=box]; - N26[label="unicode-segmentation",shape=box]; - N27[label="unicode-width",shape=box]; - N28[label="vec_map",shape=box]; - N29[label="yaml-rust",shape=box]; - N30[label="crossbeam",shape=box]; - N31[label="elastic-array",shape=box]; - N32[label="ethcore-devtools",shape=box]; - N33[label="parking_lot",shape=box]; - N34[label="rocksdb",shape=box]; - N35[label="deque",shape=box]; - N36[label="rand",shape=box]; - N37[label="eth-secp256k1",shape=box]; - N38[label="gcc",shape=box]; - N39[label="futures",shape=box]; - N40[label="log",shape=box]; - N41[label="futures-cpupool",shape=box]; - N42[label="num_cpus v1.1.0",shape=box]; - N43[label="rayon",shape=box]; - N44[label="kernel32-sys",shape=box]; - N45[label="winapi",shape=box]; - N46[label="winapi-build",shape=box]; - N47[label="lazy_static",shape=box]; - N48[label="lazycell",shape=box]; - N49[label="mio",shape=box]; - N50[label="miow",shape=box]; - N51[label="net2",shape=box]; - N52[label="nix",shape=box]; - N53[label="slab",shape=box]; - N54[label="ws2_32-sys",shape=box]; - N55[label="rustc_version",shape=box]; - N56[label="semver",shape=box]; - N57[label="void",shape=box]; - N58[label="num_cpus v0.2.13",shape=box]; - N59[label="owning_ref",shape=box]; - N60[label="time",shape=box]; - N61[label="tokio-core",shape=box]; - N62[label="parking_lot_core",shape=box]; - N63[label="smallvec",shape=box]; - N64[label="rocksdb-sys",shape=box]; - N65[label="scoped-tls",shape=box]; + N3[label="env_logger",shape=box]; + N4[label="keys",shape=box]; + N5[label="message",shape=box]; + N6[label="miner",shape=box]; + N7[label="p2p",shape=box]; + N8[label="script",shape=box]; + N9[label="aho-corasick",shape=box]; + N10[label="memchr",shape=box]; + N11[label="ansi_term",shape=box]; + N12[label="arrayvec",shape=box]; + N13[label="nodrop",shape=box]; + N14[label="odds",shape=box]; + N15[label="base58",shape=box]; + N16[label="bitcrypto",shape=box]; + N17[label="primitives",shape=box]; + N18[label="rust-crypto",shape=box]; + N19[label="bitflags v0.4.0",shape=box]; + N20[label="bitflags v0.7.0",shape=box]; + N21[label="byteorder",shape=box]; + N22[label="cfg-if",shape=box]; + N23[label="chain",shape=box]; + N24[label="heapsize",shape=box]; + N25[label="rustc-serialize",shape=box]; + N26[label="serialization",shape=box]; + N27[label="libc",shape=box]; + N28[label="strsim",shape=box]; + N29[label="term_size",shape=box]; + N30[label="unicode-segmentation",shape=box]; + N31[label="unicode-width",shape=box]; + N32[label="vec_map",shape=box]; + N33[label="yaml-rust",shape=box]; + N34[label="crossbeam",shape=box]; + N35[label="elastic-array",shape=box]; + N36[label="ethcore-devtools",shape=box]; + N37[label="parking_lot",shape=box]; + N38[label="rocksdb",shape=box]; + N39[label="deque",shape=box]; + N40[label="rand",shape=box]; + N41[label="log",shape=box]; + N42[label="regex",shape=box]; + N43[label="eth-secp256k1",shape=box]; + N44[label="gcc",shape=box]; + N45[label="futures",shape=box]; + N46[label="futures-cpupool",shape=box]; + N47[label="num_cpus v1.1.0",shape=box]; + N48[label="rayon",shape=box]; + N49[label="kernel32-sys",shape=box]; + N50[label="winapi",shape=box]; + N51[label="winapi-build",shape=box]; + N52[label="lazy_static",shape=box]; + N53[label="lazycell",shape=box]; + N54[label="mio",shape=box]; + N55[label="miow",shape=box]; + N56[label="net2",shape=box]; + N57[label="nix",shape=box]; + N58[label="slab",shape=box]; + N59[label="ws2_32-sys",shape=box]; + N60[label="rustc_version",shape=box]; + N61[label="semver",shape=box]; + N62[label="void",shape=box]; + N63[label="num_cpus v0.2.13",shape=box]; + N64[label="owning_ref",shape=box]; + N65[label="time",shape=box]; + N66[label="tokio-core",shape=box]; + N67[label="parking_lot_core",shape=box]; + N68[label="smallvec",shape=box]; + N69[label="regex-syntax",shape=box]; + N70[label="thread_local",shape=box]; + N71[label="utf8-ranges",shape=box]; + N72[label="rocksdb-sys",shape=box]; + N73[label="scoped-tls",shape=box]; + N74[label="thread-id",shape=box]; N0 -> N1[label="",style=dashed]; N0 -> N2[label="",style=dashed]; N0 -> N3[label="",style=dashed]; @@ -72,137 +81,154 @@ digraph dependencies { N0 -> N5[label="",style=dashed]; N0 -> N6[label="",style=dashed]; N0 -> N7[label="",style=dashed]; - N1 -> N8[label="",style=dashed]; - N1 -> N17[label="",style=dashed]; - N1 -> N23[label="",style=dashed]; - N1 -> N24[label="",style=dashed]; - N1 -> N25[label="",style=dashed]; - N1 -> N26[label="",style=dashed]; + N0 -> N8[label="",style=dashed]; + N1 -> N11[label="",style=dashed]; + N1 -> N20[label="",style=dashed]; N1 -> N27[label="",style=dashed]; N1 -> N28[label="",style=dashed]; N1 -> N29[label="",style=dashed]; - N2 -> N14[label="",style=dashed]; - N2 -> N18[label="",style=dashed]; - N2 -> N20[label="",style=dashed]; - N2 -> N22[label="",style=dashed]; - N2 -> N31[label="",style=dashed]; - N2 -> N32[label="",style=dashed]; - N2 -> N33[label="",style=dashed]; - N2 -> N34[label="",style=dashed]; - N3 -> N12[label="",style=dashed]; - N3 -> N13[label="",style=dashed]; - N3 -> N14[label="",style=dashed]; - N3 -> N21[label="",style=dashed]; - N3 -> N36[label="",style=dashed]; - N3 -> N37[label="",style=dashed]; - N3 -> N47[label="",style=dashed]; - N4 -> N13[label="",style=dashed]; - N4 -> N14[label="",style=dashed]; - N4 -> N18[label="",style=dashed]; - N4 -> N20[label="",style=dashed]; - N4 -> N22[label="",style=dashed]; - N5 -> N14[label="",style=dashed]; - N5 -> N20[label="",style=dashed]; - N5 -> N22[label="",style=dashed]; - N6 -> N4[label="",style=dashed]; - N6 -> N13[label="",style=dashed]; - N6 -> N14[label="",style=dashed]; - N6 -> N22[label="",style=dashed]; - N6 -> N33[label="",style=dashed]; - N6 -> N36[label="",style=dashed]; - N6 -> N39[label="",style=dashed]; - N6 -> N41[label="",style=dashed]; - N6 -> N60[label="",style=dashed]; - N6 -> N61[label="",style=dashed]; - N7 -> N3[label="",style=dashed]; - N7 -> N13[label="",style=dashed]; - N7 -> N14[label="",style=dashed]; - N7 -> N20[label="",style=dashed]; - N7 -> N22[label="",style=dashed]; - N9 -> N10[label=""]; - N9 -> N11[label=""]; - N10 -> N11[label=""]; - N13 -> N14[label="",style=dashed]; - N13 -> N15[label="",style=dashed]; - N14 -> N21[label="",style=dashed]; - N15 -> N21[label="",style=dashed]; - N15 -> N23[label="",style=dashed]; - N15 -> N36[label="",style=dashed]; - N15 -> N38[label="",style=dashed]; - N15 -> N60[label="",style=dashed]; - N20 -> N13[label="",style=dashed]; - N20 -> N14[label="",style=dashed]; - N20 -> N21[label="",style=dashed]; - N20 -> N22[label="",style=dashed]; - N22 -> N14[label="",style=dashed]; - N22 -> N18[label="",style=dashed]; - N25 -> N23[label="",style=dashed]; - N25 -> N44[label="",style=dashed]; - N25 -> N45[label="",style=dashed]; - N32 -> N36[label="",style=dashed]; - N33 -> N59[label="",style=dashed]; - N33 -> N62[label="",style=dashed]; - N34 -> N23[label="",style=dashed]; - N34 -> N64[label="",style=dashed]; - N35 -> N36[label="",style=dashed]; - N36 -> N23[label="",style=dashed]; - N37 -> N9[label="",style=dashed]; - N37 -> N21[label="",style=dashed]; - N37 -> N23[label="",style=dashed]; - N37 -> N36[label="",style=dashed]; - N37 -> N38[label="",style=dashed]; - N38 -> N43[label="",style=dashed]; + N1 -> N30[label="",style=dashed]; + N1 -> N31[label="",style=dashed]; + N1 -> N32[label="",style=dashed]; + N1 -> N33[label="",style=dashed]; + N2 -> N17[label="",style=dashed]; + N2 -> N21[label="",style=dashed]; + N2 -> N23[label="",style=dashed]; + N2 -> N26[label="",style=dashed]; + N2 -> N35[label="",style=dashed]; + N2 -> N36[label="",style=dashed]; + N2 -> N37[label="",style=dashed]; + N2 -> N38[label="",style=dashed]; + N3 -> N41[label="",style=dashed]; + N3 -> N42[label="",style=dashed]; + N4 -> N15[label="",style=dashed]; + N4 -> N16[label="",style=dashed]; + N4 -> N17[label="",style=dashed]; + N4 -> N25[label="",style=dashed]; + N4 -> N40[label="",style=dashed]; + N4 -> N43[label="",style=dashed]; + N4 -> N52[label="",style=dashed]; + N5 -> N16[label="",style=dashed]; + N5 -> N17[label="",style=dashed]; + N5 -> N21[label="",style=dashed]; + N5 -> N23[label="",style=dashed]; + N5 -> N26[label="",style=dashed]; + N6 -> N17[label="",style=dashed]; + N6 -> N23[label="",style=dashed]; + N6 -> N24[label="",style=dashed]; + N6 -> N26[label="",style=dashed]; + N7 -> N5[label="",style=dashed]; + N7 -> N16[label="",style=dashed]; + N7 -> N17[label="",style=dashed]; + N7 -> N37[label="",style=dashed]; + N7 -> N40[label="",style=dashed]; + N7 -> N41[label="",style=dashed]; + N7 -> N45[label="",style=dashed]; + N7 -> N46[label="",style=dashed]; + N7 -> N65[label="",style=dashed]; + N7 -> N66[label="",style=dashed]; + N8 -> N4[label="",style=dashed]; + N8 -> N16[label="",style=dashed]; + N8 -> N17[label="",style=dashed]; + N8 -> N23[label="",style=dashed]; + N8 -> N26[label="",style=dashed]; + N9 -> N10[label="",style=dashed]; + N10 -> N27[label="",style=dashed]; + N12 -> N13[label=""]; + N12 -> N14[label=""]; + N13 -> N14[label=""]; + N16 -> N17[label="",style=dashed]; + N16 -> N18[label="",style=dashed]; + N17 -> N24[label="",style=dashed]; + N17 -> N25[label="",style=dashed]; + N18 -> N25[label="",style=dashed]; + N18 -> N27[label="",style=dashed]; + N18 -> N40[label="",style=dashed]; + N18 -> N44[label="",style=dashed]; + N18 -> N65[label="",style=dashed]; + N23 -> N16[label="",style=dashed]; + N23 -> N17[label="",style=dashed]; + N23 -> N24[label="",style=dashed]; + N23 -> N25[label="",style=dashed]; + N23 -> N26[label="",style=dashed]; + N24 -> N49[label="",style=dashed]; + N26 -> N17[label="",style=dashed]; + N26 -> N21[label="",style=dashed]; + N29 -> N27[label="",style=dashed]; + N29 -> N49[label="",style=dashed]; + N29 -> N50[label="",style=dashed]; + N36 -> N40[label="",style=dashed]; + N37 -> N64[label="",style=dashed]; + N37 -> N67[label="",style=dashed]; + N38 -> N27[label="",style=dashed]; + N38 -> N72[label="",style=dashed]; N39 -> N40[label="",style=dashed]; - N41 -> N30[label="",style=dashed]; - N41 -> N39[label="",style=dashed]; - N41 -> N42[label="",style=dashed]; - N42 -> N23[label="",style=dashed]; - N43 -> N35[label="",style=dashed]; - N43 -> N36[label="",style=dashed]; - N43 -> N58[label="",style=dashed]; - N44 -> N45[label="",style=dashed]; - N44 -> N46[label="",style=dashed]; - N49 -> N23[label="",style=dashed]; - N49 -> N40[label="",style=dashed]; - N49 -> N44[label="",style=dashed]; - N49 -> N45[label="",style=dashed]; - N49 -> N48[label=""]; - N49 -> N50[label=""]; - N49 -> N51[label=""]; - N49 -> N52[label=""]; - N49 -> N53[label="",style=dashed]; - N50 -> N44[label=""]; - N50 -> N45[label=""]; - N50 -> N51[label=""]; - N50 -> N54[label=""]; - N51 -> N19[label=""]; - N51 -> N23[label=""]; - N51 -> N44[label=""]; - N51 -> N45[label=""]; - N51 -> N54[label=""]; - N52 -> N16[label=""]; - N52 -> N19[label=""]; - N52 -> N23[label=""]; - N52 -> N55[label=""]; - N52 -> N56[label=""]; - N52 -> N57[label=""]; - N54 -> N45[label=""]; - N54 -> N46[label=""]; + N40 -> N27[label="",style=dashed]; + N42 -> N9[label="",style=dashed]; + N42 -> N10[label="",style=dashed]; + N42 -> N69[label="",style=dashed]; + N42 -> N70[label="",style=dashed]; + N42 -> N71[label="",style=dashed]; + N43 -> N12[label="",style=dashed]; + N43 -> N25[label="",style=dashed]; + N43 -> N27[label="",style=dashed]; + N43 -> N40[label="",style=dashed]; + N43 -> N44[label="",style=dashed]; + N44 -> N48[label="",style=dashed]; + N45 -> N41[label="",style=dashed]; + N46 -> N34[label="",style=dashed]; + N46 -> N45[label="",style=dashed]; + N46 -> N47[label="",style=dashed]; + N47 -> N27[label="",style=dashed]; + N48 -> N39[label="",style=dashed]; + N48 -> N40[label="",style=dashed]; + N48 -> N63[label="",style=dashed]; + N49 -> N50[label="",style=dashed]; + N49 -> N51[label="",style=dashed]; + N54 -> N27[label="",style=dashed]; + N54 -> N41[label="",style=dashed]; + N54 -> N49[label="",style=dashed]; + N54 -> N50[label="",style=dashed]; + N54 -> N53[label=""]; + N54 -> N55[label=""]; + N54 -> N56[label=""]; + N54 -> N57[label=""]; + N54 -> N58[label="",style=dashed]; + N55 -> N49[label=""]; + N55 -> N50[label=""]; N55 -> N56[label=""]; - N58 -> N23[label="",style=dashed]; - N60 -> N23[label="",style=dashed]; - N60 -> N44[label="",style=dashed]; - N60 -> N45[label="",style=dashed]; - N61 -> N39[label="",style=dashed]; - N61 -> N40[label="",style=dashed]; - N61 -> N49[label="",style=dashed]; - N61 -> N53[label="",style=dashed]; - N61 -> N65[label="",style=dashed]; - N62 -> N23[label="",style=dashed]; - N62 -> N36[label="",style=dashed]; - N62 -> N44[label="",style=dashed]; - N62 -> N45[label="",style=dashed]; - N62 -> N63[label="",style=dashed]; - N64 -> N23[label="",style=dashed]; - N64 -> N38[label="",style=dashed]; + N55 -> N59[label=""]; + N56 -> N22[label=""]; + N56 -> N27[label=""]; + N56 -> N49[label=""]; + N56 -> N50[label=""]; + N56 -> N59[label=""]; + N57 -> N19[label=""]; + N57 -> N22[label=""]; + N57 -> N27[label=""]; + N57 -> N60[label=""]; + N57 -> N61[label=""]; + N57 -> N62[label=""]; + N59 -> N50[label=""]; + N59 -> N51[label=""]; + N60 -> N61[label=""]; + N63 -> N27[label="",style=dashed]; + N65 -> N27[label="",style=dashed]; + N65 -> N49[label="",style=dashed]; + N65 -> N50[label="",style=dashed]; + N66 -> N41[label="",style=dashed]; + N66 -> N45[label="",style=dashed]; + N66 -> N54[label="",style=dashed]; + N66 -> N58[label="",style=dashed]; + N66 -> N73[label="",style=dashed]; + N67 -> N27[label="",style=dashed]; + N67 -> N40[label="",style=dashed]; + N67 -> N49[label="",style=dashed]; + N67 -> N50[label="",style=dashed]; + N67 -> N68[label="",style=dashed]; + N70 -> N74[label="",style=dashed]; + N72 -> N27[label="",style=dashed]; + N72 -> N44[label="",style=dashed]; + N74 -> N27[label="",style=dashed]; + N74 -> N49[label="",style=dashed]; } diff --git a/tools/graph.png b/tools/graph.png index aa0d3927..94c46fe2 100644 Binary files a/tools/graph.png and b/tools/graph.png differ