diff --git a/message/src/common/service.rs b/message/src/common/service.rs index 55a1a2e6..58b10e86 100644 --- a/message/src/common/service.rs +++ b/message/src/common/service.rs @@ -59,6 +59,15 @@ impl Services { self } + pub fn bitcoin_cash(&self) -> bool { + self.bit_at(5) + } + + pub fn with_bitcoin_cash(mut self, v: bool) -> Self { + self.set_bit(5, v); + self + } + pub fn includes(&self, other: &Self) -> bool { self.0 & other.0 == other.0 } diff --git a/p2p/src/config.rs b/p2p/src/config.rs index b4a271b5..117d3f6d 100644 --- a/p2p/src/config.rs +++ b/p2p/src/config.rs @@ -1,4 +1,5 @@ use std::{net, path}; +use message::common::Services; use net::Config as NetConfig; use util::InternetProtocol; @@ -16,8 +17,10 @@ pub struct Config { pub peers: Vec, /// Connect to these nodes to retrieve peer addresses, and disconnect. pub seeds: Vec, - /// p2p/nodes.csv file path + /// p2p/nodes.csv file path. pub node_table_path: path::PathBuf, + /// Peers with this services will get a boost in node_table. + pub preferable_services: Services, /// Internet protocol. pub internet_protocol: InternetProtocol, } diff --git a/p2p/src/p2p.rs b/p2p/src/p2p.rs index 24f9cdf2..589d97a5 100644 --- a/p2p/src/p2p.rs +++ b/p2p/src/p2p.rs @@ -12,6 +12,7 @@ use abstract_ns::Resolver; use ns_dns_tokio::DnsResolver; use message::{Payload, MessageResult, Message}; use message::common::Services; +use message::types::addr::AddressEntry; use net::{connect, Connections, Channel, Config as NetConfig, accept_connection, ConnectionCounter}; use util::{NodeTable, Node, NodeTableError, Direction}; use session::{SessionFactory, SeednodeSessionFactory, NormalSessionFactory}; @@ -45,7 +46,7 @@ impl Context { let context = Context { connections: Default::default(), connection_counter: ConnectionCounter::new(config.inbound_connections, config.outbound_connections), - node_table: RwLock::new(try!(NodeTable::from_file(&config.node_table_path))), + node_table: RwLock::new(try!(NodeTable::from_file(config.preferable_services, &config.node_table_path))), pool: pool_handle, remote: remote, local_sync_node: local_sync_node, @@ -84,7 +85,7 @@ impl Context { } /// Updates node table. - pub fn update_node_table(&self, nodes: Vec) { + pub fn update_node_table(&self, nodes: Vec) { trace!("Updating node table with {} entries", nodes.len()); self.node_table.write().insert_many(nodes); } @@ -120,6 +121,7 @@ impl Context { let needed = context.connection_counter.outbound_connections_needed() as usize; if needed != 0 { + // TODO: pass Services::with_bitcoin_cash(true) after HF block let used_addresses = context.connections.addresses(); let peers = context.node_table.read().nodes_with_services(&Services::default(), context.config.internet_protocol, &used_addresses, needed); let addresses = peers.into_iter() diff --git a/p2p/src/protocol/addr.rs b/p2p/src/protocol/addr.rs index 48384e87..9ffc8d08 100644 --- a/p2p/src/protocol/addr.rs +++ b/p2p/src/protocol/addr.rs @@ -45,9 +45,8 @@ impl Protocol for AddrProtocol { unreachable!("This version of protocol is not supported!"); }, Addr::V31402(addr) => { - let nodes: Vec<_> = addr.addresses.into_iter().map(Into::into).collect(); - let nodes_len = nodes.len(); - self.context.global().update_node_table(nodes); + let nodes_len = addr.addresses.len(); + self.context.global().update_node_table(addr.addresses); // seednodes are currently responding with two addr messages: // 1) addr message with single address - seednode itself // 2) addr message with 1000 addresses (seednode node_table contents) diff --git a/p2p/src/util/node_table.rs b/p2p/src/util/node_table.rs index cf34ebe1..8f3cea20 100644 --- a/p2p/src/util/node_table.rs +++ b/p2p/src/util/node_table.rs @@ -17,6 +17,8 @@ pub struct Node { time: i64, /// Services supported by the node. services: Services, + /// Is preferable node? + is_preferable: bool, /// Node failures counter. failures: u32, } @@ -27,17 +29,6 @@ impl Node { } } -impl From for Node { - fn from(entry: AddressEntry) -> Self { - Node { - addr: SocketAddr::new(entry.address.address.into(), entry.address.port.into()), - time: entry.timestamp as i64, - services: entry.address.services, - failures: 0, - } - } -} - impl From for AddressEntry { fn from(node: Node) -> Self { AddressEntry { @@ -63,11 +54,17 @@ impl From for NodeByScore { impl PartialOrd for NodeByScore { fn partial_cmp(&self, other: &Self) -> Option { if self.0.failures == other.0.failures { - if other.0.time == self.0.time { - other.0.partial_cmp(&self.0) - } - else { - other.0.time.partial_cmp(&self.0.time) + if self.0.is_preferable == other.0.is_preferable { + if other.0.time == self.0.time { + other.0.partial_cmp(&self.0) + } + else { + other.0.time.partial_cmp(&self.0.time) + } + } else if self.0.is_preferable { + return Some(Ordering::Less) + } else { + Some(Ordering::Greater) } } else { self.0.failures.partial_cmp(&other.0.failures) @@ -78,11 +75,17 @@ impl PartialOrd for NodeByScore { impl Ord for NodeByScore { fn cmp(&self, other: &Self) -> Ordering { if self.0.failures == other.0.failures { - if other.0.time == self.0.time { - other.0.cmp(&self.0) - } - else { - other.0.time.cmp(&self.0.time) + if self.0.is_preferable == other.0.is_preferable { + if other.0.time == self.0.time { + other.0.cmp(&self.0) + } + else { + other.0.time.cmp(&self.0.time) + } + } else if self.0.is_preferable { + return Ordering::Less + } else { + Ordering::Greater } } else { self.0.failures.cmp(&other.0.failures) @@ -168,6 +171,8 @@ pub enum NodeTableError { AddressAlreadyAdded, NoAddressInTable } pub struct NodeTable where T: Time { /// Time source. time: T, + /// Preferable services. + preferable_services: Services, /// Nodes by socket address. by_addr: HashMap, /// Nodes sorted by score. @@ -177,15 +182,24 @@ pub struct NodeTable where T: Time { } impl NodeTable { + #[cfg(test)] + /// Creates empty node table with preferable services. + pub fn new(preferable_services: Services) -> Self { + NodeTable { + preferable_services: preferable_services, + ..Default::default() + } + } + /// Opens a file loads node_table from it. - pub fn from_file

(path: P) -> Result where P: AsRef { + pub fn from_file

(preferable_services: Services, path: P) -> Result where P: AsRef { fs::OpenOptions::new() .create(true) .read(true) // without opening for write, mac os returns os error 22 .write(true) .open(path) - .and_then(Self::load) + .and_then(|f| Self::load(preferable_services, f)) } /// Saves node table to file @@ -213,6 +227,7 @@ impl NodeTable where T: Time { addr: addr, time: now, services: services, + is_preferable: services.includes(&self.preferable_services), failures: 0, }; self.by_score.insert(node.clone().into()); @@ -254,14 +269,22 @@ impl NodeTable where T: Time { /// Inserts many new addresses into node table. /// Used in `addr` request handler. /// Discards all nodes with timestamp newer than current time. - pub fn insert_many(&mut self, nodes: Vec) { + pub fn insert_many(&mut self, addresses: Vec) { // discard all nodes with timestamp newer than current time. let now = self.time.get().sec; - let iter = nodes.into_iter() - .filter(|node| node.time <= now); + let iter = addresses.into_iter() + .filter(|addr| addr.timestamp as i64 <= now); // iterate over the rest - for node in iter { + for addr in iter { + let node = Node { + addr: SocketAddr::new(addr.address.address.into(), addr.address.port.into()), + time: addr.timestamp as i64, + services: addr.address.services, + is_preferable: addr.address.services.includes(&self.preferable_services), + failures: 0, + }; + match self.by_addr.entry(node.addr) { Entry::Occupied(mut entry) => { let old = entry.get_mut(); @@ -369,22 +392,25 @@ impl NodeTable where T: Time { } /// Loads table in from a csv source. - pub fn load(read: R) -> Result where R: io::Read, T: Default { + pub fn load(preferable_services: Services, read: R) -> Result where R: io::Read, T: Default { let mut rdr = csv::Reader::from_reader(read) .has_headers(false) .delimiter(b' '); let mut node_table = NodeTable::default(); + node_table.preferable_services = preferable_services; let err = || io::Error::new(io::ErrorKind::Other, "Load csv error"); for row in rdr.decode() { let (addr, time, services, failures): (String, i64, u64, u32) = try!(row.map_err(|_| err())); + let services = services.into(); let node = Node { addr: try!(addr.parse().map_err(|_| err())), time: time, - services: services.into(), + services: services, + is_preferable: services.includes(&preferable_services), failures: failures, }; @@ -559,7 +585,7 @@ mod tests { let mut db = Vec::new(); assert_eq!(table.save(&mut db).unwrap(), ()); - let loaded_table = NodeTable::::load(&db as &[u8]).unwrap(); + let loaded_table = NodeTable::::load(Services::default(), &db as &[u8]).unwrap(); assert_eq!(table.by_addr, loaded_table.by_addr); assert_eq!(table.by_score, loaded_table.by_score); assert_eq!(table.by_time, loaded_table.by_time); @@ -573,4 +599,21 @@ mod tests { 127.0.0.1:8003 3 0 1 ".to_string(), s); } + + #[test] + fn test_preferable_services() { + let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap(); + let s1: SocketAddr = "127.0.0.1:8001".parse().unwrap(); + + let mut table = NodeTable::new(Services::default().with_network(true).with_bitcoin_cash(true)); + table.insert(s0, Services::default().with_network(true)); + table.insert(s1, Services::default().with_network(true).with_bitcoin_cash(true)); + assert_eq!(table.nodes_with_services(&Services::default(), InternetProtocol::default(), &HashSet::new(), 1)[0].address(), s1); + + table.note_failure(&s1); + assert_eq!(table.nodes_with_services(&Services::default(), InternetProtocol::default(), &HashSet::new(), 1)[0].address(), s0); + + table.note_failure(&s0); + assert_eq!(table.nodes_with_services(&Services::default(), InternetProtocol::default(), &HashSet::new(), 1)[0].address(), s1); + } } diff --git a/pbtc/commands/start.rs b/pbtc/commands/start.rs index 261bf02d..38e97953 100644 --- a/pbtc/commands/start.rs +++ b/pbtc/commands/start.rs @@ -5,6 +5,7 @@ use std::sync::mpsc::{channel, Sender, Receiver}; use std::sync::atomic::{AtomicBool, Ordering}; use sync::{create_sync_peers, create_local_sync_node, create_sync_connection_factory, SyncListener}; use message::Services; +use network::ConsensusFork; use primitives::hash::H256; use util::{open_db, init_db, node_table_path}; use {config, p2p, PROTOCOL_VERSION, PROTOCOL_MINIMUM}; @@ -90,6 +91,12 @@ pub fn start(cfg: config::Config) -> Result<(), String> { let nodes_path = node_table_path(&cfg); + let services = Services::default().with_network(true); + let services = match cfg.consensus.fork { + ConsensusFork::BitcoinCash(_) => services.with_bitcoin_cash(true), + ConsensusFork::NoFork | ConsensusFork::SegWit2x(_) => services, + }; + let p2p_cfg = p2p::Config { threads: cfg.p2p_threads, inbound_connections: cfg.inbound_connections, @@ -99,7 +106,7 @@ pub fn start(cfg: config::Config) -> Result<(), String> { protocol_minimum: PROTOCOL_MINIMUM, magic: cfg.magic, local_address: SocketAddr::new("127.0.0.1".parse().unwrap(), cfg.port), - services: Services::default().with_network(true), + services: services, user_agent: cfg.user_agent, start_height: 0, relay: true, @@ -107,6 +114,7 @@ pub fn start(cfg: config::Config) -> Result<(), String> { peers: cfg.connect.map_or_else(|| vec![], |x| vec![x]), seeds: cfg.seednodes, node_table_path: nodes_path, + preferable_services: services, internet_protocol: cfg.internet_protocol, };