diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 199f7d7ab..5d4d603e6 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -6,8 +6,9 @@ use crate::cluster_info::ClusterInfo; use crate::entry::Entry; use crate::leader_confirmation_service::LeaderConfirmationService; use crate::leader_schedule_utils; +use crate::packet; use crate::packet::SharedPackets; -use crate::packet::{Blob, Packets}; +use crate::packet::{Packet, Packets}; use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries}; use crate::poh_service::{PohService, PohServiceConfig}; use crate::result::{Error, Result}; @@ -79,23 +80,17 @@ impl BankingStage { forwarder: &std::net::SocketAddr, unprocessed_packets: &[(SharedPackets, usize)], ) -> std::io::Result<()> { - let mut blob = Blob::default(); - for (packets, start_index) in unprocessed_packets { - let packets = packets.read().unwrap(); - let mut current_index = *start_index; - while current_index < packets.packets.len() { - current_index += blob.store_packets(&packets.packets[current_index..]) as usize; - if current_index < packets.packets.len() { - // Blob is full, send it - socket.send_to(&blob.data[..blob.meta.size], forwarder)?; - blob = Blob::default(); - } else { - break; - } - } - } + let locked_packets: Vec<_> = unprocessed_packets + .iter() + .map(|(p, start_index)| (p.read().unwrap(), start_index)) + .collect(); + let packets: Vec<&Packet> = locked_packets + .iter() + .flat_map(|(p, start_index)| &p.packets[**start_index..]) + .collect(); + let blobs = packet::packets_to_blobs(&packets[..]); - if blob.size() > 0 { + for blob in blobs { socket.send_to(&blob.data[..blob.meta.size], forwarder)?; } diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 74880c136..d4e88cfaa 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1273,7 +1273,7 @@ impl ClusterInfo { let daddr = socketaddr_any!(); let node = ContactInfo::new( - *id, + id, daddr, daddr, daddr, diff --git a/core/src/cluster_tests.rs b/core/src/cluster_tests.rs index 1e48624a4..214cc831d 100644 --- a/core/src/cluster_tests.rs +++ b/core/src/cluster_tests.rs @@ -58,7 +58,7 @@ pub fn send_many_transactions(node: &ContactInfo, funding_keypair: &Keypair, num assert!(bal > 0); let mut transaction = SystemTransaction::new_move( &funding_keypair, - random_keypair.pubkey(), + &random_keypair.pubkey(), 1, client.get_recent_blockhash(), 0, diff --git a/core/src/packet.rs b/core/src/packet.rs index 4902ea9e2..b1e7ada4c 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -1,12 +1,13 @@ //! The `packet` module defines data structures and methods to pull data from the network. use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS}; use crate::result::{Error, Result}; -use bincode::{serialize, serialize_into}; +use bincode; use byteorder::{ByteOrder, LittleEndian}; use serde::Serialize; use solana_metrics::counter::Counter; pub use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; +use std::borrow::Borrow; use std::cmp; use std::fmt; use std::io; @@ -243,7 +244,7 @@ pub fn to_packets_chunked(xs: &[T], chunks: usize) -> Vec(xs: &[T]) -> Vec { pub fn to_blob(resp: T, rsp_addr: SocketAddr) -> Result { let mut b = Blob::default(); - let v = serialize(&resp)?; + let v = bincode::serialize(&resp)?; let len = v.len(); assert!(len <= BLOB_SIZE); b.data[..len].copy_from_slice(&v); @@ -288,6 +289,44 @@ pub fn to_shared_blobs(rsps: Vec<(T, SocketAddr)>) -> Result>(packets: &[T]) -> Vec { + let mut current_index = 0; + let mut blobs = vec![]; + while current_index < packets.len() { + let mut blob = Blob::default(); + current_index += blob.store_packets(&packets[current_index..]) as usize; + blobs.push(blob); + } + + blobs +} + +pub fn deserialize_packets_in_blob( + data: &[u8], + serialized_packet_size: usize, + serialized_meta_size: usize, +) -> Result> { + let mut packets: Vec = Vec::with_capacity(data.len() / serialized_packet_size); + let mut pos = 0; + while pos + serialized_packet_size <= data.len() { + let packet = deserialize_single_packet_in_blob( + &data[pos..pos + serialized_packet_size], + serialized_meta_size, + )?; + pos += serialized_packet_size; + packets.push(packet); + } + Ok(packets) +} + +fn deserialize_single_packet_in_blob(data: &[u8], serialized_meta_size: usize) -> Result { + let meta = bincode::deserialize(&data[..serialized_meta_size])?; + let mut packet_data = [0; PACKET_DATA_SIZE]; + packet_data + .copy_from_slice(&data[serialized_meta_size..serialized_meta_size + PACKET_DATA_SIZE]); + Ok(Packet::new(packet_data, meta)) +} + macro_rules! range { ($prev:expr, $type:ident) => { $prev..$prev + size_of::<$type>() @@ -419,16 +458,16 @@ impl Blob { self.set_data_size(new_size as u64); } - pub fn store_packets(&mut self, packets: &[Packet]) -> u64 { + pub fn store_packets>(&mut self, packets: &[T]) -> u64 { let size = self.size(); let mut cursor = Cursor::new(&mut self.data_mut()[size..]); let mut written = 0; let mut last_index = 0; for packet in packets { - if serialize_into(&mut cursor, &packet.meta).is_err() { + if bincode::serialize_into(&mut cursor, &packet.borrow().meta).is_err() { break; } - if cursor.write_all(&packet.data[..]).is_err() { + if cursor.write_all(&packet.borrow().data[..]).is_err() { break; } @@ -518,12 +557,9 @@ pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot: #[cfg(test)] mod tests { - use crate::packet::{ - to_packets, Blob, Meta, Packet, Packets, SharedBlob, SharedPackets, NUM_PACKETS, - PACKET_DATA_SIZE, - }; - use crate::packet::{BLOB_HEADER_SIZE, BLOB_SIZE}; - use bincode::serialized_size; + use super::*; + use bincode; + use rand::Rng; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; @@ -642,7 +678,7 @@ mod tests { #[test] fn test_store_blobs_max() { let meta = Meta::default(); - let serialized_meta_size = serialized_size(&meta).unwrap() as usize; + let serialized_meta_size = bincode::serialized_size(&meta).unwrap() as usize; let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE; let num_packets = (BLOB_SIZE - BLOB_HEADER_SIZE) / serialized_packet_size + 1; let mut blob = Blob::default(); @@ -664,4 +700,78 @@ mod tests { // Blob is now full assert_eq!(blob.store_packets(&packets), 0); } + + #[test] + fn test_packets_to_blobs() { + let mut rng = rand::thread_rng(); + let meta = Meta::default(); + let serialized_meta_size = bincode::serialized_size(&meta).unwrap() as usize; + let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE; + + let packets_per_blob = (BLOB_SIZE - BLOB_HEADER_SIZE) / serialized_packet_size; + assert!(packets_per_blob > 1); + let num_packets = packets_per_blob * 10 + packets_per_blob - 1; + + let packets: Vec<_> = (0..num_packets) + .map(|_| { + let mut packet = Packet::default(); + for i in 0..packet.meta.addr.len() { + packet.meta.addr[i] = rng.gen_range(1, std::u16::MAX); + } + for i in 0..packet.data.len() { + packet.data[i] = rng.gen_range(1, std::u8::MAX); + } + packet + }) + .collect(); + + let blobs = packets_to_blobs(&packets[..]); + assert_eq!(blobs.len(), 11); + + let reconstructed_packets: Vec = blobs + .iter() + .flat_map(|b| { + deserialize_packets_in_blob( + &b.data()[..b.size()], + serialized_packet_size, + serialized_meta_size, + ) + .unwrap() + }) + .collect(); + + assert_eq!(reconstructed_packets, packets); + } + + #[test] + fn test_deserialize_packets_in_blob() { + let meta = Meta::default(); + let serialized_meta_size = bincode::serialized_size(&meta).unwrap() as usize; + let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE; + let num_packets = 10; + let mut rng = rand::thread_rng(); + let packets: Vec<_> = (0..num_packets) + .map(|_| { + let mut packet = Packet::default(); + for i in 0..packet.meta.addr.len() { + packet.meta.addr[i] = rng.gen_range(1, std::u16::MAX); + } + for i in 0..packet.data.len() { + packet.data[i] = rng.gen_range(1, std::u8::MAX); + } + packet + }) + .collect(); + + let mut blob = Blob::default(); + assert_eq!(blob.store_packets(&packets[..]), num_packets); + let result = deserialize_packets_in_blob( + &blob.data()[..blob.size()], + serialized_packet_size, + serialized_meta_size, + ) + .unwrap(); + + assert_eq!(result, packets); + } } diff --git a/core/src/streamer.rs b/core/src/streamer.rs index fea8df4e9..3ee2b070c 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -1,9 +1,11 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! -use crate::packet::{Blob, Meta, Packet, Packets, SharedBlobs, SharedPackets, PACKET_DATA_SIZE}; +use crate::packet::{ + deserialize_packets_in_blob, Blob, Meta, Packets, SharedBlobs, SharedPackets, PACKET_DATA_SIZE, +}; use crate::result::{Error, Result}; -use bincode::{deserialize, serialized_size}; +use bincode; use solana_metrics::{influxdb, submit}; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; @@ -140,32 +142,6 @@ pub fn blob_receiver( .unwrap() } -fn deserialize_single_packet_in_blob(data: &[u8], serialized_meta_size: usize) -> Result { - let meta = deserialize(&data[..serialized_meta_size])?; - let mut packet_data = [0; PACKET_DATA_SIZE]; - packet_data - .copy_from_slice(&data[serialized_meta_size..serialized_meta_size + PACKET_DATA_SIZE]); - Ok(Packet::new(packet_data, meta)) -} - -fn deserialize_packets_in_blob( - data: &[u8], - serialized_packet_size: usize, - serialized_meta_size: usize, -) -> Result> { - let mut packets: Vec = Vec::with_capacity(data.len() / serialized_packet_size); - let mut pos = 0; - while pos + serialized_packet_size <= data.len() { - let packet = deserialize_single_packet_in_blob( - &data[pos..pos + serialized_packet_size], - serialized_meta_size, - )?; - pos += serialized_packet_size; - packets.push(packet); - } - Ok(packets) -} - fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> { trace!( "recv_blob_packets: receiving on {}", @@ -173,7 +149,7 @@ fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> { ); let meta = Meta::default(); - let serialized_meta_size = serialized_size(&meta)? as usize; + let serialized_meta_size = bincode::serialized_size(&meta)? as usize; let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE; let blobs = Blob::recv_from(sock)?; for blob in blobs { @@ -222,9 +198,8 @@ pub fn blob_packet_receiver( #[cfg(test)] mod test { use super::*; - use crate::packet::{Blob, Meta, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; + use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; use crate::streamer::{receiver, responder}; - use rand::Rng; use std::io; use std::io::Write; use std::net::UdpSocket; @@ -286,36 +261,4 @@ mod test { t_receiver.join().expect("join"); t_responder.join().expect("join"); } - - #[test] - fn test_streamer_deserialize_packets_in_blob() { - let meta = Meta::default(); - let serialized_meta_size = serialized_size(&meta).unwrap() as usize; - let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE; - let num_packets = 10; - let mut rng = rand::thread_rng(); - let packets: Vec<_> = (0..num_packets) - .map(|_| { - let mut packet = Packet::default(); - for i in 0..packet.meta.addr.len() { - packet.meta.addr[i] = rng.gen_range(1, std::u16::MAX); - } - for i in 0..packet.data.len() { - packet.data[i] = rng.gen_range(1, std::u8::MAX); - } - packet - }) - .collect(); - - let mut blob = Blob::default(); - assert_eq!(blob.store_packets(&packets[..]), num_packets); - let result = deserialize_packets_in_blob( - &blob.data()[..blob.size()], - serialized_packet_size, - serialized_meta_size, - ) - .unwrap(); - - assert_eq!(result, packets); - } }