Refactor packing packets into blobs into separate packets_to_blob() function in packets.rs
This commit is contained in:
parent
3ddf4b6c24
commit
a4acc631ee
|
@ -6,8 +6,9 @@ use crate::cluster_info::ClusterInfo;
|
||||||
use crate::entry::Entry;
|
use crate::entry::Entry;
|
||||||
use crate::leader_confirmation_service::LeaderConfirmationService;
|
use crate::leader_confirmation_service::LeaderConfirmationService;
|
||||||
use crate::leader_schedule_utils;
|
use crate::leader_schedule_utils;
|
||||||
|
use crate::packet;
|
||||||
use crate::packet::SharedPackets;
|
use crate::packet::SharedPackets;
|
||||||
use crate::packet::{Blob, Packets};
|
use crate::packet::{Packet, Packets};
|
||||||
use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries};
|
use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries};
|
||||||
use crate::poh_service::{PohService, PohServiceConfig};
|
use crate::poh_service::{PohService, PohServiceConfig};
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
|
@ -79,23 +80,17 @@ impl BankingStage {
|
||||||
forwarder: &std::net::SocketAddr,
|
forwarder: &std::net::SocketAddr,
|
||||||
unprocessed_packets: &[(SharedPackets, usize)],
|
unprocessed_packets: &[(SharedPackets, usize)],
|
||||||
) -> std::io::Result<()> {
|
) -> std::io::Result<()> {
|
||||||
let mut blob = Blob::default();
|
let locked_packets: Vec<_> = unprocessed_packets
|
||||||
for (packets, start_index) in unprocessed_packets {
|
.iter()
|
||||||
let packets = packets.read().unwrap();
|
.map(|(p, start_index)| (p.read().unwrap(), start_index))
|
||||||
let mut current_index = *start_index;
|
.collect();
|
||||||
while current_index < packets.packets.len() {
|
let packets: Vec<&Packet> = locked_packets
|
||||||
current_index += blob.store_packets(&packets.packets[current_index..]) as usize;
|
.iter()
|
||||||
if current_index < packets.packets.len() {
|
.flat_map(|(p, start_index)| &p.packets[**start_index..])
|
||||||
// Blob is full, send it
|
.collect();
|
||||||
socket.send_to(&blob.data[..blob.meta.size], forwarder)?;
|
let blobs = packet::packets_to_blobs(&packets[..]);
|
||||||
blob = Blob::default();
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if blob.size() > 0 {
|
for blob in blobs {
|
||||||
socket.send_to(&blob.data[..blob.meta.size], forwarder)?;
|
socket.send_to(&blob.data[..blob.meta.size], forwarder)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1273,7 +1273,7 @@ impl ClusterInfo {
|
||||||
let daddr = socketaddr_any!();
|
let daddr = socketaddr_any!();
|
||||||
|
|
||||||
let node = ContactInfo::new(
|
let node = ContactInfo::new(
|
||||||
*id,
|
id,
|
||||||
daddr,
|
daddr,
|
||||||
daddr,
|
daddr,
|
||||||
daddr,
|
daddr,
|
||||||
|
|
|
@ -58,7 +58,7 @@ pub fn send_many_transactions(node: &ContactInfo, funding_keypair: &Keypair, num
|
||||||
assert!(bal > 0);
|
assert!(bal > 0);
|
||||||
let mut transaction = SystemTransaction::new_move(
|
let mut transaction = SystemTransaction::new_move(
|
||||||
&funding_keypair,
|
&funding_keypair,
|
||||||
random_keypair.pubkey(),
|
&random_keypair.pubkey(),
|
||||||
1,
|
1,
|
||||||
client.get_recent_blockhash(),
|
client.get_recent_blockhash(),
|
||||||
0,
|
0,
|
||||||
|
|
|
@ -1,12 +1,13 @@
|
||||||
//! The `packet` module defines data structures and methods to pull data from the network.
|
//! The `packet` module defines data structures and methods to pull data from the network.
|
||||||
use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
|
use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
use bincode::{serialize, serialize_into};
|
use bincode;
|
||||||
use byteorder::{ByteOrder, LittleEndian};
|
use byteorder::{ByteOrder, LittleEndian};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use solana_metrics::counter::Counter;
|
use solana_metrics::counter::Counter;
|
||||||
pub use solana_sdk::packet::PACKET_DATA_SIZE;
|
pub use solana_sdk::packet::PACKET_DATA_SIZE;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
use std::borrow::Borrow;
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
@ -243,7 +244,7 @@ pub fn to_packets_chunked<T: Serialize>(xs: &[T], chunks: usize) -> Vec<SharedPa
|
||||||
.resize(x.len(), Packet::default());
|
.resize(x.len(), Packet::default());
|
||||||
for (i, o) in x.iter().zip(p.write().unwrap().packets.iter_mut()) {
|
for (i, o) in x.iter().zip(p.write().unwrap().packets.iter_mut()) {
|
||||||
let mut wr = io::Cursor::new(&mut o.data[..]);
|
let mut wr = io::Cursor::new(&mut o.data[..]);
|
||||||
serialize_into(&mut wr, &i).expect("serialize request");
|
bincode::serialize_into(&mut wr, &i).expect("serialize request");
|
||||||
let len = wr.position() as usize;
|
let len = wr.position() as usize;
|
||||||
o.meta.size = len;
|
o.meta.size = len;
|
||||||
}
|
}
|
||||||
|
@ -258,7 +259,7 @@ pub fn to_packets<T: Serialize>(xs: &[T]) -> Vec<SharedPackets> {
|
||||||
|
|
||||||
pub fn to_blob<T: Serialize>(resp: T, rsp_addr: SocketAddr) -> Result<Blob> {
|
pub fn to_blob<T: Serialize>(resp: T, rsp_addr: SocketAddr) -> Result<Blob> {
|
||||||
let mut b = Blob::default();
|
let mut b = Blob::default();
|
||||||
let v = serialize(&resp)?;
|
let v = bincode::serialize(&resp)?;
|
||||||
let len = v.len();
|
let len = v.len();
|
||||||
assert!(len <= BLOB_SIZE);
|
assert!(len <= BLOB_SIZE);
|
||||||
b.data[..len].copy_from_slice(&v);
|
b.data[..len].copy_from_slice(&v);
|
||||||
|
@ -288,6 +289,44 @@ pub fn to_shared_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<Share
|
||||||
Ok(blobs)
|
Ok(blobs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn packets_to_blobs<T: Borrow<Packet>>(packets: &[T]) -> Vec<Blob> {
|
||||||
|
let mut current_index = 0;
|
||||||
|
let mut blobs = vec![];
|
||||||
|
while current_index < packets.len() {
|
||||||
|
let mut blob = Blob::default();
|
||||||
|
current_index += blob.store_packets(&packets[current_index..]) as usize;
|
||||||
|
blobs.push(blob);
|
||||||
|
}
|
||||||
|
|
||||||
|
blobs
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize_packets_in_blob(
|
||||||
|
data: &[u8],
|
||||||
|
serialized_packet_size: usize,
|
||||||
|
serialized_meta_size: usize,
|
||||||
|
) -> Result<Vec<Packet>> {
|
||||||
|
let mut packets: Vec<Packet> = Vec::with_capacity(data.len() / serialized_packet_size);
|
||||||
|
let mut pos = 0;
|
||||||
|
while pos + serialized_packet_size <= data.len() {
|
||||||
|
let packet = deserialize_single_packet_in_blob(
|
||||||
|
&data[pos..pos + serialized_packet_size],
|
||||||
|
serialized_meta_size,
|
||||||
|
)?;
|
||||||
|
pos += serialized_packet_size;
|
||||||
|
packets.push(packet);
|
||||||
|
}
|
||||||
|
Ok(packets)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn deserialize_single_packet_in_blob(data: &[u8], serialized_meta_size: usize) -> Result<Packet> {
|
||||||
|
let meta = bincode::deserialize(&data[..serialized_meta_size])?;
|
||||||
|
let mut packet_data = [0; PACKET_DATA_SIZE];
|
||||||
|
packet_data
|
||||||
|
.copy_from_slice(&data[serialized_meta_size..serialized_meta_size + PACKET_DATA_SIZE]);
|
||||||
|
Ok(Packet::new(packet_data, meta))
|
||||||
|
}
|
||||||
|
|
||||||
macro_rules! range {
|
macro_rules! range {
|
||||||
($prev:expr, $type:ident) => {
|
($prev:expr, $type:ident) => {
|
||||||
$prev..$prev + size_of::<$type>()
|
$prev..$prev + size_of::<$type>()
|
||||||
|
@ -419,16 +458,16 @@ impl Blob {
|
||||||
self.set_data_size(new_size as u64);
|
self.set_data_size(new_size as u64);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn store_packets(&mut self, packets: &[Packet]) -> u64 {
|
pub fn store_packets<T: Borrow<Packet>>(&mut self, packets: &[T]) -> u64 {
|
||||||
let size = self.size();
|
let size = self.size();
|
||||||
let mut cursor = Cursor::new(&mut self.data_mut()[size..]);
|
let mut cursor = Cursor::new(&mut self.data_mut()[size..]);
|
||||||
let mut written = 0;
|
let mut written = 0;
|
||||||
let mut last_index = 0;
|
let mut last_index = 0;
|
||||||
for packet in packets {
|
for packet in packets {
|
||||||
if serialize_into(&mut cursor, &packet.meta).is_err() {
|
if bincode::serialize_into(&mut cursor, &packet.borrow().meta).is_err() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if cursor.write_all(&packet.data[..]).is_err() {
|
if cursor.write_all(&packet.borrow().data[..]).is_err() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -518,12 +557,9 @@ pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot:
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::packet::{
|
use super::*;
|
||||||
to_packets, Blob, Meta, Packet, Packets, SharedBlob, SharedPackets, NUM_PACKETS,
|
use bincode;
|
||||||
PACKET_DATA_SIZE,
|
use rand::Rng;
|
||||||
};
|
|
||||||
use crate::packet::{BLOB_HEADER_SIZE, BLOB_SIZE};
|
|
||||||
use bincode::serialized_size;
|
|
||||||
use solana_sdk::hash::Hash;
|
use solana_sdk::hash::Hash;
|
||||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||||
use solana_sdk::system_transaction::SystemTransaction;
|
use solana_sdk::system_transaction::SystemTransaction;
|
||||||
|
@ -642,7 +678,7 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_store_blobs_max() {
|
fn test_store_blobs_max() {
|
||||||
let meta = Meta::default();
|
let meta = Meta::default();
|
||||||
let serialized_meta_size = serialized_size(&meta).unwrap() as usize;
|
let serialized_meta_size = bincode::serialized_size(&meta).unwrap() as usize;
|
||||||
let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
|
let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
|
||||||
let num_packets = (BLOB_SIZE - BLOB_HEADER_SIZE) / serialized_packet_size + 1;
|
let num_packets = (BLOB_SIZE - BLOB_HEADER_SIZE) / serialized_packet_size + 1;
|
||||||
let mut blob = Blob::default();
|
let mut blob = Blob::default();
|
||||||
|
@ -664,4 +700,78 @@ mod tests {
|
||||||
// Blob is now full
|
// Blob is now full
|
||||||
assert_eq!(blob.store_packets(&packets), 0);
|
assert_eq!(blob.store_packets(&packets), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_packets_to_blobs() {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let meta = Meta::default();
|
||||||
|
let serialized_meta_size = bincode::serialized_size(&meta).unwrap() as usize;
|
||||||
|
let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
|
||||||
|
|
||||||
|
let packets_per_blob = (BLOB_SIZE - BLOB_HEADER_SIZE) / serialized_packet_size;
|
||||||
|
assert!(packets_per_blob > 1);
|
||||||
|
let num_packets = packets_per_blob * 10 + packets_per_blob - 1;
|
||||||
|
|
||||||
|
let packets: Vec<_> = (0..num_packets)
|
||||||
|
.map(|_| {
|
||||||
|
let mut packet = Packet::default();
|
||||||
|
for i in 0..packet.meta.addr.len() {
|
||||||
|
packet.meta.addr[i] = rng.gen_range(1, std::u16::MAX);
|
||||||
|
}
|
||||||
|
for i in 0..packet.data.len() {
|
||||||
|
packet.data[i] = rng.gen_range(1, std::u8::MAX);
|
||||||
|
}
|
||||||
|
packet
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let blobs = packets_to_blobs(&packets[..]);
|
||||||
|
assert_eq!(blobs.len(), 11);
|
||||||
|
|
||||||
|
let reconstructed_packets: Vec<Packet> = blobs
|
||||||
|
.iter()
|
||||||
|
.flat_map(|b| {
|
||||||
|
deserialize_packets_in_blob(
|
||||||
|
&b.data()[..b.size()],
|
||||||
|
serialized_packet_size,
|
||||||
|
serialized_meta_size,
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
assert_eq!(reconstructed_packets, packets);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_deserialize_packets_in_blob() {
|
||||||
|
let meta = Meta::default();
|
||||||
|
let serialized_meta_size = bincode::serialized_size(&meta).unwrap() as usize;
|
||||||
|
let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
|
||||||
|
let num_packets = 10;
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let packets: Vec<_> = (0..num_packets)
|
||||||
|
.map(|_| {
|
||||||
|
let mut packet = Packet::default();
|
||||||
|
for i in 0..packet.meta.addr.len() {
|
||||||
|
packet.meta.addr[i] = rng.gen_range(1, std::u16::MAX);
|
||||||
|
}
|
||||||
|
for i in 0..packet.data.len() {
|
||||||
|
packet.data[i] = rng.gen_range(1, std::u8::MAX);
|
||||||
|
}
|
||||||
|
packet
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut blob = Blob::default();
|
||||||
|
assert_eq!(blob.store_packets(&packets[..]), num_packets);
|
||||||
|
let result = deserialize_packets_in_blob(
|
||||||
|
&blob.data()[..blob.size()],
|
||||||
|
serialized_packet_size,
|
||||||
|
serialized_meta_size,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(result, packets);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
|
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
|
||||||
//!
|
//!
|
||||||
|
|
||||||
use crate::packet::{Blob, Meta, Packet, Packets, SharedBlobs, SharedPackets, PACKET_DATA_SIZE};
|
use crate::packet::{
|
||||||
|
deserialize_packets_in_blob, Blob, Meta, Packets, SharedBlobs, SharedPackets, PACKET_DATA_SIZE,
|
||||||
|
};
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
use bincode::{deserialize, serialized_size};
|
use bincode;
|
||||||
use solana_metrics::{influxdb, submit};
|
use solana_metrics::{influxdb, submit};
|
||||||
use solana_sdk::timing::duration_as_ms;
|
use solana_sdk::timing::duration_as_ms;
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
|
@ -140,32 +142,6 @@ pub fn blob_receiver(
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn deserialize_single_packet_in_blob(data: &[u8], serialized_meta_size: usize) -> Result<Packet> {
|
|
||||||
let meta = deserialize(&data[..serialized_meta_size])?;
|
|
||||||
let mut packet_data = [0; PACKET_DATA_SIZE];
|
|
||||||
packet_data
|
|
||||||
.copy_from_slice(&data[serialized_meta_size..serialized_meta_size + PACKET_DATA_SIZE]);
|
|
||||||
Ok(Packet::new(packet_data, meta))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn deserialize_packets_in_blob(
|
|
||||||
data: &[u8],
|
|
||||||
serialized_packet_size: usize,
|
|
||||||
serialized_meta_size: usize,
|
|
||||||
) -> Result<Vec<Packet>> {
|
|
||||||
let mut packets: Vec<Packet> = Vec::with_capacity(data.len() / serialized_packet_size);
|
|
||||||
let mut pos = 0;
|
|
||||||
while pos + serialized_packet_size <= data.len() {
|
|
||||||
let packet = deserialize_single_packet_in_blob(
|
|
||||||
&data[pos..pos + serialized_packet_size],
|
|
||||||
serialized_meta_size,
|
|
||||||
)?;
|
|
||||||
pos += serialized_packet_size;
|
|
||||||
packets.push(packet);
|
|
||||||
}
|
|
||||||
Ok(packets)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> {
|
fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> {
|
||||||
trace!(
|
trace!(
|
||||||
"recv_blob_packets: receiving on {}",
|
"recv_blob_packets: receiving on {}",
|
||||||
|
@ -173,7 +149,7 @@ fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> {
|
||||||
);
|
);
|
||||||
|
|
||||||
let meta = Meta::default();
|
let meta = Meta::default();
|
||||||
let serialized_meta_size = serialized_size(&meta)? as usize;
|
let serialized_meta_size = bincode::serialized_size(&meta)? as usize;
|
||||||
let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
|
let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
|
||||||
let blobs = Blob::recv_from(sock)?;
|
let blobs = Blob::recv_from(sock)?;
|
||||||
for blob in blobs {
|
for blob in blobs {
|
||||||
|
@ -222,9 +198,8 @@ pub fn blob_packet_receiver(
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::packet::{Blob, Meta, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
|
use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
|
||||||
use crate::streamer::{receiver, responder};
|
use crate::streamer::{receiver, responder};
|
||||||
use rand::Rng;
|
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
|
@ -286,36 +261,4 @@ mod test {
|
||||||
t_receiver.join().expect("join");
|
t_receiver.join().expect("join");
|
||||||
t_responder.join().expect("join");
|
t_responder.join().expect("join");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_streamer_deserialize_packets_in_blob() {
|
|
||||||
let meta = Meta::default();
|
|
||||||
let serialized_meta_size = serialized_size(&meta).unwrap() as usize;
|
|
||||||
let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
|
|
||||||
let num_packets = 10;
|
|
||||||
let mut rng = rand::thread_rng();
|
|
||||||
let packets: Vec<_> = (0..num_packets)
|
|
||||||
.map(|_| {
|
|
||||||
let mut packet = Packet::default();
|
|
||||||
for i in 0..packet.meta.addr.len() {
|
|
||||||
packet.meta.addr[i] = rng.gen_range(1, std::u16::MAX);
|
|
||||||
}
|
|
||||||
for i in 0..packet.data.len() {
|
|
||||||
packet.data[i] = rng.gen_range(1, std::u8::MAX);
|
|
||||||
}
|
|
||||||
packet
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let mut blob = Blob::default();
|
|
||||||
assert_eq!(blob.store_packets(&packets[..]), num_packets);
|
|
||||||
let result = deserialize_packets_in_blob(
|
|
||||||
&blob.data()[..blob.size()],
|
|
||||||
serialized_packet_size,
|
|
||||||
serialized_meta_size,
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
assert_eq!(result, packets);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue