Rewrite faucet with tokio v0.3 (#14336)

* Rewrite faucet for contemporary tokio

* Move away from framed decoder
This commit is contained in:
Tyera Eulberg 2020-12-29 20:51:01 -07:00 committed by GitHub
parent fe667db910
commit d63dd95806
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 58 deletions

4
Cargo.lock generated
View File

@ -4144,7 +4144,6 @@ version = "1.6.0"
dependencies = [ dependencies = [
"bincode", "bincode",
"byteorder", "byteorder",
"bytes 0.4.12",
"clap", "clap",
"log 0.4.11", "log 0.4.11",
"serde", "serde",
@ -4155,8 +4154,7 @@ dependencies = [
"solana-metrics", "solana-metrics",
"solana-sdk", "solana-sdk",
"solana-version", "solana-version",
"tokio 0.1.22", "tokio 0.3.5",
"tokio-codec",
] ]
[[package]] [[package]]

View File

@ -11,7 +11,6 @@ edition = "2018"
[dependencies] [dependencies]
bincode = "1.3.1" bincode = "1.3.1"
byteorder = "1.3.4" byteorder = "1.3.4"
bytes = "0.4"
clap = "2.33" clap = "2.33"
log = "0.4.11" log = "0.4.11"
serde = "1.0.112" serde = "1.0.112"
@ -22,8 +21,7 @@ solana-logger = { path = "../logger", version = "1.6.0" }
solana-metrics = { path = "../metrics", version = "1.6.0" } solana-metrics = { path = "../metrics", version = "1.6.0" }
solana-sdk = { path = "../sdk", version = "1.6.0" } solana-sdk = { path = "../sdk", version = "1.6.0" }
solana-version = { path = "../version", version = "1.6.0" } solana-version = { path = "../version", version = "1.6.0" }
tokio = "0.1" tokio = { version = "0.3", features = ["full"] }
tokio-codec = "0.1"
[lib] [lib]
crate-type = ["lib"] crate-type = ["lib"]

View File

@ -11,7 +11,8 @@ use std::{
thread, thread,
}; };
fn main() { #[tokio::main]
async fn main() {
let default_keypair = solana_cli_config::Config::default().keypair_path; let default_keypair = solana_cli_config::Config::default().keypair_path;
solana_logger::setup_with_default("solana=info"); solana_logger::setup_with_default("solana=info");
@ -76,5 +77,5 @@ fn main() {
faucet1.lock().unwrap().clear_request_count(); faucet1.lock().unwrap().clear_request_count();
}); });
run_faucet(faucet, faucet_addr, None); run_faucet(faucet, faucet_addr, None).await;
} }

View File

@ -4,9 +4,8 @@
//! checking requests against a request cap for a given time time_slice //! checking requests against a request cap for a given time time_slice
//! and (to come) an IP rate limit. //! and (to come) an IP rate limit.
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize, serialized_size};
use byteorder::{ByteOrder, LittleEndian}; use byteorder::{ByteOrder, LittleEndian};
use bytes::{Bytes, BytesMut};
use log::*; use log::*;
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use solana_metrics::datapoint_info; use solana_metrics::datapoint_info;
@ -20,18 +19,17 @@ use solana_sdk::{
transaction::Transaction, transaction::Transaction,
}; };
use std::{ use std::{
io::{self, Error, ErrorKind}, io::{self, Error, ErrorKind, Read, Write},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
sync::{mpsc::Sender, Arc, Mutex}, sync::{mpsc::Sender, Arc, Mutex},
thread, thread,
time::Duration, time::Duration,
}; };
use tokio::{ use tokio::{
self, io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener, net::{TcpListener, TcpStream as TokioTcpStream},
prelude::{Future, Read, Sink, Stream, Write}, runtime::Runtime,
}; };
use tokio_codec::{BytesCodec, Decoder};
#[macro_export] #[macro_export]
macro_rules! socketaddr { macro_rules! socketaddr {
@ -58,6 +56,16 @@ pub enum FaucetRequest {
}, },
} }
impl Default for FaucetRequest {
fn default() -> Self {
Self::GetAirdrop {
lamports: u64::default(),
to: Pubkey::default(),
blockhash: Hash::default(),
}
}
}
pub struct Faucet { pub struct Faucet {
faucet_keypair: Keypair, faucet_keypair: Keypair,
ip_cache: Vec<IpAddr>, ip_cache: Vec<IpAddr>,
@ -154,7 +162,7 @@ impl Faucet {
} }
} }
} }
pub fn process_faucet_request(&mut self, bytes: &BytesMut) -> Result<Bytes, io::Error> { pub fn process_faucet_request(&mut self, bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
let req: FaucetRequest = deserialize(bytes).map_err(|err| { let req: FaucetRequest = deserialize(bytes).map_err(|err| {
io::Error::new( io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
@ -177,9 +185,8 @@ impl Faucet {
LittleEndian::write_u16(&mut response_vec_with_length, response_vec.len() as u16); LittleEndian::write_u16(&mut response_vec_with_length, response_vec.len() as u16);
response_vec_with_length.extend_from_slice(&response_vec); response_vec_with_length.extend_from_slice(&response_vec);
let response_bytes = Bytes::from(response_vec_with_length);
info!("Airdrop transaction granted"); info!("Airdrop transaction granted");
Ok(response_bytes) Ok(response_vec_with_length)
} }
Err(err) => { Err(err) => {
warn!("Airdrop transaction failed: {:?}", err); warn!("Airdrop transaction failed: {:?}", err);
@ -270,7 +277,8 @@ pub fn run_local_faucet_with_port(
per_time_cap, per_time_cap,
None, None,
))); )));
run_faucet(faucet, faucet_addr, Some(sender)); let runtime = Runtime::new().unwrap();
runtime.block_on(run_faucet(faucet, faucet_addr, Some(sender)));
}); });
} }
@ -283,14 +291,14 @@ pub fn run_local_faucet(
run_local_faucet_with_port(faucet_keypair, sender, per_time_cap, 0) run_local_faucet_with_port(faucet_keypair, sender, per_time_cap, 0)
} }
pub fn run_faucet( pub async fn run_faucet(
faucet: Arc<Mutex<Faucet>>, faucet: Arc<Mutex<Faucet>>,
faucet_addr: SocketAddr, faucet_addr: SocketAddr,
send_addr: Option<Sender<SocketAddr>>, send_addr: Option<Sender<SocketAddr>>,
) { ) {
let socket = TcpListener::bind(&faucet_addr).unwrap(); let listener = TcpListener::bind(&faucet_addr).await.unwrap();
if let Some(send_addr) = send_addr { if let Some(send_addr) = send_addr {
send_addr.send(socket.local_addr().unwrap()).unwrap(); send_addr.send(listener.local_addr().unwrap()).unwrap();
} }
info!("Faucet started. Listening on: {}", faucet_addr); info!("Faucet started. Listening on: {}", faucet_addr);
info!( info!(
@ -298,43 +306,48 @@ pub fn run_faucet(
faucet.lock().unwrap().faucet_keypair.pubkey() faucet.lock().unwrap().faucet_keypair.pubkey()
); );
let done = socket loop {
.incoming() let _faucet = faucet.clone();
.map_err(|e| debug!("failed to accept socket; error = {:?}", e)) match listener.accept().await {
.for_each(move |socket| { Ok((stream, _)) => {
let faucet2 = faucet.clone(); tokio::spawn(async move {
let framed = BytesCodec::new().framed(socket); if let Err(e) = process(stream, _faucet).await {
let (writer, reader) = framed.split(); info!("failed to process request; error = {:?}", e);
}
});
}
Err(e) => debug!("failed to accept socket; error = {:?}", e),
}
}
}
let processor = reader.and_then(move |bytes| { async fn process(
match faucet2.lock().unwrap().process_faucet_request(&bytes) { mut stream: TokioTcpStream,
Ok(response_bytes) => { faucet: Arc<Mutex<Faucet>>,
trace!("Airdrop response_bytes: {:?}", response_bytes.to_vec()); ) -> Result<(), Box<dyn std::error::Error>> {
Ok(response_bytes) let mut request = vec![0u8; serialized_size(&FaucetRequest::default()).unwrap() as usize];
} while stream.read_exact(&mut request).await.is_ok() {
Err(e) => { trace!("{:?}", request);
info!("Error in request: {:?}", e);
Ok(Bytes::from(0u16.to_le_bytes().to_vec())) let response = match faucet.lock().unwrap().process_faucet_request(&request) {
} Ok(response_bytes) => {
} trace!("Airdrop response_bytes: {:?}", response_bytes);
}); response_bytes
let server = writer }
.send_all(processor.or_else(|err| { Err(e) => {
Err(io::Error::new( info!("Error in request: {:?}", e);
io::ErrorKind::Other, 0u16.to_le_bytes().to_vec()
format!("Faucet response: {:?}", err), }
)) };
})) stream.write_all(&response).await?;
.then(|_| Ok(())); }
tokio::spawn(server)
}); Ok(())
tokio::run(done);
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use bytes::BufMut;
use solana_sdk::system_instruction::SystemInstruction; use solana_sdk::system_instruction::SystemInstruction;
use std::time::Duration; use std::time::Duration;
@ -446,8 +459,6 @@ mod tests {
to, to,
}; };
let req = serialize(&req).unwrap(); let req = serialize(&req).unwrap();
let mut bytes = BytesMut::with_capacity(req.len());
bytes.put(&req[..]);
let keypair = Keypair::new(); let keypair = Keypair::new();
let expected_instruction = system_instruction::transfer(&keypair.pubkey(), &to, lamports); let expected_instruction = system_instruction::transfer(&keypair.pubkey(), &to, lamports);
@ -459,12 +470,11 @@ mod tests {
expected_vec_with_length.extend_from_slice(&expected_bytes); expected_vec_with_length.extend_from_slice(&expected_bytes);
let mut faucet = Faucet::new(keypair, None, None, None); let mut faucet = Faucet::new(keypair, None, None, None);
let response = faucet.process_faucet_request(&bytes); let response = faucet.process_faucet_request(&req);
let response_vec = response.unwrap().to_vec(); let response_vec = response.unwrap().to_vec();
assert_eq!(expected_vec_with_length, response_vec); assert_eq!(expected_vec_with_length, response_vec);
let mut bad_bytes = BytesMut::with_capacity(9); let bad_bytes = "bad bytes".as_bytes();
bad_bytes.put("bad bytes");
assert!(faucet.process_faucet_request(&bad_bytes).is_err()); assert!(faucet.process_faucet_request(&bad_bytes).is_err());
} }
} }