From f108f483b750dcc9b2555ee68ca667b110a1fe6a Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Thu, 14 Nov 2019 10:24:53 -0800 Subject: [PATCH] Remove Blobs and switch to Packets (#6937) * Remove Blobs and switch to Packets * Fix some gossip messages not respecting MTU size * Failure to serialize is not fatal * Add log macros * Remove unused extern * Apparently macro use is required * Explicitly scope macro * Fix test compile --- bench-streamer/src/main.rs | 3 +- core/src/archiver.rs | 8 +- core/src/blob.rs | 526 ----------------------- core/src/cluster_info.rs | 282 +++++++----- core/src/cluster_info_repair_listener.rs | 39 +- core/src/crds_gossip_push.rs | 5 +- core/src/gossip_service.rs | 9 +- core/src/lib.rs | 1 - core/src/streamer.rs | 61 +-- core/src/window_service.rs | 2 +- core/tests/crds_gossip.rs | 4 +- perf/src/packet.rs | 22 +- sdk/src/lib.rs | 2 + sdk/src/packet.rs | 27 +- 14 files changed, 279 insertions(+), 712 deletions(-) delete mode 100644 core/src/blob.rs diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index f7f33360c..0285be1b1 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -1,5 +1,4 @@ use clap::{crate_description, crate_name, App, Arg}; -use solana_core::blob::BLOB_SIZE; use solana_core::packet::{Packet, Packets, PacketsRecycler, PACKET_DATA_SIZE}; use solana_core::result::Result; use solana_core::streamer::{receiver, PacketReceiver}; @@ -29,7 +28,7 @@ fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { let mut num = 0; for p in &msgs.packets { let a = p.meta.addr(); - assert!(p.meta.size < BLOB_SIZE); + assert!(p.meta.size < PACKET_DATA_SIZE); send.send_to(&p.data[..p.meta.size], &a).unwrap(); num += 1; } diff --git a/core/src/archiver.rs b/core/src/archiver.rs index 8e2096781..df47fb5c9 100644 --- a/core/src/archiver.rs +++ b/core/src/archiver.rs @@ -1,5 +1,4 @@ use crate::{ - blob::to_shared_blob, chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE}, cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE}, contact_info::ContactInfo, @@ -23,7 +22,9 @@ use solana_ledger::{ blocktree::Blocktree, leader_schedule_cache::LeaderScheduleCache, shred::Shred, }; use solana_net_utils::bind_in_range; +use solana_perf::packet::Packets; use solana_perf::recycler::Recycler; +use solana_sdk::packet::Packet; use solana_sdk::{ account_utils::State, client::{AsyncClient, SyncClient}, @@ -165,9 +166,8 @@ fn create_request_processor( limited_deserialize(&packet.data[..packet.meta.size]); match req { Ok(ArchiverRequest::GetSlotHeight(from)) => { - if let Ok(blob) = to_shared_blob(slot, from) { - let _ = s_responder.send(vec![blob]); - } + let packet = Packet::from_data(&from, slot); + let _ = s_responder.send(Packets::new(vec![packet])); } Err(e) => { info!("invalid request: {:?}", e); diff --git a/core/src/blob.rs b/core/src/blob.rs deleted file mode 100644 index 68068f9d9..000000000 --- a/core/src/blob.rs +++ /dev/null @@ -1,526 +0,0 @@ -//! The `packet` module defines data structures and methods to pull data from the network. -use crate::{ - packet::NUM_PACKETS, - result::{Error, Result}, -}; -use bincode; -use byteorder::{ByteOrder, LittleEndian}; -use serde::Serialize; -use solana_ledger::erasure::ErasureConfig; -pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; -use solana_sdk::{ - clock::Slot, - pubkey::Pubkey, - signature::{Signable, Signature}, -}; -use std::{ - borrow::Cow, - cmp, fmt, io, - io::Cursor, - mem::size_of, - net::{SocketAddr, UdpSocket}, - ops::{Deref, DerefMut}, - sync::{Arc, RwLock}, -}; - -pub type SharedBlob = Arc>; -pub type SharedBlobs = Vec; - -pub const BLOB_SIZE: usize = (2 * 1024 - 128); // wikipedia says there should be 20b for ipv4 headers -pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - (BLOB_HEADER_SIZE * 2); -pub const BLOB_DATA_ALIGN: usize = 16; // safe for erasure input pointers, gf.c needs 16byte-aligned buffers -pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; - -#[repr(align(16))] // 16 === BLOB_DATA_ALIGN -pub struct BlobData { - pub data: [u8; BLOB_SIZE], -} - -impl Clone for BlobData { - fn clone(&self) -> Self { - BlobData { data: self.data } - } -} - -impl Default for BlobData { - fn default() -> Self { - BlobData { - data: [0u8; BLOB_SIZE], - } - } -} - -impl PartialEq for BlobData { - fn eq(&self, other: &BlobData) -> bool { - let self_data: &[u8] = self.data.as_ref(); - let other_data: &[u8] = other.data.as_ref(); - self_data == other_data - } -} - -// this code hides _data, maps it to _data.data -impl Deref for Blob { - type Target = BlobData; - - fn deref(&self) -> &Self::Target { - &self._data - } -} -impl DerefMut for Blob { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self._data - } -} - -#[derive(Clone, Default, PartialEq)] -pub struct Blob { - _data: BlobData, // hidden member, passed through by Deref - pub meta: Meta, -} - -impl fmt::Debug for Blob { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "Blob {{ size: {:?}, addr: {:?} }}", - self.meta.size, - self.meta.addr() - ) - } -} - -pub fn to_blob(resp: T, rsp_addr: SocketAddr) -> Result { - let mut b = Blob::default(); - let v = bincode::serialize(&resp)?; - let len = v.len(); - if len > BLOB_SIZE { - return Err(Error::ToBlobError); - } - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - Ok(b) -} - -pub fn to_blobs(rsps: Vec<(T, SocketAddr)>) -> Result> { - let mut blobs = Vec::new(); - for (resp, rsp_addr) in rsps { - blobs.push(to_blob(resp, rsp_addr)?); - } - Ok(blobs) -} - -pub fn to_shared_blob(resp: T, rsp_addr: SocketAddr) -> Result { - let blob = Arc::new(RwLock::new(to_blob(resp, rsp_addr)?)); - Ok(blob) -} - -pub fn to_shared_blobs(rsps: Vec<(T, SocketAddr)>) -> Result { - let mut blobs = Vec::new(); - for (resp, rsp_addr) in rsps { - blobs.push(to_shared_blob(resp, rsp_addr)?); - } - Ok(blobs) -} - -macro_rules! range { - ($prev:expr, $type:ident) => { - $prev..$prev + size_of::<$type>() - }; -} - -const SIGNATURE_RANGE: std::ops::Range = range!(0, Signature); -const FORWARDED_RANGE: std::ops::Range = range!(SIGNATURE_RANGE.end, bool); -const PARENT_RANGE: std::ops::Range = range!(FORWARDED_RANGE.end, u64); -const VERSION_RANGE: std::ops::Range = range!(PARENT_RANGE.end, u64); -const SLOT_RANGE: std::ops::Range = range!(VERSION_RANGE.end, u64); -const INDEX_RANGE: std::ops::Range = range!(SLOT_RANGE.end, u64); -const ID_RANGE: std::ops::Range = range!(INDEX_RANGE.end, Pubkey); -const FLAGS_RANGE: std::ops::Range = range!(ID_RANGE.end, u32); -const ERASURE_CONFIG_RANGE: std::ops::Range = range!(FLAGS_RANGE.end, ErasureConfig); -const SIZE_RANGE: std::ops::Range = range!(ERASURE_CONFIG_RANGE.end, u64); - -macro_rules! align { - ($x:expr, $align:expr) => { - $x + ($align - 1) & !($align - 1) - }; -} - -pub const BLOB_HEADER_SIZE: usize = align!(SIZE_RANGE.end, BLOB_DATA_ALIGN); // make sure data() is safe for erasure -pub const SIGNABLE_START: usize = PARENT_RANGE.start; - -pub const BLOB_FLAG_IS_LAST_IN_SLOT: u32 = 0x2; - -pub const BLOB_FLAG_IS_CODING: u32 = 0x1; - -impl Blob { - pub fn new(data: &[u8]) -> Self { - let mut blob = Self::default(); - - assert!(data.len() <= blob.data.len()); - - let data_len = cmp::min(data.len(), blob.data.len()); - - let bytes = &data[..data_len]; - blob.data[..data_len].copy_from_slice(bytes); - blob.meta.size = data_len; - blob - } - - pub fn from_serializable(data: &T) -> Self { - let mut blob = Self::default(); - let pos = { - let mut out = Cursor::new(blob.data_mut()); - bincode::serialize_into(&mut out, data).expect("failed to serialize output"); - out.position() as usize - }; - blob.set_size(pos); - blob.set_erasure_config(&ErasureConfig::default()); - blob - } - - pub fn parent(&self) -> u64 { - LittleEndian::read_u64(&self.data[PARENT_RANGE]) - } - pub fn set_parent(&mut self, ix: u64) { - LittleEndian::write_u64(&mut self.data[PARENT_RANGE], ix); - } - pub fn version(&self) -> u64 { - LittleEndian::read_u64(&self.data[VERSION_RANGE]) - } - pub fn set_version(&mut self, version: u64) { - LittleEndian::write_u64(&mut self.data[VERSION_RANGE], version); - } - pub fn slot(&self) -> u64 { - LittleEndian::read_u64(&self.data[SLOT_RANGE]) - } - pub fn set_slot(&mut self, ix: u64) { - LittleEndian::write_u64(&mut self.data[SLOT_RANGE], ix); - } - pub fn index(&self) -> u64 { - LittleEndian::read_u64(&self.data[INDEX_RANGE]) - } - pub fn set_index(&mut self, ix: u64) { - LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix); - } - - pub fn set_erasure_config(&mut self, config: &ErasureConfig) { - self.data[ERASURE_CONFIG_RANGE].copy_from_slice(&bincode::serialize(config).unwrap()) - } - - pub fn erasure_config(&self) -> ErasureConfig { - bincode::deserialize(&self.data[ERASURE_CONFIG_RANGE]).unwrap_or_default() - } - - pub fn seed(&self) -> [u8; 32] { - let mut seed = [0; 32]; - let seed_len = seed.len(); - let signature_bytes = self.get_signature_bytes(); - seed[0..seed_len].copy_from_slice(&signature_bytes[(signature_bytes.len() - seed_len)..]); - seed - } - - /// sender id, we use this for identifying if its a blob from the leader that we should - /// retransmit. eventually blobs should have a signature that we can use for spam filtering - pub fn id(&self) -> Pubkey { - Pubkey::new(&self.data[ID_RANGE]) - } - - pub fn set_id(&mut self, id: &Pubkey) { - self.data[ID_RANGE].copy_from_slice(id.as_ref()) - } - - /// Used to determine whether or not this blob should be forwarded in retransmit - /// A bool is used here instead of a flag because this item is not intended to be signed when - /// blob signatures are introduced - pub fn should_forward(&self) -> bool { - self.data[FORWARDED_RANGE][0] & 0x1 == 0 - } - - /// Mark this blob's forwarded status - pub fn set_forwarded(&mut self, forward: bool) { - self.data[FORWARDED_RANGE][0] = u8::from(forward) - } - - pub fn flags(&self) -> u32 { - LittleEndian::read_u32(&self.data[FLAGS_RANGE]) - } - pub fn set_flags(&mut self, ix: u32) { - LittleEndian::write_u32(&mut self.data[FLAGS_RANGE], ix); - } - - pub fn is_coding(&self) -> bool { - (self.flags() & BLOB_FLAG_IS_CODING) != 0 - } - - pub fn set_coding(&mut self) { - let flags = self.flags(); - self.set_flags(flags | BLOB_FLAG_IS_CODING); - } - - pub fn set_is_last_in_slot(&mut self) { - let flags = self.flags(); - self.set_flags(flags | BLOB_FLAG_IS_LAST_IN_SLOT); - } - - pub fn is_last_in_slot(&self) -> bool { - (self.flags() & BLOB_FLAG_IS_LAST_IN_SLOT) != 0 - } - - pub fn data_size(&self) -> u64 { - cmp::min( - LittleEndian::read_u64(&self.data[SIZE_RANGE]), - BLOB_SIZE as u64, - ) - } - - pub fn set_data_size(&mut self, size: u64) { - LittleEndian::write_u64(&mut self.data[SIZE_RANGE], size); - } - - pub fn data(&self) -> &[u8] { - &self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] - } - pub fn data_mut(&mut self) -> &mut [u8] { - &mut self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE] - } - pub fn size(&self) -> usize { - let size = self.data_size() as usize; - - if size > BLOB_HEADER_SIZE && size == self.meta.size { - size - BLOB_HEADER_SIZE - } else { - 0 - } - } - - pub fn set_size(&mut self, size: usize) { - let new_size = size + BLOB_HEADER_SIZE; - self.meta.size = new_size; - self.set_data_size(new_size as u64); - } - - pub fn get_signature_bytes(&self) -> &[u8] { - &self.data[SIGNATURE_RANGE] - } - - pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> { - let mut p = r.write().unwrap(); - trace!("receiving on {}", socket.local_addr().unwrap()); - - let (nrecv, from) = socket.recv_from(&mut p.data)?; - p.meta.size = nrecv; - p.meta.set_addr(&from); - trace!("got {} bytes from {}", nrecv, from); - Ok(()) - } - - pub fn recv_from(socket: &UdpSocket) -> Result { - let mut v = Vec::new(); - //DOCUMENTED SIDE-EFFECT - //Performance out of the IO without poll - // * block on the socket until it's readable - // * set the socket to non blocking - // * read until it fails - // * set it back to blocking before returning - socket.set_nonblocking(false)?; - for i in 0..NUM_BLOBS { - let r = SharedBlob::default(); - - match Blob::recv_blob(socket, &r) { - Err(_) if i > 0 => { - trace!("got {:?} messages on {}", i, socket.local_addr().unwrap()); - break; - } - Err(e) => { - if e.kind() != io::ErrorKind::WouldBlock && e.kind() != io::ErrorKind::TimedOut - { - info!("recv_from err {:?}", e); - } - return Err(Error::IO(e)); - } - Ok(()) => { - if i == 0 { - socket.set_nonblocking(true)?; - } - } - } - v.push(r); - } - Ok(v) - } - pub fn send_to(socket: &UdpSocket, v: SharedBlobs) -> Result<()> { - for r in v { - { - let p = r.read().unwrap(); - let a = p.meta.addr(); - if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) { - warn!( - "error sending {} byte packet to {:?}: {:?}", - p.meta.size, a, e - ); - return Err(e.into()); - } - } - } - Ok(()) - } -} - -impl Signable for Blob { - fn pubkey(&self) -> Pubkey { - self.id() - } - - fn signable_data(&self) -> Cow<[u8]> { - let end = cmp::max(SIGNABLE_START, self.data_size() as usize); - Cow::Borrowed(&self.data[SIGNABLE_START..end]) - } - - fn get_signature(&self) -> Signature { - Signature::new(self.get_signature_bytes()) - } - - fn set_signature(&mut self, signature: Signature) { - self.data[SIGNATURE_RANGE].copy_from_slice(signature.as_ref()) - } -} - -pub fn index_blobs( - blobs: &[SharedBlob], - id: &Pubkey, - mut blob_index: u64, - slot: Slot, - parent: Slot, -) { - // enumerate all the blobs, those are the indices - for blob in blobs.iter() { - let mut blob = blob.write().unwrap(); - blob.set_index(blob_index); - blob.set_slot(slot); - blob.set_parent(parent); - blob.set_id(id); - blob_index += 1; - } -} - -pub fn limited_deserialize(data: &[u8]) -> bincode::Result -where - T: serde::de::DeserializeOwned, -{ - bincode::config().limit(BLOB_SIZE as u64).deserialize(data) -} - -#[cfg(test)] -mod tests { - use super::*; - use solana_sdk::signature::{Keypair, KeypairUtil}; - use std::io; - use std::io::Write; - use std::net::UdpSocket; - - #[test] - pub fn blob_send_recv() { - trace!("start"); - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); - let p = SharedBlob::default(); - p.write().unwrap().meta.set_addr(&addr); - p.write().unwrap().meta.size = 1024; - let v = vec![p]; - Blob::send_to(&sender, v).unwrap(); - trace!("send_to"); - let rv = Blob::recv_from(&reader).unwrap(); - trace!("recv_from"); - assert_eq!(rv.len(), 1); - assert_eq!(rv[0].read().unwrap().meta.size, 1024); - } - - #[cfg(all(feature = "ipv6", test))] - #[test] - pub fn blob_ipv6_send_recv() { - let reader = UdpSocket::bind("[::1]:0").expect("bind"); - let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("[::1]:0").expect("bind"); - let p = SharedBlob::default(); - p.as_mut().unwrap().meta.set_addr(&addr); - p.as_mut().unwrap().meta.size = 1024; - let mut v = VecDeque::default(); - v.push_back(p); - Blob::send_to(&r, &sender, &mut v).unwrap(); - let mut rv = Blob::recv_from(&reader).unwrap(); - let rp = rv.pop_front().unwrap(); - assert_eq!(rp.as_mut().meta.size, 1024); - } - - #[test] - pub fn blob_test() { - let mut b = Blob::default(); - b.set_index(::max_value()); - assert_eq!(b.index(), ::max_value()); - b.data_mut()[0] = 1; - assert_eq!(b.data()[0], 1); - assert_eq!(b.index(), ::max_value()); - assert_eq!(b.meta, Meta::default()); - } - #[test] - fn test_blob_forward() { - let mut b = Blob::default(); - assert!(b.should_forward()); - b.set_forwarded(true); - assert!(!b.should_forward()); - } - - #[test] - fn test_blob_erasure_config() { - let mut b = Blob::default(); - let config = ErasureConfig::new(32, 16); - b.set_erasure_config(&config); - - assert_eq!(config, b.erasure_config()); - } - - #[test] - fn test_blob_data_align() { - assert_eq!(std::mem::align_of::(), BLOB_DATA_ALIGN); - } - - #[test] - fn test_blob_partial_eq() { - let p1 = Blob::default(); - let mut p2 = Blob::default(); - - assert!(p1 == p2); - p2.data[1] = 4; - assert!(p1 != p2); - } - - #[test] - fn test_sign_blob() { - let mut b = Blob::default(); - let k = Keypair::new(); - let p = k.pubkey(); - b.set_id(&p); - b.sign(&k); - assert!(b.verify()); - - // Set a bigger chunk of data to sign - b.set_size(80); - b.sign(&k); - assert!(b.verify()); - } - - #[test] - fn test_version() { - let mut b = Blob::default(); - assert_eq!(b.version(), 0); - b.set_version(1); - assert_eq!(b.version(), 1); - } - - #[test] - pub fn debug_trait() { - write!(io::sink(), "{:?}", Blob::default()).unwrap(); - } -} diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 3eacc2a68..6536d5fd7 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -12,8 +12,9 @@ //! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes. //! //! Bank needs to provide an interface for us to query the stake weight +use crate::packet::limited_deserialize; +use crate::streamer::{PacketReceiver, PacketSender}; use crate::{ - blob::{limited_deserialize, to_shared_blob, Blob, SharedBlob}, contact_info::ContactInfo, crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, @@ -23,7 +24,6 @@ use crate::{ repair_service::RepairType, result::{Error, Result}, sendmmsg::{multicast, send_mmsg}, - streamer::{BlobReceiver, BlobSender}, weighted_shuffle::{weighted_best, weighted_shuffle}, }; use bincode::{serialize, serialized_size}; @@ -36,6 +36,7 @@ use solana_net_utils::{ bind_common, bind_common_in_range, bind_in_range, find_available_port_in_range, multi_bind_in_range, PortRange, }; +use solana_perf::packet::{to_packets_with_destination, Packets}; use solana_sdk::{ clock::Slot, pubkey::Pubkey, @@ -64,9 +65,12 @@ pub const GOSSIP_SLEEP_MILLIS: u64 = 100; /// the number of slots to respond with when responding to `Orphan` requests pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10; - +/// The maximum size of a bloom filter +pub const MAX_BLOOM_SIZE: usize = 1030; /// The maximum size of a protocol payload -const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64; +const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE; +/// The largest protocol header size +const MAX_PROTOCOL_HEADER_SIZE: u64 = 202; #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { @@ -109,7 +113,7 @@ impl fmt::Debug for Locality { } } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Default, Deserialize, Serialize)] pub struct PruneData { /// Pubkey of the node that sent this prune data pub pubkey: Pubkey, @@ -892,7 +896,7 @@ impl ClusterInfo { return self .gossip .pull - .build_crds_filters(&self.gossip.crds, Self::max_bloom_size()) + .build_crds_filters(&self.gossip.crds, MAX_BLOOM_SIZE) .into_iter() .for_each(|filter| { pulls.push((entrypoint.id, filter, entrypoint.gossip, self_info.clone())) @@ -908,8 +912,8 @@ impl ClusterInfo { fn split_gossip_messages(mut msgs: Vec) -> Vec> { let mut messages = vec![]; while !msgs.is_empty() { - let mut size = 0; let mut payload = vec![]; + let mut size = serialized_size(&payload).expect("Couldn't check size"); while let Some(msg) = msgs.pop() { let msg_size = msg.size(); if size + msg_size > MAX_PROTOCOL_PAYLOAD_SIZE as u64 { @@ -931,24 +935,11 @@ impl ClusterInfo { messages } - // computes the maximum size for pull request blooms - pub fn max_bloom_size() -> usize { - let filter_size = serialized_size(&CrdsFilter::default()) - .expect("unable to serialize default filter") as usize; - let protocol = Protocol::PullRequest( - CrdsFilter::default(), - CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())), - ); - let protocol_size = - serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize; - PACKET_DATA_SIZE - (protocol_size - filter_size) - } - fn new_pull_requests(&mut self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { let now = timestamp(); let mut pulls: Vec<_> = self .gossip - .new_pull_request(now, stakes, Self::max_bloom_size()) + .new_pull_request(now, stakes, MAX_BLOOM_SIZE) .ok() .into_iter() .filter_map(|(peer, filters, me)| { @@ -1006,14 +997,11 @@ impl ClusterInfo { fn run_gossip( obj: &Arc>, stakes: &HashMap, - blob_sender: &BlobSender, + sender: &PacketSender, ) -> Result<()> { let reqs = obj.write().unwrap().gossip_request(&stakes); - let blobs = reqs - .into_iter() - .filter_map(|(remote_gossip_addr, req)| to_shared_blob(req, remote_gossip_addr).ok()) - .collect(); - blob_sender.send(blobs)?; + let packets = to_packets_with_destination(&reqs); + sender.send(packets)?; Ok(()) } @@ -1021,7 +1009,7 @@ impl ClusterInfo { pub fn gossip( obj: Arc>, bank_forks: Option>>, - blob_sender: BlobSender, + sender: PacketSender, exit: &Arc, ) -> JoinHandle<()> { let exit = exit.clone(); @@ -1044,7 +1032,7 @@ impl ClusterInfo { } None => HashMap::new(), }; - let _ = Self::run_gossip(&obj, &stakes, &blob_sender); + let _ = Self::run_gossip(&obj, &stakes, &sender); if exit.load(Ordering::Relaxed) { return; } @@ -1065,13 +1053,20 @@ impl ClusterInfo { .unwrap() } - fn get_data_shred_as_blob( + fn get_data_shred_as_packet( blocktree: &Arc, slot: Slot, shred_index: u64, - ) -> Result> { - let bytes = blocktree.get_data_shred(slot, shred_index)?; - Ok(bytes.map(|bytes| Blob::new(&bytes))) + dest: &SocketAddr, + ) -> Result> { + let data = blocktree.get_data_shred(slot, shred_index)?; + Ok(data.map(|data| { + let mut packet = Packet::default(); + packet.meta.size = data.len(); + packet.meta.set_addr(dest); + packet.data.copy_from_slice(&data); + packet + })) } fn run_window_request( @@ -1080,17 +1075,15 @@ impl ClusterInfo { blocktree: Option<&Arc>, me: &ContactInfo, slot: Slot, - blob_index: u64, - ) -> Vec { + shred_index: u64, + ) -> Packets { if let Some(blocktree) = blocktree { // Try to find the requested index in one of the slots - let blob = Self::get_data_shred_as_blob(blocktree, slot, blob_index); + let packet = Self::get_data_shred_as_packet(blocktree, slot, shred_index, from_addr); - if let Ok(Some(mut blob)) = blob { + if let Ok(Some(packet)) = packet { inc_new_counter_debug!("cluster_info-window-request-ledger", 1); - blob.meta.set_addr(from_addr); - - return vec![Arc::new(RwLock::new(blob))]; + return Packets::new(vec![packet]); } } @@ -1100,10 +1093,10 @@ impl ClusterInfo { me.id, from.id, slot, - blob_index, + shred_index, ); - vec![] + Packets::default() } fn run_highest_window_request( @@ -1111,7 +1104,7 @@ impl ClusterInfo { blocktree: Option<&Arc>, slot: Slot, highest_index: u64, - ) -> Vec { + ) -> Packets { if let Some(blocktree) = blocktree { // Try to find the requested index in one of the slots let meta = blocktree.meta(slot); @@ -1119,17 +1112,21 @@ impl ClusterInfo { if let Ok(Some(meta)) = meta { if meta.received > highest_index { // meta.received must be at least 1 by this point - let blob = Self::get_data_shred_as_blob(blocktree, slot, meta.received - 1); + let packet = Self::get_data_shred_as_packet( + blocktree, + slot, + meta.received - 1, + from_addr, + ); - if let Ok(Some(mut blob)) = blob { - blob.meta.set_addr(from_addr); - return vec![Arc::new(RwLock::new(blob))]; + if let Ok(Some(packet)) = packet { + return Packets::new(vec![packet]); } } } } - vec![] + Packets::default() } fn run_orphan( @@ -1137,20 +1134,20 @@ impl ClusterInfo { blocktree: Option<&Arc>, mut slot: Slot, max_responses: usize, - ) -> Vec { - let mut res = vec![]; + ) -> Packets { + let mut res = Packets::default(); if let Some(blocktree) = blocktree { // Try to find the next "n" parent slots of the input slot while let Ok(Some(meta)) = blocktree.meta(slot) { if meta.received == 0 { break; } - let blob = Self::get_data_shred_as_blob(blocktree, slot, meta.received - 1); - if let Ok(Some(mut blob)) = blob { - blob.meta.set_addr(from_addr); - res.push(Arc::new(RwLock::new(blob))); + let packet = + Self::get_data_shred_as_packet(blocktree, slot, meta.received - 1, from_addr); + if let Ok(Some(packet)) = packet { + res.packets.push(packet); } - if meta.is_parent_set() && res.len() <= max_responses { + if meta.is_parent_set() && res.packets.len() <= max_responses { slot = meta.parent_slot; } else { break; @@ -1161,19 +1158,18 @@ impl ClusterInfo { res } - fn handle_blobs( + fn handle_packets( me: &Arc>, blocktree: Option<&Arc>, stakes: &HashMap, - blobs: &[SharedBlob], - response_sender: &BlobSender, + packets: Packets, + response_sender: &PacketSender, ) { // iter over the blobs, collect pulls separately and process everything else let mut gossip_pull_data: Vec = vec![]; - blobs.iter().for_each(|blob| { - let blob = blob.read().unwrap(); - let from_addr = blob.meta.addr(); - limited_deserialize(&blob.data[..blob.meta.size]) + packets.packets.iter().for_each(|packet| { + let from_addr = packet.meta.addr(); + limited_deserialize(&packet.data[..packet.meta.size]) .into_iter() .for_each(|request| match request { Protocol::PullRequest(filter, caller) => { @@ -1259,7 +1255,7 @@ impl ClusterInfo { response_sender.send(Self::handle_pull_requests(me, gossip_pull_data)); } - fn handle_pull_requests(me: &Arc>, requests: Vec) -> Vec { + fn handle_pull_requests(me: &Arc>, requests: Vec) -> Packets { // split the requests into addrs and filters let mut caller_and_filters = vec![]; let mut addrs = vec![]; @@ -1274,24 +1270,27 @@ impl ClusterInfo { .unwrap() .gossip .process_pull_requests(caller_and_filters, now); + let mut packets = Packets::default(); pull_responses .into_iter() .zip(addrs.into_iter()) - .flat_map(|(response, from_addr)| { + .for_each(|(response, from_addr)| { let len = response.len(); trace!("get updates since response {}", len); inc_new_counter_debug!("cluster_info-pull_request-rsp", len); Self::split_gossip_messages(response) .into_iter() - .filter_map(move |payload| { + .for_each(|payload| { let protocol = Protocol::PullResponse(self_id, payload); // The remote node may not know its public IP:PORT. Instead of responding to the caller's // gossip addr, respond to the origin addr. The last origin addr is picked from the list of // addrs. - to_shared_blob(protocol, from_addr).ok() + packets + .packets + .push(Packet::from_data(&from_addr, protocol)) }) - }) - .collect() + }); + packets } fn handle_pull_response(me: &Arc>, from: &Pubkey, data: Vec) { @@ -1314,7 +1313,7 @@ impl ClusterInfo { from: &Pubkey, data: Vec, stakes: &HashMap, - ) -> Vec { + ) -> Packets { let self_id = me.read().unwrap().gossip.id; inc_new_counter_debug!("cluster_info-push_message", 1); @@ -1331,9 +1330,9 @@ impl ClusterInfo { .gossip .prune_received_cache(updated_labels, stakes); - let mut rsp: Vec<_> = prunes_map + let rsp: Vec<_> = prunes_map .into_iter() - .map(|(from, prune_set)| { + .filter_map(|(from, prune_set)| { inc_new_counter_debug!("cluster_info-push_message-prunes", prune_set.len()); me.read().unwrap().lookup(&from).cloned().and_then(|ci| { let mut prune_msg = PruneData { @@ -1345,25 +1344,22 @@ impl ClusterInfo { }; prune_msg.sign(&me.read().unwrap().keypair); let rsp = Protocol::PruneMessage(self_id, prune_msg); - to_shared_blob(rsp, ci.gossip).ok() + Some((ci.gossip, rsp)) }) }) - .flatten() .collect(); + let mut packets = to_packets_with_destination(&rsp); - if !rsp.is_empty() { + if !packets.is_empty() { let pushes: Vec<_> = me.write().unwrap().new_push_requests(); inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len()); - let mut blobs: Vec<_> = pushes - .into_iter() - .filter_map(|(remote_gossip_addr, req)| { - to_shared_blob(req, remote_gossip_addr).ok() - }) - .collect(); - rsp.append(&mut blobs); - rsp + pushes.into_iter().for_each(|(remote_gossip_addr, req)| { + let p = Packet::from_data(&remote_gossip_addr, &req); + packets.packets.push(p); + }); + packets } else { - vec![] + Packets::default() } } @@ -1381,7 +1377,7 @@ impl ClusterInfo { from_addr: &SocketAddr, blocktree: Option<&Arc>, request: Protocol, - ) -> Vec { + ) -> Packets { let now = Instant::now(); //TODO this doesn't depend on cluster_info module, could be moved @@ -1396,7 +1392,7 @@ impl ClusterInfo { self_id, from.id, ); inc_new_counter_debug!("cluster_info-handle-repair--eq", 1); - return vec![]; + return Packets::default(); } me.write() @@ -1456,15 +1452,12 @@ impl ClusterInfo { obj: &Arc>, blocktree: Option<&Arc>, bank_forks: Option<&Arc>>, - requests_receiver: &BlobReceiver, - response_sender: &BlobSender, + requests_receiver: &PacketReceiver, + response_sender: &PacketSender, ) -> Result<()> { //TODO cache connections let timeout = Duration::new(1, 0); - let mut reqs = requests_receiver.recv_timeout(timeout)?; - while let Ok(mut more) = requests_receiver.try_recv() { - reqs.append(&mut more); - } + let reqs = requests_receiver.recv_timeout(timeout)?; let stakes: HashMap<_, _> = match bank_forks { Some(ref bank_forks) => { @@ -1473,15 +1466,15 @@ impl ClusterInfo { None => HashMap::new(), }; - Self::handle_blobs(obj, blocktree, &stakes, &reqs, response_sender); + Self::handle_packets(obj, blocktree, &stakes, reqs, response_sender); Ok(()) } pub fn listen( me: Arc>, blocktree: Option>, bank_forks: Option>>, - requests_receiver: BlobReceiver, - response_sender: BlobSender, + requests_receiver: PacketReceiver, + response_sender: PacketSender, exit: &Arc, ) -> JoinHandle<()> { let exit = exit.clone(); @@ -2005,10 +1998,9 @@ mod tests { ); assert!(!rv.is_empty()); let rv: Vec = rv + .packets .into_iter() - .filter_map(|b| { - Shred::new_from_serialized_shred(b.read().unwrap().data.to_vec()).ok() - }) + .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) .collect(); assert_eq!(rv[0].index(), 1); assert_eq!(rv[0].slot(), 2); @@ -2039,10 +2031,9 @@ mod tests { let rv = ClusterInfo::run_highest_window_request(&socketaddr_any!(), Some(&blocktree), 2, 1); let rv: Vec = rv + .packets .into_iter() - .filter_map(|b| { - Shred::new_from_serialized_shred(b.read().unwrap().data.to_vec()).ok() - }) + .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) .collect(); assert!(!rv.is_empty()); let index = blocktree.meta(2).unwrap().unwrap().received - 1; @@ -2084,16 +2075,22 @@ mod tests { // For slot 3, we should return the highest blobs from slots 3, 2, 1 respectively // for this request let rv: Vec<_> = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 3, 5) + .packets .iter() - .map(|b| b.read().unwrap().clone()) + .map(|b| b.clone()) .collect(); let expected: Vec<_> = (1..=3) .rev() .map(|slot| { let index = blocktree.meta(slot).unwrap().unwrap().received - 1; - ClusterInfo::get_data_shred_as_blob(&blocktree, slot, index) - .unwrap() - .unwrap() + ClusterInfo::get_data_shred_as_packet( + &blocktree, + slot, + index, + &socketaddr_any!(), + ) + .unwrap() + .unwrap() }) .collect(); assert_eq!(rv, expected) @@ -2200,7 +2197,7 @@ mod tests { let (_, _, val) = cluster_info .gossip - .new_pull_request(timestamp(), &HashMap::new(), ClusterInfo::max_bloom_size()) + .new_pull_request(timestamp(), &HashMap::new(), MAX_BLOOM_SIZE) .ok() .unwrap(); assert!(val.verify()); @@ -2457,7 +2454,7 @@ mod tests { check_pull_request_size(CrdsFilter::new_rand(1000, 10)); check_pull_request_size(CrdsFilter::new_rand(1000, 1000)); check_pull_request_size(CrdsFilter::new_rand(100000, 1000)); - check_pull_request_size(CrdsFilter::new_rand(100000, ClusterInfo::max_bloom_size())); + check_pull_request_size(CrdsFilter::new_rand(100000, MAX_BLOOM_SIZE)); } fn check_pull_request_size(filter: CrdsFilter) { @@ -2543,4 +2540,77 @@ mod tests { assert_eq!(1, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); } + + #[test] + fn test_max_bloom_size() { + assert_eq!(MAX_BLOOM_SIZE, max_bloom_size()); + } + + #[test] + fn test_protocol_size() { + let contact_info = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); + let dummy_vec = + vec![CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())); 10]; + let dummy_vec_size = serialized_size(&dummy_vec).unwrap(); + let mut max_protocol_size; + + max_protocol_size = + serialized_size(&Protocol::PullRequest(CrdsFilter::default(), contact_info)).unwrap() + - serialized_size(&CrdsFilter::default()).unwrap(); + max_protocol_size = max_protocol_size.max( + serialized_size(&Protocol::PullResponse( + Pubkey::default(), + dummy_vec.clone(), + )) + .unwrap() + - dummy_vec_size, + ); + max_protocol_size = max_protocol_size.max( + serialized_size(&Protocol::PushMessage(Pubkey::default(), dummy_vec)).unwrap() + - dummy_vec_size, + ); + max_protocol_size = max_protocol_size.max( + serialized_size(&Protocol::PruneMessage( + Pubkey::default(), + PruneData::default(), + )) + .unwrap() + - serialized_size(&PruneData::default()).unwrap(), + ); + + // make sure repairs are always smaller than the gossip messages + assert!( + max_protocol_size + > serialized_size(&Protocol::RequestWindowIndex(ContactInfo::default(), 0, 0)) + .unwrap() + ); + assert!( + max_protocol_size + > serialized_size(&Protocol::RequestHighestWindowIndex( + ContactInfo::default(), + 0, + 0 + )) + .unwrap() + ); + assert!( + max_protocol_size + > serialized_size(&Protocol::RequestOrphan(ContactInfo::default(), 0)).unwrap() + ); + // finally assert the header size estimation is correct + assert_eq!(MAX_PROTOCOL_HEADER_SIZE, max_protocol_size); + } + + // computes the maximum size for pull request blooms + fn max_bloom_size() -> usize { + let filter_size = serialized_size(&CrdsFilter::default()) + .expect("unable to serialize default filter") as usize; + let protocol = Protocol::PullRequest( + CrdsFilter::default(), + CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default())), + ); + let protocol_size = + serialized_size(&protocol).expect("unable to serialize gossip protocol") as usize; + PACKET_DATA_SIZE - (protocol_size - filter_size) + } } diff --git a/core/src/cluster_info_repair_listener.rs b/core/src/cluster_info_repair_listener.rs index 403e5d60b..73dd117ce 100644 --- a/core/src/cluster_info_repair_listener.rs +++ b/core/src/cluster_info_repair_listener.rs @@ -526,22 +526,23 @@ impl ClusterInfoRepairListener { #[cfg(test)] mod tests { use super::*; - use crate::blob::{Blob, SharedBlob}; use crate::cluster_info::Node; + use crate::packet::Packets; use crate::streamer; + use crate::streamer::PacketReceiver; use solana_ledger::blocktree::make_many_slot_entries; use solana_ledger::get_tmp_ledger_path; + use solana_perf::recycler::Recycler; use std::collections::BTreeSet; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; - use std::sync::mpsc::Receiver; use std::sync::Arc; use std::thread::sleep; use std::time::Duration; struct MockRepairee { id: Pubkey, - receiver: Receiver>, + receiver: PacketReceiver, tvu_address: SocketAddr, repairee_exit: Arc, repairee_receiver_thread_hdl: JoinHandle<()>, @@ -550,7 +551,7 @@ mod tests { impl MockRepairee { pub fn new( id: Pubkey, - receiver: Receiver>, + receiver: PacketReceiver, tvu_address: SocketAddr, repairee_exit: Arc, repairee_receiver_thread_hdl: JoinHandle<()>, @@ -570,8 +571,13 @@ mod tests { let repairee_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap()); let repairee_tvu_addr = repairee_socket.local_addr().unwrap(); let repairee_exit = Arc::new(AtomicBool::new(false)); - let repairee_receiver_thread_hdl = - streamer::blob_receiver(repairee_socket, &repairee_exit, repairee_sender); + let repairee_receiver_thread_hdl = streamer::receiver( + repairee_socket, + &repairee_exit, + repairee_sender, + Recycler::default(), + "mock_repairee_receiver", + ); Self::new( id, @@ -788,19 +794,30 @@ mod tests { .unwrap(); } - let mut received_shreds: Vec>> = vec![]; + let mut received_shreds: Vec = vec![]; // This repairee was missing exactly `num_slots / 2` slots, so we expect to get // `(num_slots / 2) * num_shreds_per_slot * REPAIR_REDUNDANCY` blobs. let num_expected_shreds = (num_slots / 2) * num_shreds_per_slot * REPAIR_REDUNDANCY as u64; - while (received_shreds.len() as u64) < num_expected_shreds { - received_shreds.extend(mock_repairee.receiver.recv().unwrap()); + while (received_shreds + .iter() + .map(|p| p.packets.len() as u64) + .sum::()) + < num_expected_shreds + { + received_shreds.push(mock_repairee.receiver.recv().unwrap()); } - // Make sure no extra blobs get sent + // Make sure no extra shreds get sent sleep(Duration::from_millis(1000)); assert!(mock_repairee.receiver.try_recv().is_err()); - assert_eq!(received_shreds.len() as u64, num_expected_shreds); + assert_eq!( + received_shreds + .iter() + .map(|p| p.packets.len() as u64) + .sum::(), + num_expected_shreds + ); // Shutdown mock_repairee.close().unwrap(); diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index 983d19db6..41d5e6032 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -8,7 +8,6 @@ //! the local nodes wallclock window they are drooped silently. //! 2. The prune set is stored in a Bloom filter. -use crate::blob::BLOB_DATA_SIZE; use crate::contact_info::ContactInfo; use crate::crds::{Crds, VersionedCrdsValue}; use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS}; @@ -23,6 +22,7 @@ use rand::seq::SliceRandom; use rand::{thread_rng, RngCore}; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; +use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use std::cmp; @@ -53,7 +53,8 @@ pub struct CrdsGossipPush { impl Default for CrdsGossipPush { fn default() -> Self { Self { - max_bytes: BLOB_DATA_SIZE, + // Allow upto 64 Crds Values per PUSH + max_bytes: PACKET_DATA_SIZE * 64, active_set: IndexMap::new(), push_messages: HashMap::new(), received_cache: HashMap::new(), diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index a87a71404..843852fa4 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -7,6 +7,7 @@ use rand::{thread_rng, Rng}; use solana_client::thin_client::{create_client, ThinClient}; use solana_ledger::bank_forks::BankForks; use solana_ledger::blocktree::Blocktree; +use solana_perf::recycler::Recycler; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::net::{SocketAddr, TcpListener, UdpSocket}; @@ -35,7 +36,13 @@ impl GossipService { &cluster_info.read().unwrap().my_data().id, gossip_socket.local_addr().unwrap() ); - let t_receiver = streamer::blob_receiver(gossip_socket.clone(), &exit, request_sender); + let t_receiver = streamer::receiver( + gossip_socket.clone(), + &exit, + request_sender, + Recycler::default(), + "gossip_receiver", + ); let (response_sender, response_receiver) = channel(); let t_responder = streamer::responder("gossip", gossip_socket, response_receiver); let t_listen = ClusterInfo::listen( diff --git a/core/src/lib.rs b/core/src/lib.rs index 5e574d927..a04a54966 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -6,7 +6,6 @@ //! pub mod banking_stage; -pub mod blob; pub mod broadcast_stage; pub mod chacha; pub mod chacha_cuda; diff --git a/core/src/streamer.rs b/core/src/streamer.rs index 99246221c..579b90dc2 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -1,8 +1,7 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! -use crate::blob::{Blob, SharedBlobs}; -use crate::packet::{self, Packets, PacketsRecycler, PACKETS_PER_BATCH}; +use crate::packet::{self, send_to, Packets, PacketsRecycler, PACKETS_PER_BATCH}; use crate::recvmmsg::NUM_RCVMMSGS; use crate::result::{Error, Result}; use solana_sdk::timing::duration_as_ms; @@ -15,8 +14,6 @@ use std::time::{Duration, Instant}; pub type PacketReceiver = Receiver; pub type PacketSender = Sender; -pub type BlobSender = Sender; -pub type BlobReceiver = Receiver; fn recv_loop( sock: &UdpSocket, @@ -33,7 +30,7 @@ fn recv_loop( let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name); loop { // Check for exit signal, even if socket is busy - // (for instance the leader trasaction socket) + // (for instance the leader transaction socket) if exit.load(Ordering::Relaxed) { return Ok(()); } @@ -83,10 +80,10 @@ pub fn receiver( .unwrap() } -fn recv_send(sock: &UdpSocket, r: &BlobReceiver) -> Result<()> { +fn recv_send(sock: &UdpSocket, r: &PacketReceiver) -> Result<()> { let timer = Duration::new(1, 0); let msgs = r.recv_timeout(timer)?; - Blob::send_to(sock, msgs)?; + send_to(&msgs, sock)?; Ok(()) } @@ -110,7 +107,7 @@ pub fn recv_batch(recvr: &PacketReceiver, max_batch: usize) -> Result<(Vec, r: BlobReceiver) -> JoinHandle<()> { +pub fn responder(name: &'static str, sock: Arc, r: PacketReceiver) -> JoinHandle<()> { Builder::new() .name(format!("solana-responder-{}", name)) .spawn(move || loop { @@ -125,43 +122,9 @@ pub fn responder(name: &'static str, sock: Arc, r: BlobReceiver) -> J .unwrap() } -//TODO, we would need to stick block authentication before we create the -//window. -fn recv_blobs(sock: &UdpSocket, s: &BlobSender) -> Result<()> { - trace!("recv_blobs: receiving on {}", sock.local_addr().unwrap()); - let dq = Blob::recv_from(sock)?; - if !dq.is_empty() { - s.send(dq)?; - } - Ok(()) -} - -pub fn blob_receiver( - sock: Arc, - exit: &Arc, - s: BlobSender, -) -> JoinHandle<()> { - //DOCUMENTED SIDE-EFFECT - //1 second timeout on socket read - let timer = Duration::new(1, 0); - sock.set_read_timeout(Some(timer)) - .expect("set socket timeout"); - let exit = exit.clone(); - Builder::new() - .name("solana-blob_receiver".to_string()) - .spawn(move || loop { - if exit.load(Ordering::Relaxed) { - break; - } - let _ = recv_blobs(&sock, &s); - }) - .unwrap() -} - #[cfg(test)] mod test { use super::*; - use crate::blob::{Blob, SharedBlob}; use crate::packet::{Packet, Packets, PACKET_DATA_SIZE}; use crate::streamer::{receiver, responder}; use solana_perf::recycler::Recycler; @@ -193,7 +156,6 @@ mod test { fn streamer_debug() { write!(io::sink(), "{:?}", Packet::default()).unwrap(); write!(io::sink(), "{:?}", Packets::default()).unwrap(); - write!(io::sink(), "{:?}", Blob::default()).unwrap(); } #[test] fn streamer_send_test() { @@ -208,16 +170,15 @@ mod test { let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); - let mut msgs = Vec::new(); + let mut msgs = Packets::default(); for i in 0..5 { - let b = SharedBlob::default(); + let mut b = Packet::default(); { - let mut w = b.write().unwrap(); - w.data[0] = i as u8; - w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&addr); + b.data[0] = i as u8; + b.meta.size = PACKET_DATA_SIZE; + b.meta.set_addr(&addr); } - msgs.push(b); + msgs.packets.push(b); } s_responder.send(msgs).expect("send"); t_responder diff --git a/core/src/window_service.rs b/core/src/window_service.rs index af6f0e4c1..1a8c237c5 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -126,7 +126,7 @@ where trace!("{} num total shreds received: {}", my_pubkey, total_packets); for packets in packets.into_iter() { - if !packets.packets.is_empty() { + if !packets.is_empty() { // Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit) let _ = retransmit.send(packets); } diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index e13a06787..0342c49b2 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -1,7 +1,7 @@ use bincode::serialized_size; use log::*; use rayon::prelude::*; -use solana_core::cluster_info::ClusterInfo; +use solana_core::cluster_info; use solana_core::contact_info::ContactInfo; use solana_core::crds_gossip::*; use solana_core::crds_gossip_error::CrdsGossipError; @@ -403,7 +403,7 @@ fn network_run_pull( .filter_map(|from| { from.lock() .unwrap() - .new_pull_request(now, &HashMap::new(), ClusterInfo::max_bloom_size()) + .new_pull_request(now, &HashMap::new(), cluster_info::MAX_BLOOM_SIZE) .ok() }) .collect() diff --git a/perf/src/packet.rs b/perf/src/packet.rs index b2d70a795..8fc146379 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -5,7 +5,7 @@ use crate::{ }; use serde::Serialize; pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; -use std::{io, mem, net::SocketAddr}; +use std::{mem, net::SocketAddr}; pub const NUM_PACKETS: usize = 1024 * 8; @@ -76,6 +76,10 @@ impl Packets { m.meta.set_addr(&addr); } } + + pub fn is_empty(&self) -> bool { + self.packets.is_empty() + } } pub fn to_packets_chunked(xs: &[T], chunks: usize) -> Vec { @@ -84,10 +88,7 @@ pub fn to_packets_chunked(xs: &[T], chunks: usize) -> Vec let mut p = Packets::default(); p.packets.resize(x.len(), Packet::default()); for (i, o) in x.iter().zip(p.packets.iter_mut()) { - let mut wr = io::Cursor::new(&mut o.data[..]); - bincode::serialize_into(&mut wr, &i).expect("serialize request"); - let len = wr.position() as usize; - o.meta.size = len; + Packet::populate_packet(o, None, i).expect("serialize request"); } out.push(p); } @@ -98,6 +99,17 @@ pub fn to_packets(xs: &[T]) -> Vec { to_packets_chunked(xs, NUM_PACKETS) } +pub fn to_packets_with_destination(dests_and_data: &[(SocketAddr, T)]) -> Packets { + let mut out = Packets::default(); + out.packets.resize(dests_and_data.len(), Packet::default()); + for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) { + if let Err(e) = Packet::populate_packet(o, Some(&dest_and_data.0), &dest_and_data.1) { + error!("Couldn't write to packet {:?}. Data skipped.", e); + } + } + out +} + pub fn limited_deserialize(data: &[u8]) -> bincode::Result where T: serde::de::DeserializeOwned, diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index 37007bace..2f841fb8d 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -51,3 +51,5 @@ pub mod transport; #[macro_use] extern crate serde_derive; + +extern crate log as logger; diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index 924833988..a75431546 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -1,6 +1,8 @@ use crate::clock::Slot; -use std::fmt; +use bincode::Result; +use serde::Serialize; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::{fmt, io}; /// Maximum over-the-wire size of a Transaction /// 1280 is IPv6 minimum MTU @@ -33,6 +35,29 @@ impl Packet { pub fn new(data: [u8; PACKET_DATA_SIZE], meta: Meta) -> Self { Self { data, meta } } + + pub fn from_data(dest: &SocketAddr, data: T) -> Self { + let mut me = Packet::default(); + if let Err(e) = Self::populate_packet(&mut me, Some(dest), &data) { + logger::error!("Couldn't write to packet {:?}. Data skipped.", e); + } + me + } + + pub fn populate_packet( + packet: &mut Packet, + dest: Option<&SocketAddr>, + data: &T, + ) -> Result<()> { + let mut wr = io::Cursor::new(&mut packet.data[..]); + bincode::serialize_into(&mut wr, data)?; + let len = wr.position() as usize; + packet.meta.size = len; + if let Some(dest) = dest { + packet.meta.set_addr(dest); + } + Ok(()) + } } impl fmt::Debug for Packet {