From d63dd95806bb1c5085d515ba5137e51b7ab50506 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 29 Dec 2020 20:51:01 -0700 Subject: [PATCH] Rewrite faucet with tokio v0.3 (#14336) * Rewrite faucet for contemporary tokio * Move away from framed decoder --- Cargo.lock | 4 +- faucet/Cargo.toml | 4 +- faucet/src/bin/faucet.rs | 5 +- faucet/src/faucet.rs | 110 +++++++++++++++++++++------------------ 4 files changed, 65 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2cf47608d..ebe8a8c82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4144,7 +4144,6 @@ version = "1.6.0" dependencies = [ "bincode", "byteorder", - "bytes 0.4.12", "clap", "log 0.4.11", "serde", @@ -4155,8 +4154,7 @@ dependencies = [ "solana-metrics", "solana-sdk", "solana-version", - "tokio 0.1.22", - "tokio-codec", + "tokio 0.3.5", ] [[package]] diff --git a/faucet/Cargo.toml b/faucet/Cargo.toml index 5f1cb79a0..d5630b9b8 100644 --- a/faucet/Cargo.toml +++ b/faucet/Cargo.toml @@ -11,7 +11,6 @@ edition = "2018" [dependencies] bincode = "1.3.1" byteorder = "1.3.4" -bytes = "0.4" clap = "2.33" log = "0.4.11" serde = "1.0.112" @@ -22,8 +21,7 @@ solana-logger = { path = "../logger", version = "1.6.0" } solana-metrics = { path = "../metrics", version = "1.6.0" } solana-sdk = { path = "../sdk", version = "1.6.0" } solana-version = { path = "../version", version = "1.6.0" } -tokio = "0.1" -tokio-codec = "0.1" +tokio = { version = "0.3", features = ["full"] } [lib] crate-type = ["lib"] diff --git a/faucet/src/bin/faucet.rs b/faucet/src/bin/faucet.rs index dac98a2bd..0d4b9b76d 100644 --- a/faucet/src/bin/faucet.rs +++ b/faucet/src/bin/faucet.rs @@ -11,7 +11,8 @@ use std::{ thread, }; -fn main() { +#[tokio::main] +async fn main() { let default_keypair = solana_cli_config::Config::default().keypair_path; solana_logger::setup_with_default("solana=info"); @@ -76,5 +77,5 @@ fn main() { faucet1.lock().unwrap().clear_request_count(); }); - run_faucet(faucet, faucet_addr, None); + run_faucet(faucet, faucet_addr, None).await; } diff --git a/faucet/src/faucet.rs b/faucet/src/faucet.rs index 3f8142a12..0de8109de 100644 --- a/faucet/src/faucet.rs +++ b/faucet/src/faucet.rs @@ -4,9 +4,8 @@ //! checking requests against a request cap for a given time time_slice //! and (to come) an IP rate limit. -use bincode::{deserialize, serialize}; +use bincode::{deserialize, serialize, serialized_size}; use byteorder::{ByteOrder, LittleEndian}; -use bytes::{Bytes, BytesMut}; use log::*; use serde_derive::{Deserialize, Serialize}; use solana_metrics::datapoint_info; @@ -20,18 +19,17 @@ use solana_sdk::{ transaction::Transaction, }; use std::{ - io::{self, Error, ErrorKind}, + io::{self, Error, ErrorKind, Read, Write}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, sync::{mpsc::Sender, Arc, Mutex}, thread, time::Duration, }; use tokio::{ - self, - net::TcpListener, - prelude::{Future, Read, Sink, Stream, Write}, + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream as TokioTcpStream}, + runtime::Runtime, }; -use tokio_codec::{BytesCodec, Decoder}; #[macro_export] macro_rules! socketaddr { @@ -58,6 +56,16 @@ pub enum FaucetRequest { }, } +impl Default for FaucetRequest { + fn default() -> Self { + Self::GetAirdrop { + lamports: u64::default(), + to: Pubkey::default(), + blockhash: Hash::default(), + } + } +} + pub struct Faucet { faucet_keypair: Keypair, ip_cache: Vec, @@ -154,7 +162,7 @@ impl Faucet { } } } - pub fn process_faucet_request(&mut self, bytes: &BytesMut) -> Result { + pub fn process_faucet_request(&mut self, bytes: &[u8]) -> Result, io::Error> { let req: FaucetRequest = deserialize(bytes).map_err(|err| { io::Error::new( io::ErrorKind::Other, @@ -177,9 +185,8 @@ impl Faucet { LittleEndian::write_u16(&mut response_vec_with_length, response_vec.len() as u16); response_vec_with_length.extend_from_slice(&response_vec); - let response_bytes = Bytes::from(response_vec_with_length); info!("Airdrop transaction granted"); - Ok(response_bytes) + Ok(response_vec_with_length) } Err(err) => { warn!("Airdrop transaction failed: {:?}", err); @@ -270,7 +277,8 @@ pub fn run_local_faucet_with_port( per_time_cap, None, ))); - run_faucet(faucet, faucet_addr, Some(sender)); + let runtime = Runtime::new().unwrap(); + runtime.block_on(run_faucet(faucet, faucet_addr, Some(sender))); }); } @@ -283,14 +291,14 @@ pub fn run_local_faucet( run_local_faucet_with_port(faucet_keypair, sender, per_time_cap, 0) } -pub fn run_faucet( +pub async fn run_faucet( faucet: Arc>, faucet_addr: SocketAddr, send_addr: Option>, ) { - let socket = TcpListener::bind(&faucet_addr).unwrap(); + let listener = TcpListener::bind(&faucet_addr).await.unwrap(); if let Some(send_addr) = send_addr { - send_addr.send(socket.local_addr().unwrap()).unwrap(); + send_addr.send(listener.local_addr().unwrap()).unwrap(); } info!("Faucet started. Listening on: {}", faucet_addr); info!( @@ -298,43 +306,48 @@ pub fn run_faucet( faucet.lock().unwrap().faucet_keypair.pubkey() ); - let done = socket - .incoming() - .map_err(|e| debug!("failed to accept socket; error = {:?}", e)) - .for_each(move |socket| { - let faucet2 = faucet.clone(); - let framed = BytesCodec::new().framed(socket); - let (writer, reader) = framed.split(); + loop { + let _faucet = faucet.clone(); + match listener.accept().await { + Ok((stream, _)) => { + tokio::spawn(async move { + if let Err(e) = process(stream, _faucet).await { + info!("failed to process request; error = {:?}", e); + } + }); + } + Err(e) => debug!("failed to accept socket; error = {:?}", e), + } + } +} - let processor = reader.and_then(move |bytes| { - match faucet2.lock().unwrap().process_faucet_request(&bytes) { - Ok(response_bytes) => { - trace!("Airdrop response_bytes: {:?}", response_bytes.to_vec()); - Ok(response_bytes) - } - Err(e) => { - info!("Error in request: {:?}", e); - Ok(Bytes::from(0u16.to_le_bytes().to_vec())) - } - } - }); - let server = writer - .send_all(processor.or_else(|err| { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Faucet response: {:?}", err), - )) - })) - .then(|_| Ok(())); - tokio::spawn(server) - }); - tokio::run(done); +async fn process( + mut stream: TokioTcpStream, + faucet: Arc>, +) -> Result<(), Box> { + let mut request = vec![0u8; serialized_size(&FaucetRequest::default()).unwrap() as usize]; + while stream.read_exact(&mut request).await.is_ok() { + trace!("{:?}", request); + + let response = match faucet.lock().unwrap().process_faucet_request(&request) { + Ok(response_bytes) => { + trace!("Airdrop response_bytes: {:?}", response_bytes); + response_bytes + } + Err(e) => { + info!("Error in request: {:?}", e); + 0u16.to_le_bytes().to_vec() + } + }; + stream.write_all(&response).await?; + } + + Ok(()) } #[cfg(test)] mod tests { use super::*; - use bytes::BufMut; use solana_sdk::system_instruction::SystemInstruction; use std::time::Duration; @@ -446,8 +459,6 @@ mod tests { to, }; let req = serialize(&req).unwrap(); - let mut bytes = BytesMut::with_capacity(req.len()); - bytes.put(&req[..]); let keypair = Keypair::new(); let expected_instruction = system_instruction::transfer(&keypair.pubkey(), &to, lamports); @@ -459,12 +470,11 @@ mod tests { expected_vec_with_length.extend_from_slice(&expected_bytes); let mut faucet = Faucet::new(keypair, None, None, None); - let response = faucet.process_faucet_request(&bytes); + let response = faucet.process_faucet_request(&req); let response_vec = response.unwrap().to_vec(); assert_eq!(expected_vec_with_length, response_vec); - let mut bad_bytes = BytesMut::with_capacity(9); - bad_bytes.put("bad bytes"); + let bad_bytes = "bad bytes".as_bytes(); assert!(faucet.process_faucet_request(&bad_bytes).is_err()); } }