Dedup drone code (#3885)

This commit is contained in:
Tyera Eulberg 2019-04-18 19:06:56 -06:00 committed by GitHub
parent 15bed29afa
commit b8ee952135
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 74 deletions

View File

@ -1,17 +1,11 @@
use bytes::Bytes;
use clap::{crate_description, crate_name, crate_version, App, Arg};
use log::*;
use solana_drone::drone::{Drone, DRONE_PORT};
use solana_drone::drone::{run_drone, Drone, DRONE_PORT};
use solana_drone::socketaddr;
use solana_sdk::signature::read_keypair;
use std::error;
use std::io;
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::net::TcpListener;
use tokio::prelude::{Future, Sink, Stream};
use tokio_codec::{BytesCodec, Decoder};
fn main() -> Result<(), Box<error::Error>> {
solana_logger::setup();
@ -75,38 +69,6 @@ fn main() -> Result<(), Box<error::Error>> {
drone1.lock().unwrap().clear_request_count();
});
let socket = TcpListener::bind(&drone_addr).unwrap();
info!("Drone started. Listening on: {}", drone_addr);
let done = socket
.incoming()
.map_err(|e| warn!("failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
let drone2 = drone.clone();
let framed = BytesCodec::new().framed(socket);
let (writer, reader) = framed.split();
let processor = reader.and_then(move |bytes| {
match drone2.lock().unwrap().process_drone_request(&bytes) {
Ok(response_bytes) => {
trace!("Airdrop response_bytes: {:?}", response_bytes.to_vec());
Ok(response_bytes)
}
Err(e) => {
info!("Error in request: {:?}", e);
Ok(Bytes::from(&b""[..]))
}
}
});
let server = writer
.send_all(processor.or_else(|err| {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Drone response: {:?}", err),
))
}))
.then(|_| Ok(()));
tokio::spawn(server)
});
tokio::run(done);
run_drone(drone, drone_addr, None);
Ok(())
}

View File

@ -261,43 +261,56 @@ pub fn run_local_drone(
None,
request_cap_input,
)));
let socket = TcpListener::bind(&drone_addr).unwrap();
sender.send(socket.local_addr().unwrap()).unwrap();
info!("Drone started. Listening on: {}", drone_addr);
let done = socket
.incoming()
.map_err(|e| debug!("failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
let drone2 = drone.clone();
let framed = BytesCodec::new().framed(socket);
let (writer, reader) = framed.split();
let processor = reader.and_then(move |bytes| {
match drone2.lock().unwrap().process_drone_request(&bytes) {
Ok(response_bytes) => {
trace!("Airdrop response_bytes: {:?}", response_bytes.to_vec());
Ok(response_bytes)
}
Err(e) => {
info!("Error in request: {:?}", e);
Ok(Bytes::from(&b""[..]))
}
}
});
let server = writer
.send_all(processor.or_else(|err| {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Drone response: {:?}", err),
))
}))
.then(|_| Ok(()));
tokio::spawn(server)
});
tokio::run(done);
run_drone(drone, drone_addr, Some(sender));
});
}
pub fn run_drone(
drone: Arc<Mutex<Drone>>,
drone_addr: SocketAddr,
send_addr: Option<Sender<SocketAddr>>,
) {
let socket = TcpListener::bind(&drone_addr).unwrap();
if send_addr.is_some() {
send_addr
.unwrap()
.send(socket.local_addr().unwrap())
.unwrap();
}
info!("Drone started. Listening on: {}", drone_addr);
let done = socket
.incoming()
.map_err(|e| debug!("failed to accept socket; error = {:?}", e))
.for_each(move |socket| {
let drone2 = drone.clone();
let framed = BytesCodec::new().framed(socket);
let (writer, reader) = framed.split();
let processor = reader.and_then(move |bytes| {
match drone2.lock().unwrap().process_drone_request(&bytes) {
Ok(response_bytes) => {
trace!("Airdrop response_bytes: {:?}", response_bytes.to_vec());
Ok(response_bytes)
}
Err(e) => {
info!("Error in request: {:?}", e);
Ok(Bytes::from(&b""[..]))
}
}
});
let server = writer
.send_all(processor.or_else(|err| {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Drone response: {:?}", err),
))
}))
.then(|_| Ok(()));
tokio::spawn(server)
});
tokio::run(done);
}
#[cfg(test)]
mod tests {
use super::*;