solana/src/packet.rs

553 lines
17 KiB
Rust
Raw Normal View History

2018-04-17 19:46:50 -07:00
//! The `packet` module defines data structures and methods to pull data from the network.
2018-04-28 00:31:20 -07:00
use bincode::{deserialize, serialize};
2018-03-26 21:07:11 -07:00
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
2018-05-30 21:24:21 -07:00
use counter::Counter;
2018-03-26 21:07:11 -07:00
use result::{Error, Result};
use serde::Serialize;
2018-04-28 00:31:20 -07:00
use signature::PublicKey;
2018-03-26 21:07:11 -07:00
use std::collections::VecDeque;
use std::fmt;
use std::io;
2018-05-02 09:24:25 -07:00
use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
2018-05-30 21:24:21 -07:00
use std::sync::atomic::AtomicUsize;
2018-03-26 21:07:11 -07:00
use std::sync::{Arc, Mutex, RwLock};
2018-05-30 21:24:21 -07:00
use std::time::Instant;
/// A `Packets` batch behind a reference-counted read-write lock.
pub type SharedPackets = Arc<RwLock<Packets>>;
/// A `Blob` behind a reference-counted read-write lock.
pub type SharedBlob = Arc<RwLock<Blob>>;
/// Pool that hands out and takes back `SharedPackets` buffers.
pub type PacketRecycler = Recycler<Packets>;
/// Pool that hands out and takes back `SharedBlob` buffers.
pub type BlobRecycler = Recycler<Blob>;
2018-04-11 17:30:53 -07:00
/// Number of `Packet` slots in a default `Packets` batch, and the most packets a single socket read pass collects.
pub const NUM_PACKETS: usize = 1024 * 8;
2018-04-28 00:31:20 -07:00
/// Total size in bytes of a `Blob`'s backing buffer (header region plus payload).
pub const BLOB_SIZE: usize = 64 * 1024;
2018-06-14 16:11:15 -07:00
/// Bytes left for payload in a `Blob` once the header region is subtracted.
pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE;
2018-03-26 21:07:11 -07:00
/// Size in bytes of a `Packet`'s data buffer.
pub const PACKET_DATA_SIZE: usize = 256;
/// Upper bound on blobs collected by one `Blob::recv_from` call; chosen so that many blobs occupy as many bytes as a full `Packets` batch.
pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE;
#[derive(Clone, Default)]
2018-03-26 21:07:11 -07:00
/// Per-buffer metadata: how many bytes of the buffer are valid and which socket address it is associated with.
#[repr(C)]
pub struct Meta {
/// Number of valid bytes in the owning buffer.
pub size: usize,
/// NOTE(review): not read or written anywhere in this file; presumably a retransmit counter maintained by other modules.
pub num_retransmits: u64,
/// IP address: eight IPv6 segments, or four IPv4 octets stored one per slot in the first four slots.
pub addr: [u16; 8],
/// Port number paired with `addr`.
pub port: u16,
/// True when `addr` holds an IPv6 address.
pub v6: bool,
}
#[derive(Clone)]
2018-03-26 21:07:11 -07:00
#[repr(C)]
pub struct Packet {
2018-03-26 21:07:11 -07:00
pub data: [u8; PACKET_DATA_SIZE],
pub meta: Meta,
}
impl fmt::Debug for Packet {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Packet {{ size: {:?}, addr: {:?} }}",
self.meta.size,
self.meta.addr()
)
}
}
impl Default for Packet {
fn default() -> Packet {
Packet {
2018-03-26 21:07:11 -07:00
data: [0u8; PACKET_DATA_SIZE],
meta: Meta::default(),
}
}
}
impl Meta {
    /// Rebuilds the socket address stored in this metadata.
    ///
    /// When `v6` is false the first four slots of `addr` are read as IPv4
    /// octets; otherwise all eight slots are read as IPv6 segments.
    pub fn addr(&self) -> SocketAddr {
        let ip = if self.v6 {
            IpAddr::V6(Ipv6Addr::from(self.addr))
        } else {
            // `set_addr` stores one octet per slot, so narrowing back to u8 is lossless.
            IpAddr::V4(Ipv4Addr::new(
                self.addr[0] as u8,
                self.addr[1] as u8,
                self.addr[2] as u8,
                self.addr[3] as u8,
            ))
        };
        SocketAddr::new(ip, self.port)
    }

    /// Stores `a` into `addr`, `port` and `v6`.
    ///
    /// `v6` is written on both branches. A `Meta` can be written more than
    /// once (recycled buffers are handed out again without being reset), so an
    /// IPv4 address stored after an IPv6 one must clear the flag; otherwise
    /// `addr()` would reinterpret the octets as IPv6 segments.
    pub fn set_addr(&mut self, a: &SocketAddr) {
        match *a {
            SocketAddr::V4(v4) => {
                let octets = v4.ip().octets();
                // Start from zero so no stale IPv6 segments linger in slots 4..8.
                let mut slots = [0u16; 8];
                for (slot, octet) in slots.iter_mut().zip(octets.iter()) {
                    *slot = u16::from(*octet);
                }
                self.addr = slots;
                self.v6 = false;
            }
            SocketAddr::V6(v6) => {
                self.addr = v6.ip().segments();
                self.v6 = true;
            }
        }
        self.port = a.port();
    }
}
/// A resizable batch of `Packet`s that is filled from, or written to, a UDP socket.
#[derive(Debug)]
pub struct Packets {
    /// The packets in this batch.
    pub packets: Vec<Packet>,
}

// Not derived: a default batch is pre-sized to `NUM_PACKETS` default packets rather than empty.
impl Default for Packets {
    fn default() -> Packets {
        let mut packets = Vec::with_capacity(NUM_PACKETS);
        packets.resize(NUM_PACKETS, Packet::default());
        Packets { packets }
    }
}
/// A `BLOB_SIZE`-byte buffer together with its `Meta`.
#[derive(Clone)]
pub struct Blob {
    /// Raw bytes; senders transmit only the first `meta.size` of them.
    pub data: [u8; BLOB_SIZE],
    /// Valid length and socket address for `data`.
    pub meta: Meta,
}

// Hand-written so the raw byte buffer is not dumped; only size and address are shown.
impl fmt::Debug for Blob {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let size = self.meta.size;
        let addr = self.meta.addr();
        write!(f, "Blob {{ size: {:?}, addr: {:?} }}", size, addr)
    }
}

// Written by hand because `Default` cannot be auto-derived for an array this large.
impl Default for Blob {
    fn default() -> Blob {
        let data = [0u8; BLOB_SIZE];
        let meta = Meta::default();
        Blob { data, meta }
    }
}
/// A pool of reusable, lock-guarded `T` values.
///
/// Cloning a `Recycler` yields another handle to the same pool.
pub struct Recycler<T> {
    gc: Arc<Mutex<Vec<Arc<RwLock<T>>>>>,
}

// Starts with an empty pool.
impl<T: Default> Default for Recycler<T> {
    fn default() -> Recycler<T> {
        let empty_pool = Vec::new();
        Recycler {
            gc: Arc::new(Mutex::new(empty_pool)),
        }
    }
}

// Only the `Arc` is cloned, so both handles see the same pooled items.
impl<T: Default> Clone for Recycler<T> {
    fn clone(&self) -> Recycler<T> {
        let shared_pool = Arc::clone(&self.gc);
        Recycler { gc: shared_pool }
    }
}
impl<T: Default> Recycler<T> {
pub fn allocate(&self) -> Arc<RwLock<T>> {
2018-05-10 17:11:31 -07:00
let mut gc = self.gc.lock().expect("recycler lock in pb fn allocate");
gc.pop()
.unwrap_or_else(|| Arc::new(RwLock::new(Default::default())))
}
pub fn recycle(&self, msgs: Arc<RwLock<T>>) {
assert_eq!(Arc::strong_count(&msgs), 1); // Ensure this function holds that last reference.
2018-05-10 17:11:31 -07:00
let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
gc.push(msgs);
}
}
impl Packets {
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
2018-05-30 21:24:21 -07:00
static mut COUNTER: Counter = create_counter!("packets", 10);
self.packets.resize(NUM_PACKETS, Packet::default());
let mut i = 0;
//DOCUMENTED SIDE-EFFECT
//Performance out of the IO without poll
2018-05-25 22:00:47 -07:00
// * block on the socket until it's readable
// * set the socket to non blocking
// * read until it fails
// * set it back to blocking before returning
socket.set_nonblocking(false)?;
2018-05-30 21:24:21 -07:00
let mut start = Instant::now();
for p in &mut self.packets {
p.meta.size = 0;
trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
2018-05-30 21:24:21 -07:00
inc_counter!(COUNTER, i, start);
debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
2018-05-06 22:25:05 -07:00
break;
}
Err(e) => {
trace!("recv_from err {:?}", e);
return Err(Error::IO(e));
}
Ok((nrecv, from)) => {
p.meta.size = nrecv;
p.meta.set_addr(&from);
if i == 0 {
2018-05-30 21:24:21 -07:00
start = Instant::now();
socket.set_nonblocking(true)?;
}
}
}
i += 1;
}
Ok(i)
}
pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<()> {
let sz = self.run_read_from(socket)?;
self.packets.resize(sz, Packet::default());
debug!("recv_from: {}", sz);
Ok(())
}
pub fn send_to(&self, socket: &UdpSocket) -> Result<()> {
for p in &self.packets {
let a = p.meta.addr();
socket.send_to(&p.data[..p.meta.size], &a)?;
}
Ok(())
}
}
2018-06-08 10:09:45 -07:00
pub fn to_packets_chunked<T: Serialize>(
r: &PacketRecycler,
xs: Vec<T>,
chunks: usize,
) -> Vec<SharedPackets> {
let mut out = vec![];
2018-06-08 10:09:45 -07:00
for x in xs.chunks(chunks) {
let p = r.allocate();
p.write()
.unwrap()
.packets
.resize(x.len(), Default::default());
for (i, o) in x.iter().zip(p.write().unwrap().packets.iter_mut()) {
let v = serialize(&i).expect("serialize request");
let len = v.len();
o.data[..len].copy_from_slice(&v);
o.meta.size = len;
}
out.push(p);
}
return out;
}
2018-06-08 10:09:45 -07:00
/// Serializes `xs` into packet batches of at most `NUM_PACKETS` packets each, using buffers from `r`.
pub fn to_packets<T: Serialize>(r: &PacketRecycler, xs: Vec<T>) -> Vec<SharedPackets> {
to_packets_chunked(r, xs, NUM_PACKETS)
}
/// Serializes `resp` into a blob taken from `blob_recycler` and addresses it to `rsp_addr`.
///
/// The encoded bytes are written from offset 0 of the blob's buffer and
/// `meta.size` is set to their length.
///
/// # Errors
/// Returns the serialization error if `resp` cannot be encoded.
///
/// # Panics
/// Panics if the encoded length is not strictly less than `BLOB_SIZE`, or if
/// the blob's lock is poisoned.
pub fn to_blob<T: Serialize>(
    resp: T,
    rsp_addr: SocketAddr,
    blob_recycler: &BlobRecycler,
) -> Result<SharedBlob> {
    let shared = blob_recycler.allocate();
    {
        let mut guard = shared.write().unwrap();
        let encoded = serialize(&resp)?;
        let encoded_len = encoded.len();
        assert!(encoded_len < BLOB_SIZE);
        guard.data[..encoded_len].copy_from_slice(&encoded);
        guard.meta.size = encoded_len;
        guard.meta.set_addr(&rsp_addr);
    }
    Ok(shared)
}
/// Converts every `(response, address)` pair into a blob via `to_blob`, preserving order.
///
/// # Errors
/// Stops at, and returns, the first error produced by `to_blob`.
pub fn to_blobs<T: Serialize>(
    rsps: Vec<(T, SocketAddr)>,
    blob_recycler: &BlobRecycler,
) -> Result<VecDeque<SharedBlob>> {
    // Collecting into `Result<VecDeque<_>>` short-circuits on the first `Err`.
    rsps.into_iter()
        .map(|(resp, rsp_addr)| to_blob(resp, rsp_addr, blob_recycler))
        .collect()
}
2018-04-28 00:31:20 -07:00
/// End offset of the little-endian `u64` index stored at the start of a blob's buffer.
const BLOB_INDEX_END: usize = size_of::<u64>();
/// End offset of the serialized sender id, which follows the index.
// NOTE(review): the extra `size_of::<usize>()` presumably covers a length prefix emitted when the key is serialized; confirm against the serializer.
const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::<usize>() + size_of::<PublicKey>();
2018-05-31 11:21:07 -07:00
/// End offset of the little-endian `u32` flags field, which follows the id.
const BLOB_FLAGS_END: usize = BLOB_ID_END + size_of::<u32>();
/// End offset of the little-endian `u64` data-size field, which follows the flags.
const BLOB_SIZE_END: usize = BLOB_FLAGS_END + size_of::<u64>();
// Rounds `$x` up to the next multiple of `$align`.
// The bit mask is only a valid rounding when `$align` is a power of two.
macro_rules! align {
    ($x:expr, $align:expr) => {
        (($x) + (($align) - 1)) & !(($align) - 1)
    };
}
2018-05-31 11:21:07 -07:00
/// Flag bit tested by `Blob::is_coding` and set by `Blob::set_coding`.
pub const BLOB_FLAG_IS_CODING: u32 = 0x1;
/// Size of the header region: the end of the last header field rounded up to a multiple of 64 bytes.
pub const BLOB_HEADER_SIZE: usize = align!(BLOB_SIZE_END, 64);
impl Blob {
pub fn get_index(&self) -> Result<u64> {
2018-04-28 00:31:20 -07:00
let mut rdr = io::Cursor::new(&self.data[0..BLOB_INDEX_END]);
let r = rdr.read_u64::<LittleEndian>()?;
Ok(r)
}
pub fn set_index(&mut self, ix: u64) -> Result<()> {
let mut wtr = vec![];
wtr.write_u64::<LittleEndian>(ix)?;
2018-04-28 00:31:20 -07:00
self.data[..BLOB_INDEX_END].clone_from_slice(&wtr);
Ok(())
}
/// sender id, we use this for identifying if its a blob from the leader that we should
/// retransmit. eventually blobs should have a signature that we can use ffor spam filtering
2018-04-28 00:31:20 -07:00
pub fn get_id(&self) -> Result<PublicKey> {
let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?;
Ok(e)
}
2018-04-28 00:31:20 -07:00
pub fn set_id(&mut self, id: PublicKey) -> Result<()> {
let wtr = serialize(&id)?;
self.data[BLOB_INDEX_END..BLOB_ID_END].clone_from_slice(&wtr);
Ok(())
}
2018-05-31 11:21:07 -07:00
pub fn get_flags(&self) -> Result<u32> {
let mut rdr = io::Cursor::new(&self.data[BLOB_ID_END..BLOB_FLAGS_END]);
let r = rdr.read_u32::<LittleEndian>()?;
Ok(r)
}
pub fn set_flags(&mut self, ix: u32) -> Result<()> {
let mut wtr = vec![];
wtr.write_u32::<LittleEndian>(ix)?;
self.data[BLOB_ID_END..BLOB_FLAGS_END].clone_from_slice(&wtr);
Ok(())
}
pub fn is_coding(&self) -> bool {
return (self.get_flags().unwrap() & BLOB_FLAG_IS_CODING) != 0;
}
pub fn set_coding(&mut self) -> Result<()> {
let flags = self.get_flags().unwrap();
self.set_flags(flags | BLOB_FLAG_IS_CODING)
}
pub fn get_data_size(&self) -> Result<u64> {
let mut rdr = io::Cursor::new(&self.data[BLOB_FLAGS_END..BLOB_SIZE_END]);
let r = rdr.read_u64::<LittleEndian>()?;
Ok(r)
}
pub fn set_data_size(&mut self, ix: u64) -> Result<()> {
let mut wtr = vec![];
wtr.write_u64::<LittleEndian>(ix)?;
self.data[BLOB_FLAGS_END..BLOB_SIZE_END].clone_from_slice(&wtr);
Ok(())
}
pub fn data(&self) -> &[u8] {
&self.data[BLOB_HEADER_SIZE..]
}
pub fn data_mut(&mut self) -> &mut [u8] {
&mut self.data[BLOB_HEADER_SIZE..]
}
pub fn set_size(&mut self, size: usize) {
let new_size = size + BLOB_HEADER_SIZE;
self.meta.size = new_size;
self.set_data_size(new_size as u64).unwrap();
}
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<VecDeque<SharedBlob>> {
let mut v = VecDeque::new();
//DOCUMENTED SIDE-EFFECT
//Performance out of the IO without poll
2018-05-25 22:00:47 -07:00
// * block on the socket until it's readable
// * set the socket to non blocking
// * read until it fails
// * set it back to blocking before returning
socket.set_nonblocking(false)?;
for i in 0..NUM_BLOBS {
let r = re.allocate();
{
2018-05-10 17:11:31 -07:00
let mut p = r.write().expect("'r' write lock in pub fn recv_from");
trace!("receiving on {}", socket.local_addr().unwrap());
match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => {
trace!("got {:?} messages on {}", i, socket.local_addr().unwrap());
break;
}
Err(e) => {
2018-05-12 19:00:22 -07:00
if e.kind() != io::ErrorKind::WouldBlock {
info!("recv_from err {:?}", e);
}
return Err(Error::IO(e));
}
Ok((nrecv, from)) => {
p.meta.size = nrecv;
p.meta.set_addr(&from);
if i == 0 {
socket.set_nonblocking(true)?;
}
}
}
}
v.push_back(r);
}
Ok(v)
}
pub fn send_to(
re: &BlobRecycler,
socket: &UdpSocket,
v: &mut VecDeque<SharedBlob>,
) -> Result<()> {
while let Some(r) = v.pop_front() {
{
2018-05-10 17:11:31 -07:00
let p = r.read().expect("'r' read lock in pub fn send_to");
let a = p.meta.addr();
socket.send_to(&p.data[..p.meta.size], &a)?;
}
re.recycle(r);
}
Ok(())
}
}
#[cfg(test)]
mod test {
use packet::{to_packets, Blob, BlobRecycler, Packet, PacketRecycler, Packets, NUM_PACKETS};
use request::Request;
2018-03-26 21:07:11 -07:00
use std::collections::VecDeque;
use std::io;
use std::io::Write;
use std::net::UdpSocket;
// A recycled packet batch sits in the pool until the next allocation takes it out again.
#[test]
pub fn packet_recycler_test() {
    let recycler = PacketRecycler::default();
    let pooled = |rc: &PacketRecycler| rc.gc.lock().unwrap().len();

    let batch = recycler.allocate();
    recycler.recycle(batch);
    assert_eq!(pooled(&recycler), 1);

    let _ = recycler.allocate();
    assert_eq!(pooled(&recycler), 0);
}
// A recycled blob sits in the pool until the next allocation takes it out again.
#[test]
pub fn blob_recycler_test() {
    let recycler = BlobRecycler::default();
    let pooled = |rc: &BlobRecycler| rc.gc.lock().unwrap().len();

    let blob = recycler.allocate();
    recycler.recycle(blob);
    assert_eq!(pooled(&recycler), 1);

    let _ = recycler.allocate();
    assert_eq!(pooled(&recycler), 0);
}
// Round-trips ten 256-byte packets over loopback UDP and checks the size and source address seen by the receiver.
#[test]
pub fn packet_send_recv() {
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let saddr = sender.local_addr().unwrap();
let r = PacketRecycler::default();
let p = r.allocate();
// Shrink the default-sized batch to ten packets, all addressed to the reader.
p.write().unwrap().packets.resize(10, Packet::default());
for m in p.write().unwrap().packets.iter_mut() {
m.meta.set_addr(&addr);
m.meta.size = 256;
}
p.read().unwrap().send_to(&sender).unwrap();
// The same batch doubles as the receive buffer; `recv_from` overwrites each packet's meta with the datagram's length and source.
p.write().unwrap().recv_from(&reader).unwrap();
for m in p.write().unwrap().packets.iter_mut() {
assert_eq!(m.meta.size, 256);
assert_eq!(m.meta.addr(), saddr);
}
r.recycle(p);
}
// `to_packets` splits its input into batches of at most NUM_PACKETS packets.
#[test]
fn test_to_packets() {
    let tx = Request::GetTransactionCount;
    let re = PacketRecycler::default();
    // Packet count of every batch produced for `count` copies of the request.
    let batch_lens = |count: usize| -> Vec<usize> {
        to_packets(&re, vec![tx.clone(); count])
            .iter()
            .map(|batch| batch.read().unwrap().packets.len())
            .collect()
    };

    assert_eq!(batch_lens(1), vec![1]);
    assert_eq!(batch_lens(NUM_PACKETS), vec![NUM_PACKETS]);
    assert_eq!(batch_lens(NUM_PACKETS + 1), vec![NUM_PACKETS, 1]);
}
// Sends one 1024-byte blob over IPv4 loopback and checks that exactly one blob of that size is received.
#[test]
pub fn blob_send_recv() {
trace!("start");
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let r = BlobRecycler::default();
let p = r.allocate();
p.write().unwrap().meta.set_addr(&addr);
p.write().unwrap().meta.size = 1024;
let mut v = VecDeque::new();
v.push_back(p);
assert_eq!(v.len(), 1);
// `send_to` drains the queue and recycles each blob it sends.
Blob::send_to(&r, &sender, &mut v).unwrap();
trace!("send_to");
assert_eq!(v.len(), 0);
let mut rv = Blob::recv_from(&r, &reader).unwrap();
trace!("recv_from");
assert_eq!(rv.len(), 1);
let rp = rv.pop_front().unwrap();
assert_eq!(rp.write().unwrap().meta.size, 1024);
r.recycle(rp);
}
// IPv6 variant of the blob round-trip; only compiled when the `ipv6` feature is enabled.
#[cfg(all(feature = "ipv6", test))]
#[test]
pub fn blob_ipv6_send_recv() {
let reader = UdpSocket::bind("[::1]:0").expect("bind");
let addr = reader.local_addr().unwrap();
let sender = UdpSocket::bind("[::1]:0").expect("bind");
let r = BlobRecycler::default();
let p = r.allocate();
p.write().unwrap().meta.set_addr(&addr);
p.write().unwrap().meta.size = 1024;
let mut v = VecDeque::default();
v.push_back(p);
// `send_to` drains the queue and recycles each blob it sends.
Blob::send_to(&r, &sender, &mut v).unwrap();
let mut rv = Blob::recv_from(&r, &reader).unwrap();
let rp = rv.pop_front().unwrap();
assert_eq!(rp.write().unwrap().meta.size, 1024);
r.recycle(rp);
}
// Smoke test: the `Debug` impls of the buffer types format without panicking.
#[test]
pub fn debug_trait() {
    let mut sink = io::sink();
    write!(sink, "{:?}", Packet::default()).unwrap();
    write!(sink, "{:?}", Packets::default()).unwrap();
    write!(sink, "{:?}", Blob::default()).unwrap();
}
// The index header field round-trips and is not disturbed by writes to the payload region.
#[test]
pub fn blob_test() {
    let max_index = <u64>::max_value();
    let mut blob = Blob::default();
    blob.set_index(max_index).unwrap();
    assert_eq!(blob.get_index().unwrap(), max_index);

    blob.data_mut()[0] = 1;
    assert_eq!(blob.data()[0], 1);
    assert_eq!(blob.get_index().unwrap(), max_index);
}
}