From d8c9a1aae999ef1f4e43c9dd6ff7efeb6af1dd66 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Fri, 14 Sep 2018 02:58:59 -0600 Subject: [PATCH] Add method to run local drone for tests --- src/drone.rs | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++- src/lib.rs | 3 +++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/src/drone.rs b/src/drone.rs index 3ef4fc4591..2415c50944 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -4,15 +4,24 @@ //! checking requests against a request cap for a given time time_slice //! and (to come) an IP rate limit. +use bincode::{deserialize, serialize}; +use bytes::Bytes; use influx_db_client as influxdb; use metrics; use signature::Signature; use signature::{Keypair, Pubkey}; use std::io; use std::io::{Error, ErrorKind}; -use std::net::{IpAddr, SocketAddr, UdpSocket}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; +use std::sync::mpsc::Sender; +use std::sync::{Arc, Mutex}; +use std::thread; use std::time::Duration; use thin_client::{poll_gossip_for_leader, ThinClient}; +use tokio; +use tokio::net::TcpListener; +use tokio::prelude::*; +use tokio_codec::{BytesCodec, Decoder}; use transaction::Transaction; pub const TIME_SLICE: u64 = 60; @@ -154,6 +163,66 @@ impl Drop for Drone { } } +pub fn run_local_drone(mint_keypair: Keypair, network: SocketAddr, sender: Sender) { + thread::spawn(move || { + let drone_addr = socketaddr!(0, 0); + let drone = Arc::new(Mutex::new(Drone::new( + mint_keypair, + drone_addr, + network, + None, + None, + ))); + let socket = TcpListener::bind(&drone_addr).unwrap(); + sender.send(socket.local_addr().unwrap()).unwrap(); + info!("Drone started. Listening on: {}", drone_addr); + let done = socket + .incoming() + .map_err(|e| debug!("failed to accept socket; error = {:?}", e)) + .for_each(move |socket| { + let drone2 = drone.clone(); + let framed = BytesCodec::new().framed(socket); + let (writer, reader) = framed.split(); + + let processor = reader.and_then(move |bytes| { + let req: DroneRequest = deserialize(&bytes).or_else(|err| { + Err(io::Error::new( + io::ErrorKind::Other, + format!("deserialize packet in drone: {:?}", err), + )) + })?; + + info!("Airdrop requested..."); + let res1 = drone2.lock().unwrap().send_airdrop(req); + match res1 { + Ok(_) => info!("Airdrop sent!"), + Err(_) => info!("Request limit reached for this time slice"), + } + let response = res1?; + info!("Airdrop tx signature: {:?}", response); + let response_vec = serialize(&response).or_else(|err| { + Err(io::Error::new( + io::ErrorKind::Other, + format!("serialize signature in drone: {:?}", err), + )) + })?; + let response_bytes = Bytes::from(response_vec.clone()); + Ok(response_bytes) + }); + let server = writer + .send_all(processor.or_else(|err| { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Drone response: {:?}", err), + )) + })) + .then(|_| Ok(())); + tokio::spawn(server) + }); + tokio::run(done); + }); +} + #[cfg(test)] mod tests { use bank::Bank; diff --git a/src/lib.rs b/src/lib.rs index 11dc25c158..bb2d1f5ff4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,6 +63,7 @@ pub mod write_stage; extern crate bincode; extern crate bs58; extern crate byteorder; +extern crate bytes; extern crate chrono; extern crate clap; extern crate dirs; @@ -87,6 +88,8 @@ extern crate serde_json; extern crate sha2; extern crate socket2; extern crate sys_info; +extern crate tokio; +extern crate tokio_codec; extern crate untrusted; #[cfg(test)]