Forward transactions as packets instead of blobs (#5334)

* Forward transactions as packets instead of blobs

* clippy
This commit is contained in:
Pankaj Garg 2019-07-30 14:50:02 -07:00 committed by GitHub
parent 8d243221f0
commit a7a10e12c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 46 additions and 200 deletions

View File

@ -134,15 +134,13 @@ impl BankingStage {
fn forward_buffered_packets( fn forward_buffered_packets(
socket: &std::net::UdpSocket, socket: &std::net::UdpSocket,
tpu_via_blobs: &std::net::SocketAddr, tpu_forwards: &std::net::SocketAddr,
unprocessed_packets: &[PacketsAndOffsets], unprocessed_packets: &[PacketsAndOffsets],
) -> std::io::Result<()> { ) -> std::io::Result<()> {
let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets); let packets = Self::filter_valid_packets_for_forwarding(unprocessed_packets);
inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); inc_new_counter_info!("banking_stage-forwarded_packets", packets.len());
let blobs = packet::packets_to_blobs(&packets); for p in packets {
socket.send_to(&p.data[..p.meta.size], &tpu_forwards)?;
for blob in blobs {
socket.send_to(&blob.data[..blob.meta.size], tpu_via_blobs)?;
} }
Ok(()) Ok(())
@ -316,7 +314,7 @@ impl BankingStage {
.read() .read()
.unwrap() .unwrap()
.lookup(&leader_pubkey) .lookup(&leader_pubkey)
.map(|leader| leader.tpu_via_blobs) .map(|leader| leader.tpu_forwards)
}; };
leader_addr.map_or(Ok(()), |leader_addr| { leader_addr.map_or(Ok(()), |leader_addr| {

View File

@ -1463,7 +1463,7 @@ pub struct Sockets {
pub gossip: UdpSocket, pub gossip: UdpSocket,
pub tvu: Vec<UdpSocket>, pub tvu: Vec<UdpSocket>,
pub tpu: Vec<UdpSocket>, pub tpu: Vec<UdpSocket>,
pub tpu_via_blobs: Vec<UdpSocket>, pub tpu_forwards: Vec<UdpSocket>,
pub broadcast: UdpSocket, pub broadcast: UdpSocket,
pub repair: UdpSocket, pub repair: UdpSocket,
pub retransmit: UdpSocket, pub retransmit: UdpSocket,
@ -1508,7 +1508,7 @@ impl Node {
gossip, gossip,
tvu: vec![tvu], tvu: vec![tvu],
tpu: vec![], tpu: vec![],
tpu_via_blobs: vec![], tpu_forwards: vec![],
broadcast, broadcast,
repair, repair,
retransmit, retransmit,
@ -1520,7 +1520,7 @@ impl Node {
let tpu = UdpSocket::bind("127.0.0.1:0").unwrap(); let tpu = UdpSocket::bind("127.0.0.1:0").unwrap();
let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap();
let tvu = UdpSocket::bind("127.0.0.1:0").unwrap(); let tvu = UdpSocket::bind("127.0.0.1:0").unwrap();
let tpu_via_blobs = UdpSocket::bind("127.0.0.1:0").unwrap(); let tpu_forwards = UdpSocket::bind("127.0.0.1:0").unwrap();
let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap();
let rpc_port = find_available_port_in_range((1024, 65535)).unwrap(); let rpc_port = find_available_port_in_range((1024, 65535)).unwrap();
let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port);
@ -1536,7 +1536,7 @@ impl Node {
gossip.local_addr().unwrap(), gossip.local_addr().unwrap(),
tvu.local_addr().unwrap(), tvu.local_addr().unwrap(),
tpu.local_addr().unwrap(), tpu.local_addr().unwrap(),
tpu_via_blobs.local_addr().unwrap(), tpu_forwards.local_addr().unwrap(),
storage.local_addr().unwrap(), storage.local_addr().unwrap(),
rpc_addr, rpc_addr,
rpc_pubsub_addr, rpc_pubsub_addr,
@ -1548,7 +1548,7 @@ impl Node {
gossip, gossip,
tvu: vec![tvu], tvu: vec![tvu],
tpu: vec![tpu], tpu: vec![tpu],
tpu_via_blobs: vec![tpu_via_blobs], tpu_forwards: vec![tpu_forwards],
broadcast, broadcast,
repair, repair,
retransmit, retransmit,
@ -1582,7 +1582,7 @@ impl Node {
let (tpu_port, tpu_sockets) = multi_bind_in_range(port_range, 32).expect("tpu multi_bind"); let (tpu_port, tpu_sockets) = multi_bind_in_range(port_range, 32).expect("tpu multi_bind");
let (tpu_via_blobs_port, tpu_via_blobs_sockets) = let (tpu_forwards_port, tpu_forwards_sockets) =
multi_bind_in_range(port_range, 8).expect("tpu multi_bind"); multi_bind_in_range(port_range, 8).expect("tpu multi_bind");
let (_, repair) = Self::bind(port_range); let (_, repair) = Self::bind(port_range);
@ -1594,7 +1594,7 @@ impl Node {
SocketAddr::new(gossip_addr.ip(), gossip_port), SocketAddr::new(gossip_addr.ip(), gossip_port),
SocketAddr::new(gossip_addr.ip(), tvu_port), SocketAddr::new(gossip_addr.ip(), tvu_port),
SocketAddr::new(gossip_addr.ip(), tpu_port), SocketAddr::new(gossip_addr.ip(), tpu_port),
SocketAddr::new(gossip_addr.ip(), tpu_via_blobs_port), SocketAddr::new(gossip_addr.ip(), tpu_forwards_port),
socketaddr_any!(), socketaddr_any!(),
socketaddr_any!(), socketaddr_any!(),
socketaddr_any!(), socketaddr_any!(),
@ -1608,7 +1608,7 @@ impl Node {
gossip, gossip,
tvu: tvu_sockets, tvu: tvu_sockets,
tpu: tpu_sockets, tpu: tpu_sockets,
tpu_via_blobs: tpu_via_blobs_sockets, tpu_forwards: tpu_forwards_sockets,
broadcast, broadcast,
repair, repair,
retransmit, retransmit,
@ -1629,9 +1629,9 @@ impl Node {
let empty = socketaddr_any!(); let empty = socketaddr_any!();
new.info.tpu = empty; new.info.tpu = empty;
new.info.tpu_via_blobs = empty; new.info.tpu_forwards = empty;
new.sockets.tpu = vec![]; new.sockets.tpu = vec![];
new.sockets.tpu_via_blobs = vec![]; new.sockets.tpu_forwards = vec![];
new new
} }

View File

@ -23,7 +23,7 @@ pub struct ContactInfo {
/// transactions address /// transactions address
pub tpu: SocketAddr, pub tpu: SocketAddr,
/// address to forward unprocessed transactions to /// address to forward unprocessed transactions to
pub tpu_via_blobs: SocketAddr, pub tpu_forwards: SocketAddr,
/// storage data address /// storage data address
pub storage_addr: SocketAddr, pub storage_addr: SocketAddr,
/// address to which to send JSON-RPC requests /// address to which to send JSON-RPC requests
@ -78,7 +78,7 @@ impl Default for ContactInfo {
gossip: socketaddr_any!(), gossip: socketaddr_any!(),
tvu: socketaddr_any!(), tvu: socketaddr_any!(),
tpu: socketaddr_any!(), tpu: socketaddr_any!(),
tpu_via_blobs: socketaddr_any!(), tpu_forwards: socketaddr_any!(),
storage_addr: socketaddr_any!(), storage_addr: socketaddr_any!(),
rpc: socketaddr_any!(), rpc: socketaddr_any!(),
rpc_pubsub: socketaddr_any!(), rpc_pubsub: socketaddr_any!(),
@ -94,7 +94,7 @@ impl ContactInfo {
gossip: SocketAddr, gossip: SocketAddr,
tvu: SocketAddr, tvu: SocketAddr,
tpu: SocketAddr, tpu: SocketAddr,
tpu_via_blobs: SocketAddr, tpu_forwards: SocketAddr,
storage_addr: SocketAddr, storage_addr: SocketAddr,
rpc: SocketAddr, rpc: SocketAddr,
rpc_pubsub: SocketAddr, rpc_pubsub: SocketAddr,
@ -106,7 +106,7 @@ impl ContactInfo {
gossip, gossip,
tvu, tvu,
tpu, tpu,
tpu_via_blobs, tpu_forwards,
storage_addr, storage_addr,
rpc, rpc,
rpc_pubsub, rpc_pubsub,
@ -157,7 +157,7 @@ impl ContactInfo {
let tpu_addr = *bind_addr; let tpu_addr = *bind_addr;
let gossip_addr = next_port(&bind_addr, 1); let gossip_addr = next_port(&bind_addr, 1);
let tvu_addr = next_port(&bind_addr, 2); let tvu_addr = next_port(&bind_addr, 2);
let tpu_via_blobs_addr = next_port(&bind_addr, 3); let tpu_forwards_addr = next_port(&bind_addr, 3);
let rpc_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT); let rpc_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT);
let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT);
Self::new( Self::new(
@ -165,7 +165,7 @@ impl ContactInfo {
gossip_addr, gossip_addr,
tvu_addr, tvu_addr,
tpu_addr, tpu_addr,
tpu_via_blobs_addr, tpu_forwards_addr,
"0.0.0.0:0".parse().unwrap(), "0.0.0.0:0".parse().unwrap(),
rpc_addr, rpc_addr,
rpc_pubsub_addr, rpc_pubsub_addr,
@ -233,7 +233,7 @@ impl Signable for ContactInfo {
gossip: SocketAddr, gossip: SocketAddr,
tvu: SocketAddr, tvu: SocketAddr,
tpu: SocketAddr, tpu: SocketAddr,
tpu_via_blobs: SocketAddr, tpu_forwards: SocketAddr,
storage_addr: SocketAddr, storage_addr: SocketAddr,
rpc: SocketAddr, rpc: SocketAddr,
rpc_pubsub: SocketAddr, rpc_pubsub: SocketAddr,
@ -247,7 +247,7 @@ impl Signable for ContactInfo {
tvu: me.tvu, tvu: me.tvu,
tpu: me.tpu, tpu: me.tpu,
storage_addr: me.storage_addr, storage_addr: me.storage_addr,
tpu_via_blobs: me.tpu_via_blobs, tpu_forwards: me.tpu_forwards,
rpc: me.rpc, rpc: me.rpc,
rpc_pubsub: me.rpc_pubsub, rpc_pubsub: me.rpc_pubsub,
wallclock: me.wallclock, wallclock: me.wallclock,
@ -287,7 +287,7 @@ mod tests {
let ci = ContactInfo::default(); let ci = ContactInfo::default();
assert!(ci.gossip.ip().is_unspecified()); assert!(ci.gossip.ip().is_unspecified());
assert!(ci.tvu.ip().is_unspecified()); assert!(ci.tvu.ip().is_unspecified());
assert!(ci.tpu_via_blobs.ip().is_unspecified()); assert!(ci.tpu_forwards.ip().is_unspecified());
assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc.ip().is_unspecified());
assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified());
assert!(ci.tpu.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified());
@ -298,7 +298,7 @@ mod tests {
let ci = ContactInfo::new_multicast(); let ci = ContactInfo::new_multicast();
assert!(ci.gossip.ip().is_multicast()); assert!(ci.gossip.ip().is_multicast());
assert!(ci.tvu.ip().is_multicast()); assert!(ci.tvu.ip().is_multicast());
assert!(ci.tpu_via_blobs.ip().is_multicast()); assert!(ci.tpu_forwards.ip().is_multicast());
assert!(ci.rpc.ip().is_multicast()); assert!(ci.rpc.ip().is_multicast());
assert!(ci.rpc_pubsub.ip().is_multicast()); assert!(ci.rpc_pubsub.ip().is_multicast());
assert!(ci.tpu.ip().is_multicast()); assert!(ci.tpu.ip().is_multicast());
@ -310,7 +310,7 @@ mod tests {
let ci = ContactInfo::new_gossip_entry_point(&addr); let ci = ContactInfo::new_gossip_entry_point(&addr);
assert_eq!(ci.gossip, addr); assert_eq!(ci.gossip, addr);
assert!(ci.tvu.ip().is_unspecified()); assert!(ci.tvu.ip().is_unspecified());
assert!(ci.tpu_via_blobs.ip().is_unspecified()); assert!(ci.tpu_forwards.ip().is_unspecified());
assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc.ip().is_unspecified());
assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified());
assert!(ci.tpu.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified());
@ -323,7 +323,7 @@ mod tests {
assert_eq!(ci.tpu, addr); assert_eq!(ci.tpu, addr);
assert_eq!(ci.gossip.port(), 11); assert_eq!(ci.gossip.port(), 11);
assert_eq!(ci.tvu.port(), 12); assert_eq!(ci.tvu.port(), 12);
assert_eq!(ci.tpu_via_blobs.port(), 13); assert_eq!(ci.tpu_forwards.port(), 13);
assert_eq!(ci.rpc.port(), 8899); assert_eq!(ci.rpc.port(), 8899);
assert_eq!(ci.rpc_pubsub.port(), 8900); assert_eq!(ci.rpc_pubsub.port(), 8900);
assert!(ci.storage_addr.ip().is_unspecified()); assert!(ci.storage_addr.ip().is_unspecified());
@ -338,7 +338,7 @@ mod tests {
assert_eq!(d1.id, keypair.pubkey()); assert_eq!(d1.id, keypair.pubkey());
assert_eq!(d1.gossip, socketaddr!("127.0.0.1:1235")); assert_eq!(d1.gossip, socketaddr!("127.0.0.1:1235"));
assert_eq!(d1.tvu, socketaddr!("127.0.0.1:1236")); assert_eq!(d1.tvu, socketaddr!("127.0.0.1:1236"));
assert_eq!(d1.tpu_via_blobs, socketaddr!("127.0.0.1:1237")); assert_eq!(d1.tpu_forwards, socketaddr!("127.0.0.1:1237"));
assert_eq!(d1.tpu, socketaddr!("127.0.0.1:1234")); assert_eq!(d1.tpu, socketaddr!("127.0.0.1:1234"));
assert_eq!(d1.rpc, socketaddr!("127.0.0.1:8899")); assert_eq!(d1.rpc, socketaddr!("127.0.0.1:8899"));
assert_eq!(d1.rpc_pubsub, socketaddr!("127.0.0.1:8900")); assert_eq!(d1.rpc_pubsub, socketaddr!("127.0.0.1:8900"));

View File

@ -22,28 +22,28 @@ impl FetchStage {
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub fn new( pub fn new(
sockets: Vec<UdpSocket>, sockets: Vec<UdpSocket>,
tpu_via_blobs_sockets: Vec<UdpSocket>, tpu_forwards_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> (Self, PacketReceiver) { ) -> (Self, PacketReceiver) {
let (sender, receiver) = channel(); let (sender, receiver) = channel();
( (
Self::new_with_sender(sockets, tpu_via_blobs_sockets, exit, &sender, &poh_recorder), Self::new_with_sender(sockets, tpu_forwards_sockets, exit, &sender, &poh_recorder),
receiver, receiver,
) )
} }
pub fn new_with_sender( pub fn new_with_sender(
sockets: Vec<UdpSocket>, sockets: Vec<UdpSocket>,
tpu_via_blobs_sockets: Vec<UdpSocket>, tpu_forwards_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
sender: &PacketSender, sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> Self { ) -> Self {
let tx_sockets = sockets.into_iter().map(Arc::new).collect(); let tx_sockets = sockets.into_iter().map(Arc::new).collect();
let tpu_via_blobs_sockets = tpu_via_blobs_sockets.into_iter().map(Arc::new).collect(); let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
Self::new_multi_socket( Self::new_multi_socket(
tx_sockets, tx_sockets,
tpu_via_blobs_sockets, tpu_forwards_sockets,
exit, exit,
&sender, &sender,
&poh_recorder, &poh_recorder,
@ -83,7 +83,7 @@ impl FetchStage {
fn new_multi_socket( fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>, sockets: Vec<Arc<UdpSocket>>,
tpu_via_blobs_sockets: Vec<Arc<UdpSocket>>, tpu_forwards_sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
sender: &PacketSender, sender: &PacketSender,
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
@ -100,9 +100,15 @@ impl FetchStage {
}); });
let (forward_sender, forward_receiver) = channel(); let (forward_sender, forward_receiver) = channel();
let tpu_via_blobs_threads = tpu_via_blobs_sockets let tpu_forwards_threads = tpu_forwards_sockets.into_iter().map(|socket| {
.into_iter() streamer::receiver(
.map(|socket| streamer::blob_packet_receiver(socket, &exit, forward_sender.clone())); socket,
&exit,
forward_sender.clone(),
recycler.clone(),
"fetch_forward_stage",
)
});
let sender = sender.clone(); let sender = sender.clone();
let poh_recorder = poh_recorder.clone(); let poh_recorder = poh_recorder.clone();
@ -124,7 +130,7 @@ impl FetchStage {
}) })
.unwrap(); .unwrap();
let mut thread_hdls: Vec<_> = tpu_threads.chain(tpu_via_blobs_threads).collect(); let mut thread_hdls: Vec<_> = tpu_threads.chain(tpu_forwards_threads).collect();
thread_hdls.push(fwd_thread_hdl); thread_hdls.push(fwd_thread_hdl);
Self { thread_hdls } Self { thread_hdls }
} }

View File

@ -12,13 +12,11 @@ pub use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signable; use solana_sdk::signature::Signable;
use solana_sdk::signature::Signature; use solana_sdk::signature::Signature;
use std::borrow::Borrow;
use std::borrow::Cow; use std::borrow::Cow;
use std::cmp; use std::cmp;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::io::Cursor; use std::io::Cursor;
use std::io::Write;
use std::mem; use std::mem;
use std::mem::size_of; use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
@ -365,18 +363,6 @@ pub fn to_shared_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<Share
Ok(blobs) Ok(blobs)
} }
pub fn packets_to_blobs<T: Borrow<Packet>>(packets: &[T]) -> Vec<Blob> {
let mut current_index = 0;
let mut blobs = vec![];
while current_index < packets.len() {
let mut blob = Blob::default();
current_index += blob.store_packets(&packets[current_index..]) as usize;
blobs.push(blob);
}
blobs
}
macro_rules! range { macro_rules! range {
($prev:expr, $type:ident) => { ($prev:expr, $type:ident) => {
$prev..$prev + size_of::<$type>() $prev..$prev + size_of::<$type>()
@ -558,52 +544,6 @@ impl Blob {
&self.data[SIGNATURE_RANGE] &self.data[SIGNATURE_RANGE]
} }
pub fn store_packets<T: Borrow<Packet>>(&mut self, packets: &[T]) -> u64 {
let size = self.size();
let mut cursor = Cursor::new(&mut self.data_mut()[size..]);
let mut written = 0;
let mut last_index = 0;
for packet in packets {
if bincode::serialize_into(&mut cursor, &packet.borrow().meta.size).is_err() {
break;
}
let packet = packet.borrow();
if cursor.write_all(&packet.data[..packet.meta.size]).is_err() {
break;
}
written = cursor.position() as usize;
last_index += 1;
}
self.set_size(size + written);
last_index
}
// other side of store_packets
pub fn load_packets(&self, packets: &mut PinnedVec<Packet>) {
// rough estimate
let mut pos = 0;
let size_len = bincode::serialized_size(&0usize).unwrap() as usize;
while pos + size_len < self.size() {
let size: usize = bincode::deserialize_from(&self.data()[pos..]).unwrap();
pos += size_len;
if size > PACKET_DATA_SIZE || pos + size > self.size() {
break;
}
let mut packet = Packet::default();
packet.meta.size = size;
packet.data[..size].copy_from_slice(&self.data()[pos..pos + size]);
pos += size;
packets.push(packet);
}
}
pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> { pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> {
let mut p = r.write().unwrap(); let mut p = r.write().unwrap();
trace!("receiving on {}", socket.local_addr().unwrap()); trace!("receiving on {}", socket.local_addr().unwrap());
@ -701,8 +641,6 @@ pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot:
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use bincode;
use rand::Rng;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction; use solana_sdk::system_transaction;
@ -834,62 +772,6 @@ mod tests {
assert_eq!(config, b.erasure_config()); assert_eq!(config, b.erasure_config());
} }
#[test]
fn test_store_blobs_max() {
let serialized_size_size = bincode::serialized_size(&0usize).unwrap() as usize;
let serialized_packet_size = serialized_size_size + PACKET_DATA_SIZE;
let num_packets = (BLOB_SIZE - BLOB_HEADER_SIZE) / serialized_packet_size + 1;
let mut blob = Blob::default();
let packets: Vec<_> = (0..num_packets)
.map(|_| {
let mut packet = Packet::default();
packet.meta.size = PACKET_DATA_SIZE;
packet
})
.collect();
// Everything except the last packet should have been written
assert_eq!(blob.store_packets(&packets[..]), (num_packets - 1) as u64);
blob = Blob::default();
// Store packets such that blob only has room for one more
assert_eq!(
blob.store_packets(&packets[..num_packets - 2]),
(num_packets - 2) as u64
);
// Fill the last packet in the blob
assert_eq!(blob.store_packets(&packets[..num_packets - 2]), 1);
// Blob is now full
assert_eq!(blob.store_packets(&packets), 0);
}
#[test]
fn test_packets_to_blobs() {
let mut rng = rand::thread_rng();
let packets: Vec<_> = (0..2)
.map(|_| {
let mut packet = Packet::default();
packet.meta.size = rng.gen_range(1, PACKET_DATA_SIZE);
for i in 0..packet.meta.size {
packet.data[i] = rng.gen_range(1, std::u8::MAX);
}
packet
})
.collect();
let blobs = packets_to_blobs(&packets[..]);
let mut reconstructed_packets = PinnedVec::default();
blobs
.iter()
.for_each(|b| b.load_packets(&mut reconstructed_packets));
assert_eq!(reconstructed_packets[..], packets[..]);
}
#[test] #[test]
fn test_blob_data_align() { fn test_blob_data_align() {
assert_eq!(std::mem::align_of::<BlobData>(), BLOB_DATA_ALIGN); assert_eq!(std::mem::align_of::<BlobData>(), BLOB_DATA_ALIGN);

View File

@ -134,46 +134,6 @@ pub fn blob_receiver(
.unwrap() .unwrap()
} }
fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender, recycler: &PacketsRecycler) -> Result<()> {
trace!(
"recv_blob_packets: receiving on {}",
sock.local_addr().unwrap()
);
let blobs = Blob::recv_from(sock)?;
for blob in blobs {
let mut packets =
Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BLOB, "recv_blob_packets");
blob.read().unwrap().load_packets(&mut packets.packets);
s.send(packets)?;
}
Ok(())
}
pub fn blob_packet_receiver(
sock: Arc<UdpSocket>,
exit: &Arc<AtomicBool>,
s: PacketSender,
) -> JoinHandle<()> {
//DOCUMENTED SIDE-EFFECT
//1 second timeout on socket read
let timer = Duration::new(1, 0);
sock.set_read_timeout(Some(timer))
.expect("set socket timeout");
let exit = exit.clone();
let recycler = PacketsRecycler::default();
Builder::new()
.name("solana-blob_packet_receiver".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = recv_blob_packets(&sock, &s, &recycler);
})
.unwrap()
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;

View File

@ -33,7 +33,7 @@ impl Tpu {
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
entry_receiver: Receiver<WorkingBankEntries>, entry_receiver: Receiver<WorkingBankEntries>,
transactions_sockets: Vec<UdpSocket>, transactions_sockets: Vec<UdpSocket>,
tpu_via_blobs_sockets: Vec<UdpSocket>, tpu_forwards_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket, broadcast_socket: UdpSocket,
sigverify_disabled: bool, sigverify_disabled: bool,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
@ -44,7 +44,7 @@ impl Tpu {
let (packet_sender, packet_receiver) = channel(); let (packet_sender, packet_receiver) = channel();
let fetch_stage = FetchStage::new_with_sender( let fetch_stage = FetchStage::new_with_sender(
transactions_sockets, transactions_sockets,
tpu_via_blobs_sockets, tpu_forwards_sockets,
&exit, &exit,
&packet_sender, &packet_sender,
&poh_recorder, &poh_recorder,

View File

@ -267,7 +267,7 @@ impl Validator {
&poh_recorder, &poh_recorder,
entry_receiver, entry_receiver,
node.sockets.tpu, node.sockets.tpu,
node.sockets.tpu_via_blobs, node.sockets.tpu_forwards,
node.sockets.broadcast, node.sockets.broadcast,
config.sigverify_disabled, config.sigverify_disabled,
&blocktree, &blocktree,