added logger to p2p, fixed reading messages

This commit is contained in:
debris 2016-10-19 14:17:18 +02:00
parent 716fd8d949
commit 49bb034bbd
12 changed files with 449 additions and 248 deletions

74
Cargo.lock generated
View File

@ -4,6 +4,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"clap 2.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
"db 0.1.0", "db 0.1.0",
"env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"keys 0.1.0", "keys 0.1.0",
"message 0.1.0", "message 0.1.0",
"miner 0.1.0", "miner 0.1.0",
@ -11,6 +12,14 @@ dependencies = [
"script 0.1.0", "script 0.1.0",
] ]
[[package]]
name = "aho-corasick"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "ansi_term" name = "ansi_term"
version = "0.9.0" version = "0.9.0"
@ -116,6 +125,15 @@ name = "elastic-array"
version = "0.5.0" version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "env_logger"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 0.1.77 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "eth-secp256k1" name = "eth-secp256k1"
version = "0.5.6" version = "0.5.6"
@ -203,6 +221,14 @@ name = "log"
version = "0.3.6" version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "memchr"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "message" name = "message"
version = "0.1.0" version = "0.1.0"
@ -316,6 +342,7 @@ dependencies = [
"bitcrypto 0.1.0", "bitcrypto 0.1.0",
"futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"message 0.1.0", "message 0.1.0",
"parking_lot 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"primitives 0.1.0", "primitives 0.1.0",
@ -370,6 +397,23 @@ dependencies = [
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "regex"
version = "0.1.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"regex-syntax 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
"utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "regex-syntax"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "rocksdb" name = "rocksdb"
version = "0.4.5" version = "0.4.5"
@ -467,6 +511,23 @@ dependencies = [
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "thread-id"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "thread_local"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.1.35" version = "0.1.35"
@ -499,6 +560,11 @@ name = "unicode-width"
version = "0.1.3" version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "utf8-ranges"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "vec_map" name = "vec_map"
version = "0.6.0" version = "0.6.0"
@ -534,6 +600,7 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata] [metadata]
"checksum aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ca972c2ea5f742bfce5687b9aef75506a764f61d37f8f649047846a9686ddb66"
"checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6" "checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6"
"checksum arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d89f1b0e242270b5b797778af0c8d182a1a2ccac5d8d6fadf414223cc0fab096" "checksum arrayvec 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d89f1b0e242270b5b797778af0c8d182a1a2ccac5d8d6fadf414223cc0fab096"
"checksum base58 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5024ee8015f02155eee35c711107ddd9a9bf3cb689cf2a9089c97e79b6e1ae83" "checksum base58 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5024ee8015f02155eee35c711107ddd9a9bf3cb689cf2a9089c97e79b6e1ae83"
@ -545,6 +612,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97" "checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97"
"checksum deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1614659040e711785ed8ea24219140654da1729f3ec8a47a9719d041112fe7bf" "checksum deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1614659040e711785ed8ea24219140654da1729f3ec8a47a9719d041112fe7bf"
"checksum elastic-array 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4bc9250a632e7c001b741eb0ec6cee93c9a5b6d5f1879696a4b94d62b012210a" "checksum elastic-array 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4bc9250a632e7c001b741eb0ec6cee93c9a5b6d5f1879696a4b94d62b012210a"
"checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f"
"checksum eth-secp256k1 0.5.6 (git+https://github.com/ethcore/rust-secp256k1)" = "<none>" "checksum eth-secp256k1 0.5.6 (git+https://github.com/ethcore/rust-secp256k1)" = "<none>"
"checksum futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0bd34f72c0fffc9d2f6c570fd392bf99b9c5cd1481d79809e1cc2320befc0af0" "checksum futures 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0bd34f72c0fffc9d2f6c570fd392bf99b9c5cd1481d79809e1cc2320befc0af0"
"checksum futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bb982bb25cd8fa5da6a8eb3a460354c984ff1113da82bcb4f0b0862b5795db82" "checksum futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bb982bb25cd8fa5da6a8eb3a460354c984ff1113da82bcb4f0b0862b5795db82"
@ -554,6 +622,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b" "checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b"
"checksum libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)" = "408014cace30ee0f767b1c4517980646a573ec61a57957aeeabcac8ac0a02e8d" "checksum libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)" = "408014cace30ee0f767b1c4517980646a573ec61a57957aeeabcac8ac0a02e8d"
"checksum log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ab83497bf8bf4ed2a74259c1c802351fcd67a65baa86394b6ba73c36f4838054" "checksum log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ab83497bf8bf4ed2a74259c1c802351fcd67a65baa86394b6ba73c36f4838054"
"checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20"
"checksum mio 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2dadd39d4b47343e10513ac2a731c979517a4761224ecb6bbd243602300c9537" "checksum mio 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2dadd39d4b47343e10513ac2a731c979517a4761224ecb6bbd243602300c9537"
"checksum miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d5bfc6782530ac8ace97af10a540054a37126b63b0702ddaaa243b73b5745b9a" "checksum miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d5bfc6782530ac8ace97af10a540054a37126b63b0702ddaaa243b73b5745b9a"
"checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2" "checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2"
@ -567,6 +636,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum parking_lot_core 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fb1b97670a2ffadce7c397fb80a3d687c4f3060140b885621ef1653d0e5d5068" "checksum parking_lot_core 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fb1b97670a2ffadce7c397fb80a3d687c4f3060140b885621ef1653d0e5d5068"
"checksum rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "2791d88c6defac799c3f20d74f094ca33b9332612d9aef9078519c82e4fe04a5" "checksum rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "2791d88c6defac799c3f20d74f094ca33b9332612d9aef9078519c82e4fe04a5"
"checksum rayon 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "655df67c314c30fa3055a365eae276eb88aa4f3413a352a1ab32c1320eda41ea" "checksum rayon 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "655df67c314c30fa3055a365eae276eb88aa4f3413a352a1ab32c1320eda41ea"
"checksum regex 0.1.77 (registry+https://github.com/rust-lang/crates.io-index)" = "64b03446c466d35b42f2a8b203c8e03ed8b91c0f17b56e1f84f7210a257aa665"
"checksum regex-syntax 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "48f0573bcee95a48da786f8823465b5f2a1fae288a55407aca991e5b3e0eae11"
"checksum rocksdb 0.4.5 (git+https://github.com/ethcore/rust-rocksdb)" = "<none>" "checksum rocksdb 0.4.5 (git+https://github.com/ethcore/rust-rocksdb)" = "<none>"
"checksum rocksdb-sys 0.3.0 (git+https://github.com/ethcore/rust-rocksdb)" = "<none>" "checksum rocksdb-sys 0.3.0 (git+https://github.com/ethcore/rust-rocksdb)" = "<none>"
"checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a" "checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a"
@ -578,10 +649,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum smallvec 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fcc8d19212aacecf95e4a7a2179b26f7aeb9732a915cf01f05b0d3e044865410" "checksum smallvec 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fcc8d19212aacecf95e4a7a2179b26f7aeb9732a915cf01f05b0d3e044865410"
"checksum strsim 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "50c069df92e4b01425a8bf3576d5d417943a6a7272fbabaf5bd80b1aaa76442e" "checksum strsim 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "50c069df92e4b01425a8bf3576d5d417943a6a7272fbabaf5bd80b1aaa76442e"
"checksum term_size 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3f7f5f3f71b0040cecc71af239414c23fd3c73570f5ff54cf50e03cef637f2a0" "checksum term_size 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3f7f5f3f71b0040cecc71af239414c23fd3c73570f5ff54cf50e03cef637f2a0"
"checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03"
"checksum thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8576dbbfcaef9641452d5cf0df9b0e7eeab7694956dd33bb61515fb8f18cfdd5"
"checksum time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)" = "3c7ec6d62a20df54e07ab3b78b9a3932972f4b7981de295563686849eb3989af" "checksum time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)" = "3c7ec6d62a20df54e07ab3b78b9a3932972f4b7981de295563686849eb3989af"
"checksum tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "659cbae6c954dee37352853816c6a52180e47feb70be73bbfeec6d58c4da4a71" "checksum tokio-core 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "659cbae6c954dee37352853816c6a52180e47feb70be73bbfeec6d58c4da4a71"
"checksum unicode-segmentation 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b905d0fc2a1f0befd86b0e72e31d1787944efef9d38b9358a9e92a69757f7e3b" "checksum unicode-segmentation 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b905d0fc2a1f0befd86b0e72e31d1787944efef9d38b9358a9e92a69757f7e3b"
"checksum unicode-width 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2d6722facc10989f63ee0e20a83cd4e1714a9ae11529403ac7e0afd069abc39e" "checksum unicode-width 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2d6722facc10989f63ee0e20a83cd4e1714a9ae11529403ac7e0afd069abc39e"
"checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f"
"checksum vec_map 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cac5efe5cb0fa14ec2f84f83c701c562ee63f6dcc680861b21d65c682adfb05f" "checksum vec_map 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cac5efe5cb0fa14ec2f84f83c701c562ee63f6dcc680861b21d65c682adfb05f"
"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"

View File

@ -6,6 +6,7 @@ authors = ["Ethcore <admin@ethcore.io>"]
description = "Parity bitcoin client." description = "Parity bitcoin client."
[dependencies] [dependencies]
env_logger = "0.3"
clap = { version = "2", features = ["yaml"] } clap = { version = "2", features = ["yaml"] }
keys = { path = "keys" } keys = { path = "keys" }
message = { path = "message" } message = { path = "message" }

View File

@ -10,6 +10,7 @@ futures = "0.1"
futures-cpupool = "0.1" futures-cpupool = "0.1"
time = "0.1" time = "0.1"
rand = "0.3" rand = "0.3"
log = "0.3"
primitives = { path = "../primitives" } primitives = { path = "../primitives" }
bitcrypto = { path = "../crypto" } bitcrypto = { path = "../crypto" }

View File

@ -58,6 +58,10 @@ impl<A> Stream for ReadMessageStream<A> where A: io::Read {
}; };
self.state = next; self.state = next;
Ok(result) match result {
// by polling again, we register new future
Async::NotReady => self.poll(),
result => Ok(result)
}
} }
} }

View File

@ -5,6 +5,8 @@ extern crate rand;
extern crate time; extern crate time;
extern crate tokio_core; extern crate tokio_core;
extern crate parking_lot; extern crate parking_lot;
#[macro_use]
extern crate log;
extern crate bitcrypto as crypto; extern crate bitcrypto as crypto;
extern crate message; extern crate message;

View File

@ -7,7 +7,16 @@ use message::common::Command;
use net::Connections; use net::Connections;
use PeerInfo; use PeerInfo;
/// Reads and concatenates incoming messages from all connections
/// into a single messages stream.
pub struct MessagePoller {
last_polled: usize,
connections: Weak<Connections>,
}
/// Result of polling connections for new message.
pub enum MessagePoll { pub enum MessagePoll {
/// Returned on new message.
Ready { Ready {
command: Command, command: Command,
payload: Bytes, payload: Bytes,
@ -15,19 +24,18 @@ pub enum MessagePoll {
peer_info: PeerInfo, peer_info: PeerInfo,
errored_peers: Vec<PeerInfo>, errored_peers: Vec<PeerInfo>,
}, },
/// Returned when there are no new messages
/// and some peers disconnected.
OnlyErrors { OnlyErrors {
errored_peers: Vec<PeerInfo>, errored_peers: Vec<PeerInfo>,
} },
} /// Returned when there are not peers to poll.
WaitingForPeers,
pub struct MessagePoller {
last_polled: usize,
connections: Weak<Connections>,
} }
fn next_to_poll(channels: usize, last_polled: usize) -> usize { fn next_to_poll(channels: usize, last_polled: usize) -> usize {
// it's irrelevant if we sometimes poll the same peer twice in a row // it's irrelevant if we sometimes poll the same peer twice in a row
if channels > last_polled + 1 { if channels - 1 > last_polled {
// let's poll the next peer // let's poll the next peer
last_polled + 1 last_polled + 1
} else { } else {
@ -53,57 +61,92 @@ impl Stream for MessagePoller {
let connections = match self.connections.upgrade() { let connections = match self.connections.upgrade() {
Some(c) => c, Some(c) => c,
// application is about to shutdown // application is about to shutdown
// let's end the stream
None => return Ok(None.into()) None => return Ok(None.into())
}; };
let channels = connections.channels(); let channels = connections.channels();
if channels.len() == 0 { match channels.len() {
// let's wait for some connections 0 => {
return Ok(Async::NotReady); // stream is not ready for reading messages
} // returning Async::NotReady, would require scheduling a poll
// returning Async::Ready with custom value is hash, cause next
let mut to_poll = next_to_poll(channels.len(), self.last_polled); // poll will be automatically scheduled
let mut result = None; Ok(Async::Ready(Some(MessagePoll::WaitingForPeers)))
let mut errored_peers = Vec::new();
while result.is_none() && to_poll != self.last_polled {
let (_, channel) = channels.iter().nth(to_poll).expect("to_poll < channels.len()");
let status = channel.poll_message();
match status {
Ok(Async::Ready(Some(Ok((command, payload))))) => {
result = Some((command, payload, channel.version(), channel.peer_info()));
},
Ok(Async::NotReady) => {
// no messages yet, try next channel
to_poll = next_to_poll(channels.len(), to_poll);
},
_ => {
// channel has been closed or there was error
errored_peers.push(channel.peer_info());
to_poll = next_to_poll(channels.len(), to_poll);
},
}
}
self.last_polled = to_poll;
match result {
Some((command, payload, version, info)) => {
let message_poll = MessagePoll::Ready {
command: command,
payload: payload,
version: version,
peer_info: info,
errored_peers: errored_peers,
};
Ok(Async::Ready(Some(message_poll)))
}, },
None if errored_peers.is_empty() => Ok(Async::NotReady), 1 => {
_ => { let (_, channel) = channels.iter().nth(0).expect("0 < channels.len(); qed");
let message_poll = MessagePoll::OnlyErrors {
errored_peers: errored_peers, match channel.poll_message() {
}; Ok(Async::Ready(Some(Ok((command, payload))))) => {
Ok(Async::Ready(Some(message_poll))) let message_poll = MessagePoll::Ready {
command: command,
payload: payload,
version: channel.version(),
peer_info: channel.peer_info(),
errored_peers: Vec::new(),
};
Ok(Async::Ready(Some(message_poll)))
},
Ok(Async::NotReady) => {
Ok(Async::NotReady)
},
_ => {
let message_poll = MessagePoll::OnlyErrors {
errored_peers: vec![channel.peer_info()],
};
Ok(Async::Ready(Some(message_poll)))
}
}
},
n => {
let mut to_poll = next_to_poll(n, self.last_polled);
let mut result = None;
let mut errored_peers = Vec::new();
while result.is_none() && to_poll != self.last_polled {
let (_, channel) = channels.iter().nth(to_poll).expect("to_poll < channels.len(); qed");
match channel.poll_message() {
Ok(Async::Ready(Some(Ok((command, payload))))) => {
result = Some((command, payload, channel.version(), channel.peer_info()));
},
Ok(Async::NotReady) => {
// no messages yet, try next channel
to_poll = next_to_poll(channels.len(), to_poll);
},
_ => {
// channel has been closed or there was error
errored_peers.push(channel.peer_info());
to_poll = next_to_poll(channels.len(), to_poll);
},
}
}
self.last_polled = to_poll;
match result {
Some((command, payload, version, info)) => {
let message_poll = MessagePoll::Ready {
command: command,
payload: payload,
version: version,
peer_info: info,
errored_peers: errored_peers,
};
Ok(Async::Ready(Some(message_poll)))
},
None if errored_peers.is_empty() => Ok(Async::NotReady),
_ => {
let message_poll = MessagePoll::OnlyErrors {
errored_peers: errored_peers,
};
Ok(Async::Ready(Some(message_poll)))
}
}
} }
} }
} }

View File

@ -37,8 +37,8 @@ impl P2P {
} }
pub fn run(&self) -> Result<(), io::Error> { pub fn run(&self) -> Result<(), io::Error> {
for seednode in self.config.peers.iter() { for peer in self.config.peers.iter() {
self.connect(*seednode) self.connect(*peer)
} }
try!(self.listen()); try!(self.listen());
@ -51,11 +51,14 @@ impl P2P {
let connections = self.connections.clone(); let connections = self.connections.clone();
let node_table = self.node_table.clone(); let node_table = self.node_table.clone();
let connection = connect(&socket, &self.event_loop_handle, &self.config.connection); let connection = connect(&socket, &self.event_loop_handle, &self.config.connection);
trace!("Trying to connect to: {}", socket);
let pool_work = self.pool.spawn(connection).then(move |result| { let pool_work = self.pool.spawn(connection).then(move |result| {
if let Ok(Ok(con)) = result { if let Ok(Ok(con)) = result {
trace!("Connected to {}", con.address);
node_table.write().insert(con.address, con.services); node_table.write().insert(con.address, con.services);
connections.store(con); connections.store(con);
} else { } else {
trace!("Failed to connect to {}", socket);
node_table.write().note_failure(&socket); node_table.write().note_failure(&socket);
} }
finished(()) finished(())
@ -69,6 +72,7 @@ impl P2P {
let node_table = self.node_table.clone(); let node_table = self.node_table.clone();
let server = listen.for_each(move |result| { let server = listen.for_each(move |result| {
if let Ok(con) = result { if let Ok(con) = result {
trace!("Accepted connection from {}", con.address);
node_table.write().insert(con.address, con.services); node_table.write().insert(con.address, con.services);
connections.store(con); connections.store(con);
} }
@ -89,7 +93,8 @@ impl P2P {
let node_table = self.node_table.clone(); let node_table = self.node_table.clone();
let polling = poller.for_each(move |result| { let polling = poller.for_each(move |result| {
match result { match result {
MessagePoll::Ready { errored_peers, .. } => { MessagePoll::Ready { errored_peers, command, peer_info, .. } => {
trace!("Received message {} from {}", command, peer_info.address);
// TODO: handle new messasges here! // TODO: handle new messasges here!
let mut node_table = node_table.write(); let mut node_table = node_table.write();
@ -104,7 +109,10 @@ impl P2P {
node_table.note_failure(&peer.address); node_table.note_failure(&peer.address);
connections.remove(peer.id); connections.remove(peer.id);
} }
} },
MessagePoll::WaitingForPeers => {
// do nothing
},
} }
Ok(()) Ok(())
}).then(|_| { }).then(|_| {

View File

@ -1 +1,21 @@
mod ping; mod ping;
use bytes::Bytes;
use message::Error;
use message::common::Command;
pub use self::ping::PingProtocol;
pub enum ProtocolResult {
Reply(Bytes),
Error(Error),
None,
Disconnect,
}
pub trait Protocol {
/// Initialize the protocol.
fn initialize(&self);
/// Handle the message.
fn on_message(&self, command: &Command, payload: &Bytes) -> ProtocolResult;
}

View File

@ -0,0 +1,24 @@
use bytes::Bytes;
use message::types::{Ping, Pong};
use message::common::Command;
use protocol::{Protocol, ProtocolResult};
enum PingState {
ExpectingPing,
ExpectingPong,
}
pub struct PingProtocol {
state: PingState,
}
impl Protocol for PingProtocol {
fn initialize(&self) {
// send ping to the peer
}
fn on_message(&self, command: &Command, payload: &Bytes) -> ProtocolResult {
ProtocolResult::None
}
}

View File

@ -2,6 +2,7 @@
#[macro_use] #[macro_use]
extern crate clap; extern crate clap;
extern crate env_logger;
extern crate keys; extern crate keys;
extern crate script; extern crate script;
@ -14,6 +15,7 @@ use std::net::SocketAddr;
use p2p::{P2P, event_loop, forever, net}; use p2p::{P2P, event_loop, forever, net};
fn main() { fn main() {
env_logger::init().unwrap();
match run() { match run() {
Err(err) => println!("{}", err), Err(err) => println!("{}", err),
Ok(_) => (), Ok(_) => (),

View File

@ -2,69 +2,77 @@ digraph dependencies {
N0[label="pbtc",shape=box]; N0[label="pbtc",shape=box];
N1[label="clap",shape=box]; N1[label="clap",shape=box];
N2[label="db",shape=box]; N2[label="db",shape=box];
N3[label="keys",shape=box]; N3[label="env_logger",shape=box];
N4[label="message",shape=box]; N4[label="keys",shape=box];
N5[label="miner",shape=box]; N5[label="message",shape=box];
N6[label="p2p",shape=box]; N6[label="miner",shape=box];
N7[label="script",shape=box]; N7[label="p2p",shape=box];
N8[label="ansi_term",shape=box]; N8[label="script",shape=box];
N9[label="arrayvec",shape=box]; N9[label="aho-corasick",shape=box];
N10[label="nodrop",shape=box]; N10[label="memchr",shape=box];
N11[label="odds",shape=box]; N11[label="ansi_term",shape=box];
N12[label="base58",shape=box]; N12[label="arrayvec",shape=box];
N13[label="bitcrypto",shape=box]; N13[label="nodrop",shape=box];
N14[label="primitives",shape=box]; N14[label="odds",shape=box];
N15[label="rust-crypto",shape=box]; N15[label="base58",shape=box];
N16[label="bitflags v0.4.0",shape=box]; N16[label="bitcrypto",shape=box];
N17[label="bitflags v0.7.0",shape=box]; N17[label="primitives",shape=box];
N18[label="byteorder",shape=box]; N18[label="rust-crypto",shape=box];
N19[label="cfg-if",shape=box]; N19[label="bitflags v0.4.0",shape=box];
N20[label="chain",shape=box]; N20[label="bitflags v0.7.0",shape=box];
N21[label="rustc-serialize",shape=box]; N21[label="byteorder",shape=box];
N22[label="serialization",shape=box]; N22[label="cfg-if",shape=box];
N23[label="libc",shape=box]; N23[label="chain",shape=box];
N24[label="strsim",shape=box]; N24[label="rustc-serialize",shape=box];
N25[label="term_size",shape=box]; N25[label="serialization",shape=box];
N26[label="unicode-segmentation",shape=box]; N26[label="libc",shape=box];
N27[label="unicode-width",shape=box]; N27[label="strsim",shape=box];
N28[label="vec_map",shape=box]; N28[label="term_size",shape=box];
N29[label="yaml-rust",shape=box]; N29[label="unicode-segmentation",shape=box];
N30[label="crossbeam",shape=box]; N30[label="unicode-width",shape=box];
N31[label="elastic-array",shape=box]; N31[label="vec_map",shape=box];
N32[label="ethcore-devtools",shape=box]; N32[label="yaml-rust",shape=box];
N33[label="parking_lot",shape=box]; N33[label="crossbeam",shape=box];
N34[label="rocksdb",shape=box]; N34[label="elastic-array",shape=box];
N35[label="deque",shape=box]; N35[label="ethcore-devtools",shape=box];
N36[label="rand",shape=box]; N36[label="parking_lot",shape=box];
N37[label="eth-secp256k1",shape=box]; N37[label="rocksdb",shape=box];
N38[label="gcc",shape=box]; N38[label="deque",shape=box];
N39[label="futures",shape=box]; N39[label="rand",shape=box];
N40[label="log",shape=box]; N40[label="log",shape=box];
N41[label="futures-cpupool",shape=box]; N41[label="regex",shape=box];
N42[label="num_cpus v1.1.0",shape=box]; N42[label="eth-secp256k1",shape=box];
N43[label="rayon",shape=box]; N43[label="gcc",shape=box];
N44[label="kernel32-sys",shape=box]; N44[label="futures",shape=box];
N45[label="winapi",shape=box]; N45[label="futures-cpupool",shape=box];
N46[label="winapi-build",shape=box]; N46[label="num_cpus v1.1.0",shape=box];
N47[label="lazy_static",shape=box]; N47[label="rayon",shape=box];
N48[label="lazycell",shape=box]; N48[label="kernel32-sys",shape=box];
N49[label="mio",shape=box]; N49[label="winapi",shape=box];
N50[label="miow",shape=box]; N50[label="winapi-build",shape=box];
N51[label="net2",shape=box]; N51[label="lazy_static",shape=box];
N52[label="nix",shape=box]; N52[label="lazycell",shape=box];
N53[label="slab",shape=box]; N53[label="mio",shape=box];
N54[label="ws2_32-sys",shape=box]; N54[label="miow",shape=box];
N55[label="rustc_version",shape=box]; N55[label="net2",shape=box];
N56[label="semver",shape=box]; N56[label="nix",shape=box];
N57[label="void",shape=box]; N57[label="slab",shape=box];
N58[label="num_cpus v0.2.13",shape=box]; N58[label="ws2_32-sys",shape=box];
N59[label="owning_ref",shape=box]; N59[label="rustc_version",shape=box];
N60[label="time",shape=box]; N60[label="semver",shape=box];
N61[label="tokio-core",shape=box]; N61[label="void",shape=box];
N62[label="parking_lot_core",shape=box]; N62[label="num_cpus v0.2.13",shape=box];
N63[label="smallvec",shape=box]; N63[label="owning_ref",shape=box];
N64[label="rocksdb-sys",shape=box]; N64[label="time",shape=box];
N65[label="scoped-tls",shape=box]; N65[label="tokio-core",shape=box];
N66[label="parking_lot_core",shape=box];
N67[label="smallvec",shape=box];
N68[label="regex-syntax",shape=box];
N69[label="thread_local",shape=box];
N70[label="utf8-ranges",shape=box];
N71[label="rocksdb-sys",shape=box];
N72[label="scoped-tls",shape=box];
N73[label="thread-id",shape=box];
N0 -> N1[label="",style=dashed]; N0 -> N1[label="",style=dashed];
N0 -> N2[label="",style=dashed]; N0 -> N2[label="",style=dashed];
N0 -> N3[label="",style=dashed]; N0 -> N3[label="",style=dashed];
@ -72,136 +80,150 @@ digraph dependencies {
N0 -> N5[label="",style=dashed]; N0 -> N5[label="",style=dashed];
N0 -> N6[label="",style=dashed]; N0 -> N6[label="",style=dashed];
N0 -> N7[label="",style=dashed]; N0 -> N7[label="",style=dashed];
N1 -> N8[label="",style=dashed]; N0 -> N8[label="",style=dashed];
N1 -> N17[label="",style=dashed]; N1 -> N11[label="",style=dashed];
N1 -> N23[label="",style=dashed]; N1 -> N20[label="",style=dashed];
N1 -> N24[label="",style=dashed];
N1 -> N25[label="",style=dashed];
N1 -> N26[label="",style=dashed]; N1 -> N26[label="",style=dashed];
N1 -> N27[label="",style=dashed]; N1 -> N27[label="",style=dashed];
N1 -> N28[label="",style=dashed]; N1 -> N28[label="",style=dashed];
N1 -> N29[label="",style=dashed]; N1 -> N29[label="",style=dashed];
N2 -> N14[label="",style=dashed]; N1 -> N30[label="",style=dashed];
N2 -> N18[label="",style=dashed]; N1 -> N31[label="",style=dashed];
N2 -> N20[label="",style=dashed]; N1 -> N32[label="",style=dashed];
N2 -> N22[label="",style=dashed]; N2 -> N17[label="",style=dashed];
N2 -> N31[label="",style=dashed]; N2 -> N21[label="",style=dashed];
N2 -> N32[label="",style=dashed]; N2 -> N23[label="",style=dashed];
N2 -> N33[label="",style=dashed]; N2 -> N25[label="",style=dashed];
N2 -> N34[label="",style=dashed]; N2 -> N34[label="",style=dashed];
N3 -> N12[label="",style=dashed]; N2 -> N35[label="",style=dashed];
N3 -> N13[label="",style=dashed]; N2 -> N36[label="",style=dashed];
N3 -> N14[label="",style=dashed]; N2 -> N37[label="",style=dashed];
N3 -> N21[label="",style=dashed]; N3 -> N40[label="",style=dashed];
N3 -> N36[label="",style=dashed]; N3 -> N41[label="",style=dashed];
N3 -> N37[label="",style=dashed]; N4 -> N15[label="",style=dashed];
N3 -> N47[label="",style=dashed]; N4 -> N16[label="",style=dashed];
N4 -> N13[label="",style=dashed]; N4 -> N17[label="",style=dashed];
N4 -> N14[label="",style=dashed]; N4 -> N24[label="",style=dashed];
N4 -> N18[label="",style=dashed]; N4 -> N39[label="",style=dashed];
N4 -> N20[label="",style=dashed]; N4 -> N42[label="",style=dashed];
N4 -> N22[label="",style=dashed]; N4 -> N51[label="",style=dashed];
N5 -> N14[label="",style=dashed]; N5 -> N16[label="",style=dashed];
N5 -> N20[label="",style=dashed]; N5 -> N17[label="",style=dashed];
N5 -> N22[label="",style=dashed]; N5 -> N21[label="",style=dashed];
N6 -> N4[label="",style=dashed]; N5 -> N23[label="",style=dashed];
N6 -> N13[label="",style=dashed]; N5 -> N25[label="",style=dashed];
N6 -> N14[label="",style=dashed]; N6 -> N17[label="",style=dashed];
N6 -> N33[label="",style=dashed]; N6 -> N23[label="",style=dashed];
N6 -> N36[label="",style=dashed]; N6 -> N25[label="",style=dashed];
N6 -> N39[label="",style=dashed]; N7 -> N5[label="",style=dashed];
N6 -> N41[label="",style=dashed]; N7 -> N16[label="",style=dashed];
N6 -> N60[label="",style=dashed]; N7 -> N17[label="",style=dashed];
N6 -> N61[label="",style=dashed]; N7 -> N36[label="",style=dashed];
N7 -> N3[label="",style=dashed]; N7 -> N39[label="",style=dashed];
N7 -> N13[label="",style=dashed]; N7 -> N40[label="",style=dashed];
N7 -> N14[label="",style=dashed]; N7 -> N44[label="",style=dashed];
N7 -> N20[label="",style=dashed]; N7 -> N45[label="",style=dashed];
N7 -> N22[label="",style=dashed]; N7 -> N64[label="",style=dashed];
N9 -> N10[label=""]; N7 -> N65[label="",style=dashed];
N9 -> N11[label=""]; N8 -> N4[label="",style=dashed];
N10 -> N11[label=""]; N8 -> N16[label="",style=dashed];
N13 -> N14[label="",style=dashed]; N8 -> N17[label="",style=dashed];
N13 -> N15[label="",style=dashed]; N8 -> N23[label="",style=dashed];
N14 -> N21[label="",style=dashed]; N8 -> N25[label="",style=dashed];
N15 -> N21[label="",style=dashed]; N9 -> N10[label="",style=dashed];
N15 -> N23[label="",style=dashed]; N10 -> N26[label="",style=dashed];
N15 -> N36[label="",style=dashed]; N12 -> N13[label=""];
N15 -> N38[label="",style=dashed]; N12 -> N14[label=""];
N15 -> N60[label="",style=dashed]; N13 -> N14[label=""];
N20 -> N13[label="",style=dashed]; N16 -> N17[label="",style=dashed];
N20 -> N14[label="",style=dashed]; N16 -> N18[label="",style=dashed];
N20 -> N21[label="",style=dashed]; N17 -> N24[label="",style=dashed];
N20 -> N22[label="",style=dashed]; N18 -> N24[label="",style=dashed];
N22 -> N14[label="",style=dashed]; N18 -> N26[label="",style=dashed];
N22 -> N18[label="",style=dashed]; N18 -> N39[label="",style=dashed];
N25 -> N23[label="",style=dashed]; N18 -> N43[label="",style=dashed];
N25 -> N44[label="",style=dashed]; N18 -> N64[label="",style=dashed];
N25 -> N45[label="",style=dashed]; N23 -> N16[label="",style=dashed];
N32 -> N36[label="",style=dashed]; N23 -> N17[label="",style=dashed];
N33 -> N59[label="",style=dashed]; N23 -> N24[label="",style=dashed];
N33 -> N62[label="",style=dashed]; N23 -> N25[label="",style=dashed];
N34 -> N23[label="",style=dashed]; N25 -> N17[label="",style=dashed];
N34 -> N64[label="",style=dashed]; N25 -> N21[label="",style=dashed];
N35 -> N36[label="",style=dashed]; N28 -> N26[label="",style=dashed];
N36 -> N23[label="",style=dashed]; N28 -> N48[label="",style=dashed];
N37 -> N9[label="",style=dashed]; N28 -> N49[label="",style=dashed];
N37 -> N21[label="",style=dashed]; N35 -> N39[label="",style=dashed];
N37 -> N23[label="",style=dashed]; N36 -> N63[label="",style=dashed];
N37 -> N36[label="",style=dashed]; N36 -> N66[label="",style=dashed];
N37 -> N38[label="",style=dashed]; N37 -> N26[label="",style=dashed];
N38 -> N43[label="",style=dashed]; N37 -> N71[label="",style=dashed];
N39 -> N40[label="",style=dashed]; N38 -> N39[label="",style=dashed];
N41 -> N30[label="",style=dashed]; N39 -> N26[label="",style=dashed];
N41 -> N39[label="",style=dashed]; N41 -> N9[label="",style=dashed];
N41 -> N42[label="",style=dashed]; N41 -> N10[label="",style=dashed];
N42 -> N23[label="",style=dashed]; N41 -> N68[label="",style=dashed];
N43 -> N35[label="",style=dashed]; N41 -> N69[label="",style=dashed];
N43 -> N36[label="",style=dashed]; N41 -> N70[label="",style=dashed];
N43 -> N58[label="",style=dashed]; N42 -> N12[label="",style=dashed];
N44 -> N45[label="",style=dashed]; N42 -> N24[label="",style=dashed];
N44 -> N46[label="",style=dashed]; N42 -> N26[label="",style=dashed];
N49 -> N23[label="",style=dashed]; N42 -> N39[label="",style=dashed];
N49 -> N40[label="",style=dashed]; N42 -> N43[label="",style=dashed];
N49 -> N44[label="",style=dashed]; N43 -> N47[label="",style=dashed];
N49 -> N45[label="",style=dashed]; N44 -> N40[label="",style=dashed];
N49 -> N48[label=""]; N45 -> N33[label="",style=dashed];
N49 -> N50[label=""]; N45 -> N44[label="",style=dashed];
N49 -> N51[label=""]; N45 -> N46[label="",style=dashed];
N49 -> N52[label=""]; N46 -> N26[label="",style=dashed];
N49 -> N53[label="",style=dashed]; N47 -> N38[label="",style=dashed];
N50 -> N44[label=""]; N47 -> N39[label="",style=dashed];
N50 -> N45[label=""]; N47 -> N62[label="",style=dashed];
N50 -> N51[label=""]; N48 -> N49[label="",style=dashed];
N50 -> N54[label=""]; N48 -> N50[label="",style=dashed];
N51 -> N19[label=""]; N53 -> N26[label="",style=dashed];
N51 -> N23[label=""]; N53 -> N40[label="",style=dashed];
N51 -> N44[label=""]; N53 -> N48[label="",style=dashed];
N51 -> N45[label=""]; N53 -> N49[label="",style=dashed];
N51 -> N54[label=""]; N53 -> N52[label=""];
N52 -> N16[label=""]; N53 -> N54[label=""];
N52 -> N19[label=""]; N53 -> N55[label=""];
N52 -> N23[label=""]; N53 -> N56[label=""];
N52 -> N55[label=""]; N53 -> N57[label="",style=dashed];
N52 -> N56[label=""]; N54 -> N48[label=""];
N52 -> N57[label=""]; N54 -> N49[label=""];
N54 -> N45[label=""]; N54 -> N55[label=""];
N54 -> N46[label=""]; N54 -> N58[label=""];
N55 -> N56[label=""]; N55 -> N22[label=""];
N58 -> N23[label="",style=dashed]; N55 -> N26[label=""];
N60 -> N23[label="",style=dashed]; N55 -> N48[label=""];
N60 -> N44[label="",style=dashed]; N55 -> N49[label=""];
N60 -> N45[label="",style=dashed]; N55 -> N58[label=""];
N61 -> N39[label="",style=dashed]; N56 -> N19[label=""];
N61 -> N40[label="",style=dashed]; N56 -> N22[label=""];
N61 -> N49[label="",style=dashed]; N56 -> N26[label=""];
N61 -> N53[label="",style=dashed]; N56 -> N59[label=""];
N61 -> N65[label="",style=dashed]; N56 -> N60[label=""];
N62 -> N23[label="",style=dashed]; N56 -> N61[label=""];
N62 -> N36[label="",style=dashed]; N58 -> N49[label=""];
N62 -> N44[label="",style=dashed]; N58 -> N50[label=""];
N62 -> N45[label="",style=dashed]; N59 -> N60[label=""];
N62 -> N63[label="",style=dashed]; N62 -> N26[label="",style=dashed];
N64 -> N23[label="",style=dashed]; N64 -> N26[label="",style=dashed];
N64 -> N38[label="",style=dashed]; N64 -> N48[label="",style=dashed];
N64 -> N49[label="",style=dashed];
N65 -> N40[label="",style=dashed];
N65 -> N44[label="",style=dashed];
N65 -> N53[label="",style=dashed];
N65 -> N57[label="",style=dashed];
N65 -> N72[label="",style=dashed];
N66 -> N26[label="",style=dashed];
N66 -> N39[label="",style=dashed];
N66 -> N48[label="",style=dashed];
N66 -> N49[label="",style=dashed];
N66 -> N67[label="",style=dashed];
N69 -> N73[label="",style=dashed];
N71 -> N26[label="",style=dashed];
N71 -> N43[label="",style=dashed];
N73 -> N26[label="",style=dashed];
N73 -> N48[label="",style=dashed];
} }

Binary file not shown.

Before

Width:  |  Height:  |  Size: 438 KiB

After

Width:  |  Height:  |  Size: 478 KiB