// solana/src/bin/drone.rs
extern crate bincode;
extern crate bytes;
#[macro_use]
extern crate clap;
2018-06-19 21:40:44 -07:00
extern crate serde_json;
extern crate solana;
extern crate tokio;
extern crate tokio_codec;
use bincode::{deserialize, serialize};
use bytes::Bytes;
use clap::{App, Arg};
2018-07-18 12:38:18 -07:00
use solana::drone::{Drone, DroneRequest, DRONE_PORT};
2018-07-27 21:37:53 -07:00
use solana::logger;
2018-07-17 10:48:46 -07:00
use solana::metrics::set_panic_hook;
2018-07-12 15:02:14 -07:00
use solana::signature::read_keypair;
use solana::thin_client::poll_gossip_for_leader;
use std::error;
use std::net::{Ipv4Addr, SocketAddr};
use std::process::exit;
use std::sync::{Arc, Mutex};
use std::thread;
2018-06-19 21:40:44 -07:00
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio_codec::{BytesCodec, Decoder};
2018-06-19 21:40:44 -07:00
/// Builds a `SocketAddr`.
///
/// * `socketaddr!(ip, port)` converts `ip` with `Ipv4Addr::from` and pairs it
///   with `port`.
/// * `socketaddr!(s)` parses `s` as a `SocketAddr`, panicking if the parse
///   fails.
macro_rules! socketaddr {
    ($ip:expr, $port:expr) => {{
        let ipv4 = Ipv4Addr::from($ip);
        SocketAddr::from((ipv4, $port))
    }};
    ($str:expr) => {
        $str.parse::<SocketAddr>().unwrap()
    };
}
fn main() -> Result<(), Box<error::Error>> {
2018-07-27 21:37:53 -07:00
logger::setup();
2018-07-17 11:00:01 -07:00
set_panic_hook("drone");
let matches = App::new("drone")
.version(crate_version!())
.arg(
Arg::with_name("network")
.short("n")
.long("network")
.value_name("HOST:PORT")
.takes_value(true)
.required(true)
.help("rendezvous with the network at this gossip entry point"),
)
.arg(
2018-07-12 15:02:14 -07:00
Arg::with_name("keypair")
.short("k")
.long("keypair")
.value_name("PATH")
.takes_value(true)
2018-07-12 15:02:14 -07:00
.required(true)
.help("File to read the client's keypair from"),
)
.arg(
Arg::with_name("slice")
.long("slice")
.value_name("SECONDS")
.takes_value(true)
.help("Time slice over which to limit requests to drone"),
)
.arg(
Arg::with_name("cap")
.long("cap")
.value_name("NUMBER")
.takes_value(true)
.help("Request limit for time slice"),
)
.arg(
Arg::with_name("timeout")
.long("timeout")
.value_name("SECONDS")
.takes_value(true)
.help("Max SECONDS to wait to get necessary gossip from the network"),
)
.get_matches();
let network = matches
.value_of("network")
.unwrap()
.parse()
.unwrap_or_else(|e| {
eprintln!("failed to parse network: {}", e);
exit(1)
});
2018-07-12 15:02:14 -07:00
let mint_keypair =
read_keypair(matches.value_of("keypair").unwrap()).expect("failed to read client keypair");
2018-07-12 15:02:14 -07:00
2018-06-19 21:40:44 -07:00
let time_slice: Option<u64>;
if let Some(secs) = matches.value_of("slice") {
time_slice = Some(secs.to_string().parse().expect("failed to parse slice"));
} else {
2018-06-19 21:40:44 -07:00
time_slice = None;
}
let request_cap: Option<u64>;
if let Some(c) = matches.value_of("cap") {
request_cap = Some(c.to_string().parse().expect("failed to parse cap"));
} else {
2018-06-19 21:40:44 -07:00
request_cap = None;
}
let timeout: Option<u64>;
if let Some(secs) = matches.value_of("timeout") {
timeout = Some(secs.to_string().parse().expect("failed to parse timeout"));
} else {
timeout = None;
}
2018-06-19 21:40:44 -07:00
let leader = poll_gossip_for_leader(network, timeout)?;
let drone_addr = socketaddr!(0, DRONE_PORT);
2018-08-28 16:52:10 -07:00
let drone = Arc::new(Mutex::new(Drone::new(
mint_keypair,
drone_addr,
2018-07-09 17:55:11 -07:00
leader.contact_info.tpu,
leader.contact_info.rpu,
time_slice,
request_cap,
)));
let drone1 = drone.clone();
thread::spawn(move || loop {
let time = drone1.lock().unwrap().time_slice;
thread::sleep(time);
drone1.lock().unwrap().clear_request_count();
});
2018-08-28 16:52:10 -07:00
let socket = TcpListener::bind(&drone_addr).unwrap();
println!("Drone started. Listening on: {}", drone_addr);
2018-06-19 21:40:44 -07:00
let done = socket
.incoming()
.map_err(|e| println!("failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
let drone2 = drone.clone();
// let client_ip = socket.peer_addr().expect("drone peer_addr").ip();
2018-06-19 21:40:44 -07:00
let framed = BytesCodec::new().framed(socket);
let (writer, reader) = framed.split();
2018-06-19 21:40:44 -07:00
let processor = reader.and_then(move |bytes| {
let req: DroneRequest = deserialize(&bytes).or_else(|err| {
Err(io::Error::new(
io::ErrorKind::Other,
format!("deserialize packet in drone: {:?}", err),
))
})?;
println!("Airdrop requested...");
// let res = drone2.lock().unwrap().check_rate_limit(client_ip);
let res1 = drone2.lock().unwrap().send_airdrop(req);
match res1 {
Ok(_) => println!("Airdrop sent!"),
Err(_) => println!("Request limit reached for this time slice"),
}
let response = res1?;
println!("Airdrop tx signature: {:?}", response);
let response_vec = serialize(&response).or_else(|err| {
Err(io::Error::new(
io::ErrorKind::Other,
format!("serialize signature in drone: {:?}", err),
))
})?;
let response_bytes = Bytes::from(response_vec.clone());
Ok(response_bytes)
});
let server = writer
.send_all(processor.or_else(|err| {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Drone response: {:?}", err),
))
}))
.then(|_| Ok(()));
tokio::spawn(server)
2018-06-19 21:40:44 -07:00
});
tokio::run(done);
Ok(())
2018-06-19 21:40:44 -07:00
}