diff --git a/multinode-demo/drone.sh b/multinode-demo/drone.sh index 60e938ccc..ca0744794 100755 --- a/multinode-demo/drone.sh +++ b/multinode-demo/drone.sh @@ -40,6 +40,7 @@ $rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_DIR"/ trap 'kill "$pid" && wait "$pid"' INT TERM $solana_drone \ -l "$SOLANA_CONFIG_DIR"/leader.json -k "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json \ + --timeout 120 \ > >($drone_logger) 2>&1 & pid=$! oom_score_adj "$pid" 1000 diff --git a/multinode-demo/wallet.sh b/multinode-demo/wallet.sh index 3b14939a9..d8738eef4 100755 --- a/multinode-demo/wallet.sh +++ b/multinode-demo/wallet.sh @@ -42,4 +42,4 @@ fi # shellcheck disable=SC2086 # $solana_wallet should not be quoted exec $solana_wallet \ - -l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$client_id_path" "$@" + -l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$client_id_path" --timeout 10 "$@" diff --git a/src/bin/drone.rs b/src/bin/drone.rs index 7714e0cec..302288115 100644 --- a/src/bin/drone.rs +++ b/src/bin/drone.rs @@ -8,24 +8,23 @@ extern crate tokio_codec; use bincode::deserialize; use clap::{App, Arg}; -use solana::crdt::{Crdt, NodeInfo, TestNode}; +use solana::crdt::NodeInfo; use solana::drone::{Drone, DroneRequest, DRONE_PORT}; use solana::fullnode::Config; use solana::logger; use solana::metrics::set_panic_hook; -use solana::ncp::Ncp; -use solana::service::Service; use solana::signature::read_keypair; +use solana::thin_client::poll_gossip_for_leader; +use std::error; use std::fs::File; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex}; use std::thread; use tokio::net::TcpListener; use tokio::prelude::*; use tokio_codec::{BytesCodec, Decoder}; -fn main() { +fn main() -> Result<(), Box> { logger::setup(); set_panic_hook("drone"); let matches = App::new("drone") @@ -48,20 +47,25 @@ fn main() { .help("/path/to/mint.json"), ) .arg( - Arg::with_name("time") - .short("t") - .long("time") + Arg::with_name("slice") + .long("slice") .value_name("SECONDS") .takes_value(true) - .help("time slice over which to limit requests to drone"), + .help("Time slice over which to limit requests to drone"), ) .arg( Arg::with_name("cap") - .short("c") .long("cap") .value_name("NUMBER") .takes_value(true) - .help("request limit for time slice"), + .help("Request limit for time slice"), + ) + .arg( + Arg::with_name("timeout") + .long("timeout") + .value_name("SECONDS") + .takes_value(true) + .help("Max SECONDS to wait to get necessary gossip from the network"), ) .get_matches(); @@ -77,8 +81,8 @@ fn main() { read_keypair(matches.value_of("keypair").expect("keypair")).expect("client keypair"); let time_slice: Option; - if let Some(t) = matches.value_of("time") { - time_slice = Some(t.to_string().parse().expect("integer")); + if let Some(secs) = matches.value_of("slice") { + time_slice = Some(secs.to_string().parse().expect("integer")); } else { time_slice = None; } @@ -88,30 +92,14 @@ fn main() { } else { request_cap = None; } + let timeout: Option; + if let Some(secs) = matches.value_of("timeout") { + timeout = Some(secs.to_string().parse().expect("integer")); + } else { + timeout = None; + } - // Set up gossip functionality - let exit = Arc::new(AtomicBool::new(false)); - let testnode = TestNode::new_localhost(); - let extra_data = testnode.data.clone(); - let crdt = Arc::new(RwLock::new(Crdt::new(extra_data).expect("Crdt::new"))); - let window = Arc::new(RwLock::new(vec![])); - let ncp = Ncp::new( - &crdt.clone(), - window, - None, - testnode.sockets.gossip, - testnode.sockets.gossip_send, - exit.clone(), - ).unwrap(); - let leader_entry_point = NodeInfo::new_entry_point(leader.contact_info.ncp); - crdt.write().unwrap().insert(&leader_entry_point); - - // Block until leader's correct contact info is received - while crdt.read().unwrap().leader_data().is_none() {} - - exit.store(true, Ordering::Relaxed); - ncp.join().unwrap(); - let leader = crdt.read().unwrap().leader_data().unwrap().clone(); + let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?; let drone_addr: SocketAddr = format!("0.0.0.0:{}", DRONE_PORT).parse().unwrap(); @@ -168,6 +156,7 @@ fn main() { tokio::spawn(processor) }); tokio::run(done); + Ok(()) } fn read_leader(path: &str) -> Config { let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path)); diff --git a/src/bin/wallet.rs b/src/bin/wallet.rs index 2e5552574..972885f75 100644 --- a/src/bin/wallet.rs +++ b/src/bin/wallet.rs @@ -9,23 +9,19 @@ extern crate solana; use clap::{App, Arg, SubCommand}; use solana::client::mk_client; -use solana::crdt::{Crdt, NodeInfo, TestNode}; +use solana::crdt::NodeInfo; use solana::drone::DRONE_PORT; use solana::fullnode::Config; use solana::logger; -use solana::ncp::Ncp; -use solana::service::Service; use solana::signature::{read_keypair, Keypair, KeypairUtil, Pubkey, Signature}; -use solana::thin_client::ThinClient; +use solana::thin_client::{poll_gossip_for_leader, ThinClient}; use solana::wallet::request_airdrop; use std::error; use std::fmt; use std::fs::File; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; use std::thread::sleep; -use std::time::{Duration, Instant}; +use std::time::Duration; enum WalletCommand { Address, @@ -39,7 +35,6 @@ enum WalletCommand { enum WalletError { CommandNotRecognized(String), BadParameter(String), - NoNode(String), } impl fmt::Display for WalletError { @@ -97,12 +92,18 @@ fn parse_args() -> Result> { .takes_value(true) .help("/path/to/id.json"), ) + .arg( + Arg::with_name("timeout") + .long("timeout") + .value_name("SECONDS") + .takes_value(true) + .help("Max SECONDS to wait to get necessary gossip from the network"), + ) .subcommand( SubCommand::with_name("airdrop") .about("Request a batch of tokens") .arg( Arg::with_name("tokens") - // .index(1) .long("tokens") .value_name("NUMBER") .takes_value(true) @@ -115,16 +116,14 @@ fn parse_args() -> Result> { .about("Send a payment") .arg( Arg::with_name("tokens") - // .index(2) .long("tokens") .value_name("NUMBER") .takes_value(true) .required(true) - .help("the number of tokens to send"), + .help("The number of tokens to send"), ) .arg( Arg::with_name("to") - // .index(1) .long("to") .value_name("PUBKEY") .takes_value(true) @@ -153,6 +152,12 @@ fn parse_args() -> Result> { let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); leader = NodeInfo::new_leader(&server_addr); }; + let timeout: Option; + if let Some(secs) = matches.value_of("timeout") { + timeout = Some(secs.to_string().parse().expect("integer")); + } else { + timeout = None; + } let mut path = dirs::home_dir().expect("home directory"); let id_path = if matches.is_present("keypair") { @@ -168,34 +173,7 @@ fn parse_args() -> Result> { ))) })?; - // Set up gossip functionality - let exit = Arc::new(AtomicBool::new(false)); - let testnode = TestNode::new_localhost(); - let extra_data = testnode.data.clone(); - let crdt = Arc::new(RwLock::new(Crdt::new(extra_data).expect("Crdt::new"))); - let window = Arc::new(RwLock::new(vec![])); - let ncp = Ncp::new( - &crdt.clone(), - window, - None, - testnode.sockets.gossip, - testnode.sockets.gossip_send, - exit.clone(), - ).unwrap(); - let leader_entry_point = NodeInfo::new_entry_point(leader.contact_info.ncp); - crdt.write().unwrap().insert(&leader_entry_point); - - let now = Instant::now(); - // Block until leader's correct contact info is received - while crdt.read().unwrap().leader_data().is_none() { - if now.elapsed() > Duration::new(10, 0) { - Err(WalletError::NoNode("No leader detected".to_string()))?; - } - } - - exit.store(true, Ordering::Relaxed); - ncp.join().unwrap(); - let leader = crdt.read().unwrap().leader_data().unwrap().clone(); + let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?; let mut drone_addr = leader.contact_info.tpu; drone_addr.set_port(DRONE_PORT); diff --git a/src/result.rs b/src/result.rs index ba6783f72..3f4caaa7d 100644 --- a/src/result.rs +++ b/src/result.rs @@ -31,6 +31,14 @@ pub enum Error { pub type Result = std::result::Result; +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "solana error") + } +} + +impl std::error::Error for Error {} + impl std::convert::From for Error { fn from(e: std::sync::mpsc::RecvError) -> Error { Error::RecvError(e) diff --git a/src/thin_client.rs b/src/thin_client.rs index 1120ec37d..646666ab8 100755 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -5,12 +5,17 @@ use bank::Account; use bincode::{deserialize, serialize}; +use crdt::{Crdt, CrdtError, NodeInfo, TestNode}; use hash::Hash; +use ncp::Ncp; use request::{Request, Response}; +use result::{Error, Result}; use signature::{Keypair, Pubkey, Signature}; use std::collections::HashMap; use std::io; use std::net::{SocketAddr, UdpSocket}; +use std::sync::atomic::AtomicBool; +use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; use std::time::Instant; @@ -320,6 +325,38 @@ impl Drop for ThinClient { } } +pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> Result { + let exit = Arc::new(AtomicBool::new(false)); + let testnode = TestNode::new_localhost(); + let extra_data = testnode.data.clone(); + let crdt = Arc::new(RwLock::new(Crdt::new(extra_data).expect("Crdt::new"))); + let window = Arc::new(RwLock::new(vec![])); + let ncp = Ncp::new( + &crdt.clone(), + window, + None, + testnode.sockets.gossip, + testnode.sockets.gossip_send, + exit.clone(), + ).unwrap(); + let leader_entry_point = NodeInfo::new_entry_point(leader_ncp); + crdt.write().unwrap().insert(&leader_entry_point); + + sleep(Duration::from_millis(100)); + + let now = Instant::now(); + // Block until leader's correct contact info is received + while crdt.read().unwrap().leader_data().is_none() { + if timeout.is_some() && now.elapsed() > Duration::new(timeout.unwrap(), 0) { + return Err(Error::CrdtError(CrdtError::NoLeader)); + } + } + + ncp.close()?; + let leader = crdt.read().unwrap().leader_data().unwrap().clone(); + Ok(leader) +} + #[cfg(test)] mod tests { use super::*;