UAHF: set Services bit#5 + prefer nodes with bit#5 set

This commit is contained in:
Svyatoslav Nikolsky 2017-08-08 12:17:36 +03:00
parent 7eccf8b888
commit 127d662448
6 changed files with 101 additions and 37 deletions

View File

@ -59,6 +59,15 @@ impl Services {
self
}
pub fn bitcoin_cash(&self) -> bool {
self.bit_at(5)
}
pub fn with_bitcoin_cash(mut self, v: bool) -> Self {
self.set_bit(5, v);
self
}
pub fn includes(&self, other: &Self) -> bool {
self.0 & other.0 == other.0
}

View File

@ -1,4 +1,5 @@
use std::{net, path};
use message::common::Services;
use net::Config as NetConfig;
use util::InternetProtocol;
@ -16,8 +17,10 @@ pub struct Config {
pub peers: Vec<net::SocketAddr>,
/// Connect to these nodes to retrieve peer addresses, and disconnect.
pub seeds: Vec<String>,
/// p2p/nodes.csv file path
/// p2p/nodes.csv file path.
pub node_table_path: path::PathBuf,
/// Peers with this services will get a boost in node_table.
pub preferable_services: Services,
/// Internet protocol.
pub internet_protocol: InternetProtocol,
}

View File

@ -12,6 +12,7 @@ use abstract_ns::Resolver;
use ns_dns_tokio::DnsResolver;
use message::{Payload, MessageResult, Message};
use message::common::Services;
use message::types::addr::AddressEntry;
use net::{connect, Connections, Channel, Config as NetConfig, accept_connection, ConnectionCounter};
use util::{NodeTable, Node, NodeTableError, Direction};
use session::{SessionFactory, SeednodeSessionFactory, NormalSessionFactory};
@ -45,7 +46,7 @@ impl Context {
let context = Context {
connections: Default::default(),
connection_counter: ConnectionCounter::new(config.inbound_connections, config.outbound_connections),
node_table: RwLock::new(try!(NodeTable::from_file(&config.node_table_path))),
node_table: RwLock::new(try!(NodeTable::from_file(config.preferable_services, &config.node_table_path))),
pool: pool_handle,
remote: remote,
local_sync_node: local_sync_node,
@ -84,7 +85,7 @@ impl Context {
}
/// Updates node table.
pub fn update_node_table(&self, nodes: Vec<Node>) {
pub fn update_node_table(&self, nodes: Vec<AddressEntry>) {
trace!("Updating node table with {} entries", nodes.len());
self.node_table.write().insert_many(nodes);
}
@ -120,6 +121,7 @@ impl Context {
let needed = context.connection_counter.outbound_connections_needed() as usize;
if needed != 0 {
// TODO: pass Services::with_bitcoin_cash(true) after HF block
let used_addresses = context.connections.addresses();
let peers = context.node_table.read().nodes_with_services(&Services::default(), context.config.internet_protocol, &used_addresses, needed);
let addresses = peers.into_iter()

View File

@ -45,9 +45,8 @@ impl Protocol for AddrProtocol {
unreachable!("This version of protocol is not supported!");
},
Addr::V31402(addr) => {
let nodes: Vec<_> = addr.addresses.into_iter().map(Into::into).collect();
let nodes_len = nodes.len();
self.context.global().update_node_table(nodes);
let nodes_len = addr.addresses.len();
self.context.global().update_node_table(addr.addresses);
// seednodes are currently responding with two addr messages:
// 1) addr message with single address - seednode itself
// 2) addr message with 1000 addresses (seednode node_table contents)

View File

@ -17,6 +17,8 @@ pub struct Node {
time: i64,
/// Services supported by the node.
services: Services,
/// Is preferable node?
is_preferable: bool,
/// Node failures counter.
failures: u32,
}
@ -27,17 +29,6 @@ impl Node {
}
}
impl From<AddressEntry> for Node {
fn from(entry: AddressEntry) -> Self {
Node {
addr: SocketAddr::new(entry.address.address.into(), entry.address.port.into()),
time: entry.timestamp as i64,
services: entry.address.services,
failures: 0,
}
}
}
impl From<Node> for AddressEntry {
fn from(node: Node) -> Self {
AddressEntry {
@ -63,11 +54,17 @@ impl From<Node> for NodeByScore {
impl PartialOrd for NodeByScore {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
if self.0.failures == other.0.failures {
if other.0.time == self.0.time {
other.0.partial_cmp(&self.0)
}
else {
other.0.time.partial_cmp(&self.0.time)
if self.0.is_preferable == other.0.is_preferable {
if other.0.time == self.0.time {
other.0.partial_cmp(&self.0)
}
else {
other.0.time.partial_cmp(&self.0.time)
}
} else if self.0.is_preferable {
return Some(Ordering::Less)
} else {
Some(Ordering::Greater)
}
} else {
self.0.failures.partial_cmp(&other.0.failures)
@ -78,11 +75,17 @@ impl PartialOrd for NodeByScore {
impl Ord for NodeByScore {
fn cmp(&self, other: &Self) -> Ordering {
if self.0.failures == other.0.failures {
if other.0.time == self.0.time {
other.0.cmp(&self.0)
}
else {
other.0.time.cmp(&self.0.time)
if self.0.is_preferable == other.0.is_preferable {
if other.0.time == self.0.time {
other.0.cmp(&self.0)
}
else {
other.0.time.cmp(&self.0.time)
}
} else if self.0.is_preferable {
return Ordering::Less
} else {
Ordering::Greater
}
} else {
self.0.failures.cmp(&other.0.failures)
@ -168,6 +171,8 @@ pub enum NodeTableError { AddressAlreadyAdded, NoAddressInTable }
pub struct NodeTable<T = RealTime> where T: Time {
/// Time source.
time: T,
/// Preferable services.
preferable_services: Services,
/// Nodes by socket address.
by_addr: HashMap<SocketAddr, Node>,
/// Nodes sorted by score.
@ -177,15 +182,24 @@ pub struct NodeTable<T = RealTime> where T: Time {
}
impl NodeTable {
#[cfg(test)]
/// Creates empty node table with preferable services.
pub fn new(preferable_services: Services) -> Self {
NodeTable {
preferable_services: preferable_services,
..Default::default()
}
}
/// Opens a file loads node_table from it.
pub fn from_file<P>(path: P) -> Result<Self, io::Error> where P: AsRef<path::Path> {
pub fn from_file<P>(preferable_services: Services, path: P) -> Result<Self, io::Error> where P: AsRef<path::Path> {
fs::OpenOptions::new()
.create(true)
.read(true)
// without opening for write, mac os returns os error 22
.write(true)
.open(path)
.and_then(Self::load)
.and_then(|f| Self::load(preferable_services, f))
}
/// Saves node table to file
@ -213,6 +227,7 @@ impl<T> NodeTable<T> where T: Time {
addr: addr,
time: now,
services: services,
is_preferable: services.includes(&self.preferable_services),
failures: 0,
};
self.by_score.insert(node.clone().into());
@ -254,14 +269,22 @@ impl<T> NodeTable<T> where T: Time {
/// Inserts many new addresses into node table.
/// Used in `addr` request handler.
/// Discards all nodes with timestamp newer than current time.
pub fn insert_many(&mut self, nodes: Vec<Node>) {
pub fn insert_many(&mut self, addresses: Vec<AddressEntry>) {
// discard all nodes with timestamp newer than current time.
let now = self.time.get().sec;
let iter = nodes.into_iter()
.filter(|node| node.time <= now);
let iter = addresses.into_iter()
.filter(|addr| addr.timestamp as i64 <= now);
// iterate over the rest
for node in iter {
for addr in iter {
let node = Node {
addr: SocketAddr::new(addr.address.address.into(), addr.address.port.into()),
time: addr.timestamp as i64,
services: addr.address.services,
is_preferable: addr.address.services.includes(&self.preferable_services),
failures: 0,
};
match self.by_addr.entry(node.addr) {
Entry::Occupied(mut entry) => {
let old = entry.get_mut();
@ -369,22 +392,25 @@ impl<T> NodeTable<T> where T: Time {
}
/// Loads table in from a csv source.
pub fn load<R>(read: R) -> Result<Self, io::Error> where R: io::Read, T: Default {
pub fn load<R>(preferable_services: Services, read: R) -> Result<Self, io::Error> where R: io::Read, T: Default {
let mut rdr = csv::Reader::from_reader(read)
.has_headers(false)
.delimiter(b' ');
let mut node_table = NodeTable::default();
node_table.preferable_services = preferable_services;
let err = || io::Error::new(io::ErrorKind::Other, "Load csv error");
for row in rdr.decode() {
let (addr, time, services, failures): (String, i64, u64, u32) = try!(row.map_err(|_| err()));
let services = services.into();
let node = Node {
addr: try!(addr.parse().map_err(|_| err())),
time: time,
services: services.into(),
services: services,
is_preferable: services.includes(&preferable_services),
failures: failures,
};
@ -559,7 +585,7 @@ mod tests {
let mut db = Vec::new();
assert_eq!(table.save(&mut db).unwrap(), ());
let loaded_table = NodeTable::<IncrementalTime>::load(&db as &[u8]).unwrap();
let loaded_table = NodeTable::<IncrementalTime>::load(Services::default(), &db as &[u8]).unwrap();
assert_eq!(table.by_addr, loaded_table.by_addr);
assert_eq!(table.by_score, loaded_table.by_score);
assert_eq!(table.by_time, loaded_table.by_time);
@ -573,4 +599,21 @@ mod tests {
127.0.0.1:8003 3 0 1
".to_string(), s);
}
#[test]
fn test_preferable_services() {
let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap();
let s1: SocketAddr = "127.0.0.1:8001".parse().unwrap();
let mut table = NodeTable::new(Services::default().with_network(true).with_bitcoin_cash(true));
table.insert(s0, Services::default().with_network(true));
table.insert(s1, Services::default().with_network(true).with_bitcoin_cash(true));
assert_eq!(table.nodes_with_services(&Services::default(), InternetProtocol::default(), &HashSet::new(), 1)[0].address(), s1);
table.note_failure(&s1);
assert_eq!(table.nodes_with_services(&Services::default(), InternetProtocol::default(), &HashSet::new(), 1)[0].address(), s0);
table.note_failure(&s0);
assert_eq!(table.nodes_with_services(&Services::default(), InternetProtocol::default(), &HashSet::new(), 1)[0].address(), s1);
}
}

View File

@ -5,6 +5,7 @@ use std::sync::mpsc::{channel, Sender, Receiver};
use std::sync::atomic::{AtomicBool, Ordering};
use sync::{create_sync_peers, create_local_sync_node, create_sync_connection_factory, SyncListener};
use message::Services;
use network::ConsensusFork;
use primitives::hash::H256;
use util::{open_db, init_db, node_table_path};
use {config, p2p, PROTOCOL_VERSION, PROTOCOL_MINIMUM};
@ -90,6 +91,12 @@ pub fn start(cfg: config::Config) -> Result<(), String> {
let nodes_path = node_table_path(&cfg);
let services = Services::default().with_network(true);
let services = match cfg.consensus.fork {
ConsensusFork::BitcoinCash(_) => services.with_bitcoin_cash(true),
ConsensusFork::NoFork | ConsensusFork::SegWit2x(_) => services,
};
let p2p_cfg = p2p::Config {
threads: cfg.p2p_threads,
inbound_connections: cfg.inbound_connections,
@ -99,7 +106,7 @@ pub fn start(cfg: config::Config) -> Result<(), String> {
protocol_minimum: PROTOCOL_MINIMUM,
magic: cfg.magic,
local_address: SocketAddr::new("127.0.0.1".parse().unwrap(), cfg.port),
services: Services::default().with_network(true),
services: services,
user_agent: cfg.user_agent,
start_height: 0,
relay: true,
@ -107,6 +114,7 @@ pub fn start(cfg: config::Config) -> Result<(), String> {
peers: cfg.connect.map_or_else(|| vec![], |x| vec![x]),
seeds: cfg.seednodes,
node_table_path: nodes_path,
preferable_services: services,
internet_protocol: cfg.internet_protocol,
};