Improve gossip use for drone and wallet

- Add utility function
  - Add thread sleep
  - Enable configurable timeout for gossip poll
This commit is contained in:
Tyera Eulberg 2018-08-20 14:03:36 -06:00 committed by Tyera Eulberg
parent 4fdd9fbfca
commit d4c41219f9
6 changed files with 91 additions and 78 deletions

View File

@ -40,6 +40,7 @@ $rsync -vPz "$rsync_leader_url"/config/leader.json "$SOLANA_CONFIG_DIR"/
trap 'kill "$pid" && wait "$pid"' INT TERM trap 'kill "$pid" && wait "$pid"' INT TERM
$solana_drone \ $solana_drone \
-l "$SOLANA_CONFIG_DIR"/leader.json -k "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json \ -l "$SOLANA_CONFIG_DIR"/leader.json -k "$SOLANA_CONFIG_PRIVATE_DIR"/mint.json \
--timeout 120 \
> >($drone_logger) 2>&1 & > >($drone_logger) 2>&1 &
pid=$! pid=$!
oom_score_adj "$pid" 1000 oom_score_adj "$pid" 1000

View File

@ -42,4 +42,4 @@ fi
# shellcheck disable=SC2086 # $solana_wallet should not be quoted # shellcheck disable=SC2086 # $solana_wallet should not be quoted
exec $solana_wallet \ exec $solana_wallet \
-l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$client_id_path" "$@" -l "$SOLANA_CONFIG_CLIENT_DIR"/leader.json -k "$client_id_path" --timeout 10 "$@"

View File

@ -8,24 +8,23 @@ extern crate tokio_codec;
use bincode::deserialize; use bincode::deserialize;
use clap::{App, Arg}; use clap::{App, Arg};
use solana::crdt::{Crdt, NodeInfo, TestNode}; use solana::crdt::NodeInfo;
use solana::drone::{Drone, DroneRequest, DRONE_PORT}; use solana::drone::{Drone, DroneRequest, DRONE_PORT};
use solana::fullnode::Config; use solana::fullnode::Config;
use solana::logger; use solana::logger;
use solana::metrics::set_panic_hook; use solana::metrics::set_panic_hook;
use solana::ncp::Ncp;
use solana::service::Service;
use solana::signature::read_keypair; use solana::signature::read_keypair;
use solana::thin_client::poll_gossip_for_leader;
use std::error;
use std::fs::File; use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::thread; use std::thread;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::prelude::*; use tokio::prelude::*;
use tokio_codec::{BytesCodec, Decoder}; use tokio_codec::{BytesCodec, Decoder};
fn main() { fn main() -> Result<(), Box<error::Error>> {
logger::setup(); logger::setup();
set_panic_hook("drone"); set_panic_hook("drone");
let matches = App::new("drone") let matches = App::new("drone")
@ -48,20 +47,25 @@ fn main() {
.help("/path/to/mint.json"), .help("/path/to/mint.json"),
) )
.arg( .arg(
Arg::with_name("time") Arg::with_name("slice")
.short("t") .long("slice")
.long("time")
.value_name("SECONDS") .value_name("SECONDS")
.takes_value(true) .takes_value(true)
.help("time slice over which to limit requests to drone"), .help("Time slice over which to limit requests to drone"),
) )
.arg( .arg(
Arg::with_name("cap") Arg::with_name("cap")
.short("c")
.long("cap") .long("cap")
.value_name("NUMBER") .value_name("NUMBER")
.takes_value(true) .takes_value(true)
.help("request limit for time slice"), .help("Request limit for time slice"),
)
.arg(
Arg::with_name("timeout")
.long("timeout")
.value_name("SECONDS")
.takes_value(true)
.help("Max SECONDS to wait to get necessary gossip from the network"),
) )
.get_matches(); .get_matches();
@ -77,8 +81,8 @@ fn main() {
read_keypair(matches.value_of("keypair").expect("keypair")).expect("client keypair"); read_keypair(matches.value_of("keypair").expect("keypair")).expect("client keypair");
let time_slice: Option<u64>; let time_slice: Option<u64>;
if let Some(t) = matches.value_of("time") { if let Some(secs) = matches.value_of("slice") {
time_slice = Some(t.to_string().parse().expect("integer")); time_slice = Some(secs.to_string().parse().expect("integer"));
} else { } else {
time_slice = None; time_slice = None;
} }
@ -88,30 +92,14 @@ fn main() {
} else { } else {
request_cap = None; request_cap = None;
} }
let timeout: Option<u64>;
if let Some(secs) = matches.value_of("timeout") {
timeout = Some(secs.to_string().parse().expect("integer"));
} else {
timeout = None;
}
// Set up gossip functionality let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?;
let exit = Arc::new(AtomicBool::new(false));
let testnode = TestNode::new_localhost();
let extra_data = testnode.data.clone();
let crdt = Arc::new(RwLock::new(Crdt::new(extra_data).expect("Crdt::new")));
let window = Arc::new(RwLock::new(vec![]));
let ncp = Ncp::new(
&crdt.clone(),
window,
None,
testnode.sockets.gossip,
testnode.sockets.gossip_send,
exit.clone(),
).unwrap();
let leader_entry_point = NodeInfo::new_entry_point(leader.contact_info.ncp);
crdt.write().unwrap().insert(&leader_entry_point);
// Block until leader's correct contact info is received
while crdt.read().unwrap().leader_data().is_none() {}
exit.store(true, Ordering::Relaxed);
ncp.join().unwrap();
let leader = crdt.read().unwrap().leader_data().unwrap().clone();
let drone_addr: SocketAddr = format!("0.0.0.0:{}", DRONE_PORT).parse().unwrap(); let drone_addr: SocketAddr = format!("0.0.0.0:{}", DRONE_PORT).parse().unwrap();
@ -168,6 +156,7 @@ fn main() {
tokio::spawn(processor) tokio::spawn(processor)
}); });
tokio::run(done); tokio::run(done);
Ok(())
} }
fn read_leader(path: &str) -> Config { fn read_leader(path: &str) -> Config {
let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path)); let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path));

View File

@ -9,23 +9,19 @@ extern crate solana;
use clap::{App, Arg, SubCommand}; use clap::{App, Arg, SubCommand};
use solana::client::mk_client; use solana::client::mk_client;
use solana::crdt::{Crdt, NodeInfo, TestNode}; use solana::crdt::NodeInfo;
use solana::drone::DRONE_PORT; use solana::drone::DRONE_PORT;
use solana::fullnode::Config; use solana::fullnode::Config;
use solana::logger; use solana::logger;
use solana::ncp::Ncp;
use solana::service::Service;
use solana::signature::{read_keypair, Keypair, KeypairUtil, Pubkey, Signature}; use solana::signature::{read_keypair, Keypair, KeypairUtil, Pubkey, Signature};
use solana::thin_client::ThinClient; use solana::thin_client::{poll_gossip_for_leader, ThinClient};
use solana::wallet::request_airdrop; use solana::wallet::request_airdrop;
use std::error; use std::error;
use std::fmt; use std::fmt;
use std::fs::File; use std::fs::File;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::sleep;
use std::time::{Duration, Instant}; use std::time::Duration;
enum WalletCommand { enum WalletCommand {
Address, Address,
@ -39,7 +35,6 @@ enum WalletCommand {
enum WalletError { enum WalletError {
CommandNotRecognized(String), CommandNotRecognized(String),
BadParameter(String), BadParameter(String),
NoNode(String),
} }
impl fmt::Display for WalletError { impl fmt::Display for WalletError {
@ -97,12 +92,18 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.takes_value(true) .takes_value(true)
.help("/path/to/id.json"), .help("/path/to/id.json"),
) )
.arg(
Arg::with_name("timeout")
.long("timeout")
.value_name("SECONDS")
.takes_value(true)
.help("Max SECONDS to wait to get necessary gossip from the network"),
)
.subcommand( .subcommand(
SubCommand::with_name("airdrop") SubCommand::with_name("airdrop")
.about("Request a batch of tokens") .about("Request a batch of tokens")
.arg( .arg(
Arg::with_name("tokens") Arg::with_name("tokens")
// .index(1)
.long("tokens") .long("tokens")
.value_name("NUMBER") .value_name("NUMBER")
.takes_value(true) .takes_value(true)
@ -115,16 +116,14 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
.about("Send a payment") .about("Send a payment")
.arg( .arg(
Arg::with_name("tokens") Arg::with_name("tokens")
// .index(2)
.long("tokens") .long("tokens")
.value_name("NUMBER") .value_name("NUMBER")
.takes_value(true) .takes_value(true)
.required(true) .required(true)
.help("the number of tokens to send"), .help("The number of tokens to send"),
) )
.arg( .arg(
Arg::with_name("to") Arg::with_name("to")
// .index(1)
.long("to") .long("to")
.value_name("PUBKEY") .value_name("PUBKEY")
.takes_value(true) .takes_value(true)
@ -153,6 +152,12 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
leader = NodeInfo::new_leader(&server_addr); leader = NodeInfo::new_leader(&server_addr);
}; };
let timeout: Option<u64>;
if let Some(secs) = matches.value_of("timeout") {
timeout = Some(secs.to_string().parse().expect("integer"));
} else {
timeout = None;
}
let mut path = dirs::home_dir().expect("home directory"); let mut path = dirs::home_dir().expect("home directory");
let id_path = if matches.is_present("keypair") { let id_path = if matches.is_present("keypair") {
@ -168,34 +173,7 @@ fn parse_args() -> Result<WalletConfig, Box<error::Error>> {
))) )))
})?; })?;
// Set up gossip functionality let leader = poll_gossip_for_leader(leader.contact_info.ncp, timeout)?;
let exit = Arc::new(AtomicBool::new(false));
let testnode = TestNode::new_localhost();
let extra_data = testnode.data.clone();
let crdt = Arc::new(RwLock::new(Crdt::new(extra_data).expect("Crdt::new")));
let window = Arc::new(RwLock::new(vec![]));
let ncp = Ncp::new(
&crdt.clone(),
window,
None,
testnode.sockets.gossip,
testnode.sockets.gossip_send,
exit.clone(),
).unwrap();
let leader_entry_point = NodeInfo::new_entry_point(leader.contact_info.ncp);
crdt.write().unwrap().insert(&leader_entry_point);
let now = Instant::now();
// Block until leader's correct contact info is received
while crdt.read().unwrap().leader_data().is_none() {
if now.elapsed() > Duration::new(10, 0) {
Err(WalletError::NoNode("No leader detected".to_string()))?;
}
}
exit.store(true, Ordering::Relaxed);
ncp.join().unwrap();
let leader = crdt.read().unwrap().leader_data().unwrap().clone();
let mut drone_addr = leader.contact_info.tpu; let mut drone_addr = leader.contact_info.tpu;
drone_addr.set_port(DRONE_PORT); drone_addr.set_port(DRONE_PORT);

View File

@ -31,6 +31,14 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "solana error")
}
}
impl std::error::Error for Error {}
impl std::convert::From<std::sync::mpsc::RecvError> for Error { impl std::convert::From<std::sync::mpsc::RecvError> for Error {
fn from(e: std::sync::mpsc::RecvError) -> Error { fn from(e: std::sync::mpsc::RecvError) -> Error {
Error::RecvError(e) Error::RecvError(e)

View File

@ -5,12 +5,17 @@
use bank::Account; use bank::Account;
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize};
use crdt::{Crdt, CrdtError, NodeInfo, TestNode};
use hash::Hash; use hash::Hash;
use ncp::Ncp;
use request::{Request, Response}; use request::{Request, Response};
use result::{Error, Result};
use signature::{Keypair, Pubkey, Signature}; use signature::{Keypair, Pubkey, Signature};
use std::collections::HashMap; use std::collections::HashMap;
use std::io; use std::io;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
@ -320,6 +325,38 @@ impl Drop for ThinClient {
} }
} }
pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> Result<NodeInfo> {
let exit = Arc::new(AtomicBool::new(false));
let testnode = TestNode::new_localhost();
let extra_data = testnode.data.clone();
let crdt = Arc::new(RwLock::new(Crdt::new(extra_data).expect("Crdt::new")));
let window = Arc::new(RwLock::new(vec![]));
let ncp = Ncp::new(
&crdt.clone(),
window,
None,
testnode.sockets.gossip,
testnode.sockets.gossip_send,
exit.clone(),
).unwrap();
let leader_entry_point = NodeInfo::new_entry_point(leader_ncp);
crdt.write().unwrap().insert(&leader_entry_point);
sleep(Duration::from_millis(100));
let now = Instant::now();
// Block until leader's correct contact info is received
while crdt.read().unwrap().leader_data().is_none() {
if timeout.is_some() && now.elapsed() > Duration::new(timeout.unwrap(), 0) {
return Err(Error::CrdtError(CrdtError::NoLeader));
}
}
ncp.close()?;
let leader = crdt.read().unwrap().leader_data().unwrap().clone();
Ok(leader)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;