save node_table to $APP_DIR/p2p/nodes.csv

This commit is contained in:
debris 2016-11-04 12:08:58 +01:00
parent 13420b22e3
commit fc32423017
9 changed files with 157 additions and 17 deletions

11
Cargo.lock generated
View File

@ -131,6 +131,15 @@ name = "crossbeam"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "csv"
version = "0.14.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "db"
version = "0.1.0"
@ -426,6 +435,7 @@ version = "0.1.0"
dependencies = [
"abstract-ns 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"bitcrypto 0.1.0",
"csv 0.14.7 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -765,6 +775,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c"
"checksum clap 2.16.2 (registry+https://github.com/rust-lang/crates.io-index)" = "08aac7b078ec0a58e1d4b43cfb11d47001f8eb7c6f6f2bda4f5eed43c82491f1"
"checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97"
"checksum csv 0.14.7 (registry+https://github.com/rust-lang/crates.io-index)" = "266c1815d7ca63a5bd86284043faf91e8c95e943e55ce05dc0ae08e952de18bc"
"checksum deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1614659040e711785ed8ea24219140654da1729f3ec8a47a9719d041112fe7bf"
"checksum domain 0.1.0 (git+https://github.com/debris/domain)" = "<none>"
"checksum elastic-array 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4bc9250a632e7c001b741eb0ec6cee93c9a5b6d5f1879696a4b94d62b012210a"

View File

@ -13,6 +13,7 @@ rand = "0.3"
log = "0.3"
abstract-ns = "0.2.1"
ns-dns-tokio = { git = "https://github.com/debris/abstract-ns", path = "ns-dns-tokio" }
csv = "0.14.7"
primitives = { path = "../primitives"}
bitcrypto = { path = "../crypto" }

View File

@ -1,4 +1,5 @@
use std::net::SocketAddr;
use std::path::PathBuf;
use net::Config as NetConfig;
#[derive(Debug)]
@ -19,4 +20,6 @@ pub struct Config {
pub peers: Vec<SocketAddr>,
/// Connect to these nodes to retrieve peer addresses, and disconnect.
pub seeds: Vec<String>,
/// p2p module cache directory.
pub node_table_path: PathBuf,
}

View File

@ -9,6 +9,7 @@ extern crate parking_lot;
extern crate log;
extern crate abstract_ns;
extern crate ns_dns_tokio;
extern crate csv;
extern crate bitcrypto as crypto;
extern crate message;

View File

@ -1,4 +1,4 @@
use std::{io, net, error, time};
use std::{io, net, error, time, path};
use std::sync::Arc;
use parking_lot::RwLock;
use futures::{Future, finished, failed, BoxFuture};
@ -34,19 +34,24 @@ pub struct Context {
remote: Remote,
/// Local synchronization node.
local_sync_node: LocalSyncNodeRef,
/// Node table path.
node_table_path: path::PathBuf,
}
impl Context {
/// Creates new context with reference to local sync node, thread pool and event loop.
pub fn new(local_sync_node: LocalSyncNodeRef, pool_handle: CpuPool, remote: Remote, config: &Config) -> Self {
Context {
pub fn new(local_sync_node: LocalSyncNodeRef, pool_handle: CpuPool, remote: Remote, config: &Config) -> Result<Self, Box<error::Error>> {
let context = Context {
connections: Default::default(),
connection_counter: ConnectionCounter::new(config.inbound_connections, config.outbound_connections),
node_table: Default::default(),
node_table: RwLock::new(try!(NodeTable::from_file(&config.node_table_path))),
pool: pool_handle,
remote: remote,
local_sync_node: local_sync_node,
}
node_table_path: config.node_table_path.clone(),
};
Ok(context)
}
/// Spawns a future using thread pool and schedules execution of it with event loop handle.
@ -111,6 +116,10 @@ impl Context {
Context::connect::<NormalSessionFactory>(context.clone(), address, config.clone());
}
if let Err(_err) = context.node_table.read().save_to_file(&context.node_table_path) {
error!("Saving node table to disk failed");
}
Ok(())
})
.for_each(|_| Ok(()))
@ -369,15 +378,19 @@ impl Drop for P2P {
}
impl P2P {
pub fn new(config: Config, local_sync_node: LocalSyncNodeRef, handle: Handle) -> Self {
pub fn new(config: Config, local_sync_node: LocalSyncNodeRef, handle: Handle) -> Result<Self, Box<error::Error>> {
let pool = CpuPool::new(config.threads);
P2P {
let context = try!(Context::new(local_sync_node, pool.clone(), handle.remote().clone(), &config));
let p2p = P2P {
event_loop_handle: handle.clone(),
pool: pool.clone(),
context: Arc::new(Context::new(local_sync_node, pool, handle.remote().clone(), &config)),
pool: pool,
context: Arc::new(context),
config: config,
}
};
Ok(p2p)
}
pub fn run(&self) -> Result<(), Box<error::Error>> {

View File

@ -1,12 +1,14 @@
use std::{io, path, fs};
use std::collections::{HashMap, BTreeSet};
use std::collections::hash_map::Entry;
use std::net::SocketAddr;
use std::cmp::{PartialOrd, Ord, Ordering};
use csv;
use message::common::{Services, NetAddress};
use message::types::addr::AddressEntry;
use util::time::{Time, RealTime};
#[derive(PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Node {
/// Node address.
addr: SocketAddr,
@ -48,7 +50,7 @@ impl From<Node> for AddressEntry {
}
}
#[derive(PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone)]
struct NodeByScore(Node);
impl From<Node> for NodeByScore {
@ -87,7 +89,7 @@ impl Ord for NodeByScore {
}
}
#[derive(PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone)]
struct NodeByTime(Node);
impl From<Node> for NodeByTime {
@ -158,7 +160,7 @@ impl PartialOrd for Node {
}
}
#[derive(Default)]
#[derive(Default, Debug)]
pub struct NodeTable<T = RealTime> where T: Time {
/// Time source.
time: T,
@ -170,6 +172,25 @@ pub struct NodeTable<T = RealTime> where T: Time {
by_time: BTreeSet<NodeByTime>,
}
impl NodeTable {
/// Opens a file loads node_table from it.
pub fn from_file<P>(path: P) -> Result<Self, io::Error> where P: AsRef<path::Path> {
let res = fs::OpenOptions::new()
.create(true)
.read(true)
// without opening for write, mac os returns os error 22
.write(true)
.open(path)
.and_then(Self::load);
res
}
/// Saves node table to file
pub fn save_to_file<P>(&self, path: P) -> Result<(), io::Error> where P: AsRef<path::Path> {
fs::File::create(path).and_then(|file| self.save(file))
}
}
impl<T> NodeTable<T> where T: Time {
/// Inserts new address and services pair into NodeTable.
pub fn insert(&mut self, addr: SocketAddr, services: Services) {
@ -278,6 +299,52 @@ impl<T> NodeTable<T> where T: Time {
self.by_time.insert(node.clone().into());
}
}
/// Save node table in csv format.
pub fn save<W>(&self, write: W) -> Result<(), io::Error> where W: io::Write {
let mut writer = csv::Writer::from_writer(write)
.delimiter(b' ');
let iter = self.by_score.iter()
.map(|node| &node.0)
.take(1000);
let err = || io::Error::new(io::ErrorKind::Other, "Write csv error");
for n in iter {
let record = (n.addr.to_string(), n.time, u64::from(n.services), n.failures);
try!(writer.encode(record).map_err(|_| err()));
}
Ok(())
}
/// Loads table in from a csv source.
pub fn load<R>(read: R) -> Result<Self, io::Error> where R: io::Read, T: Default {
let mut rdr = csv::Reader::from_reader(read)
.has_headers(false)
.delimiter(b' ');
let mut node_table = NodeTable::default();
let err = || io::Error::new(io::ErrorKind::Other, "Load csv error");
for row in rdr.decode() {
let (addr, time, services, failures): (String, i64, u64, u32) = try!(row.map_err(|_| err()));
let node = Node {
addr: try!(addr.parse().map_err(|_| err())),
time: time,
services: services.into(),
failures: failures,
};
node_table.by_score.insert(node.clone().into());
node_table.by_time.insert(node.clone().into());
node_table.by_addr.insert(node.addr, node);
}
Ok(node_table)
}
}
#[cfg(test)]
@ -381,4 +448,40 @@ mod tests {
table.note_failure(&s0);
table.note_failure(&s1);
}
#[test]
fn test_save_and_load() {
let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap();
let s1: SocketAddr = "127.0.0.1:8001".parse().unwrap();
let s2: SocketAddr = "127.0.0.1:8002".parse().unwrap();
let s3: SocketAddr = "127.0.0.1:8003".parse().unwrap();
let s4: SocketAddr = "127.0.0.1:8004".parse().unwrap();
let mut table = NodeTable::<IncrementalTime>::default();
table.insert(s0, Services::default());
table.insert(s1, Services::default());
table.insert(s2, Services::default());
table.insert(s3, Services::default());
table.insert(s4, Services::default());
table.note_used(&s2);
table.note_used(&s4);
table.note_used(&s1);
table.note_failure(&s2);
table.note_failure(&s3);
let mut db = Vec::new();
assert_eq!(table.save(&mut db).unwrap(), ());
let loaded_table = NodeTable::<IncrementalTime>::load(&db as &[u8]).unwrap();
assert_eq!(table.by_addr, loaded_table.by_addr);
assert_eq!(table.by_score, loaded_table.by_score);
assert_eq!(table.by_time, loaded_table.by_time);
let s = String::from_utf8(db).unwrap();
assert_eq!(
"127.0.0.1:8001 7 0 0
127.0.0.1:8004 6 0 0
127.0.0.1:8000 0 0 0
127.0.0.1:8002 5 0 1
127.0.0.1:8003 3 0 1
".to_string(), s);
}
}

View File

@ -5,7 +5,7 @@ pub trait Time {
fn get(&self) -> time::Timespec;
}
#[derive(Default)]
#[derive(Default, Debug)]
pub struct RealTime;
impl Time for RealTime {

View File

@ -1,6 +1,6 @@
use std::net::SocketAddr;
use sync::create_sync_connection_factory;
use util::{open_db, init_db};
use util::{open_db, init_db, node_table_path};
use {config, p2p};
pub fn start(cfg: config::Config) -> Result<(), String> {
@ -22,6 +22,7 @@ pub fn start(cfg: config::Config) -> Result<(), String> {
},
peers: cfg.connect.map_or_else(|| vec![], |x| vec![x]),
seeds: cfg.seednode.map_or_else(|| vec![], |x| vec![x]),
node_table_path: node_table_path(),
};
let db = open_db(cfg.use_disk_database);
@ -29,7 +30,7 @@ pub fn start(cfg: config::Config) -> Result<(), String> {
let sync_connection_factory = create_sync_connection_factory(db);
let p2p = p2p::P2P::new(p2p_cfg, sync_connection_factory, el.handle());
let p2p = try!(p2p::P2P::new(p2p_cfg, sync_connection_factory, el.handle()).map_err(|x| x.to_string()));
try!(p2p.run().map_err(|_| "Failed to start p2p module"));
el.run(p2p::forever()).unwrap();
Ok(())

View File

@ -1,4 +1,5 @@
use std::sync::Arc;
use std::path::PathBuf;
use app_dirs::{app_dir, AppDataType};
use chain::Block;
use {db, APP_INFO};
@ -15,6 +16,12 @@ pub fn open_db(use_disk_database: bool) -> Arc<db::Store> {
}
}
pub fn node_table_path() -> PathBuf {
let mut node_table = app_dir(AppDataType::UserData, &APP_INFO, "p2p").expect("Failed to get app dir");
node_table.push("nodes.csv");
node_table
}
pub fn init_db(db: &Arc<db::Store>) {
// insert genesis block if db is empty
if db.best_block().is_none() {