Add logic for deserialzing packets embedded in blobs
This commit is contained in:
parent
536c8accf8
commit
b60b8ec5ae
|
@ -10,6 +10,8 @@ use solana_sdk::pubkey::Pubkey;
|
|||
use std::cmp;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::io::Cursor;
|
||||
use std::io::Write;
|
||||
use std::mem::size_of;
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
@ -23,7 +25,7 @@ pub const BLOB_SIZE: usize = (64 * 1024 - 128); // wikipedia says there should b
|
|||
pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - (BLOB_HEADER_SIZE * 2);
|
||||
pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE;
|
||||
|
||||
#[derive(Clone, Default, Debug, PartialEq)]
|
||||
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
|
||||
#[repr(C)]
|
||||
pub struct Meta {
|
||||
pub size: usize,
|
||||
|
@ -40,6 +42,12 @@ pub struct Packet {
|
|||
pub meta: Meta,
|
||||
}
|
||||
|
||||
impl Packet {
|
||||
pub fn new(data: [u8; PACKET_DATA_SIZE], meta: Meta) -> Self {
|
||||
Packet { data, meta }
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Packet {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
|
@ -60,6 +68,12 @@ impl Default for Packet {
|
|||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Packet {
|
||||
fn eq(&self, other: &Packet) -> bool {
|
||||
self.data.iter().zip(other.data.iter()).all(|(a, b)| a == b) && self.meta == other.meta
|
||||
}
|
||||
}
|
||||
|
||||
impl Meta {
|
||||
pub fn addr(&self) -> SocketAddr {
|
||||
if !self.v6 {
|
||||
|
@ -405,6 +419,19 @@ impl Blob {
|
|||
self.set_data_size(new_size as u64);
|
||||
}
|
||||
|
||||
pub fn store_packets(&mut self, packets: &[Packet]) -> Result<()> {
|
||||
let mut cursor = Cursor::new(&mut self.data[..]);
|
||||
cursor.set_position(BLOB_HEADER_SIZE as u64);
|
||||
for packet in packets {
|
||||
serialize_into(&mut cursor, &packet.meta)?;
|
||||
cursor.write(&packet.data[..])?;
|
||||
}
|
||||
let size = cursor.position();
|
||||
self.set_size(size as usize);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> {
|
||||
let mut p = r.write().unwrap();
|
||||
trace!("receiving on {}", socket.local_addr().unwrap());
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
|
||||
//!
|
||||
|
||||
use crate::packet::{Blob, Packet, Packets, SharedBlobs, SharedPackets};
|
||||
use crate::packet::{Blob, Meta, Packet, Packets, SharedBlobs, SharedPackets, PACKET_DATA_SIZE};
|
||||
use crate::result::{Error, Result};
|
||||
use bincode::deserialize;
|
||||
use bincode::{deserialize, serialized_size};
|
||||
use solana_metrics::{influxdb, submit};
|
||||
use solana_sdk::timing::duration_as_ms;
|
||||
use std::net::UdpSocket;
|
||||
|
@ -140,19 +140,58 @@ pub fn blob_receiver(
|
|||
.unwrap()
|
||||
}
|
||||
|
||||
fn deserialize_single_packet_in_blob(data: &[u8], serialized_meta_size: usize) -> Result<Packet> {
|
||||
let meta = deserialize(&data[..serialized_meta_size])?;
|
||||
let mut packet_data = [0; PACKET_DATA_SIZE];
|
||||
packet_data
|
||||
.copy_from_slice(&data[serialized_meta_size..serialized_meta_size + PACKET_DATA_SIZE]);
|
||||
Ok(Packet::new(packet_data, meta))
|
||||
}
|
||||
|
||||
fn deserialize_packets_in_blob(
|
||||
data: &[u8],
|
||||
serialized_packet_size: usize,
|
||||
serialized_meta_size: usize,
|
||||
) -> Result<Vec<Packet>> {
|
||||
let mut packets: Vec<Packet> = Vec::with_capacity(data.len() / serialized_packet_size);
|
||||
let mut pos = 0;
|
||||
while pos + serialized_packet_size <= data.len() {
|
||||
let packet = deserialize_single_packet_in_blob(
|
||||
&data[pos..pos + serialized_packet_size],
|
||||
serialized_meta_size,
|
||||
)?;
|
||||
pos += serialized_packet_size;
|
||||
packets.push(packet);
|
||||
}
|
||||
Ok(packets)
|
||||
}
|
||||
|
||||
fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> {
|
||||
trace!(
|
||||
"recv_blob_packets: receiving on {}",
|
||||
sock.local_addr().unwrap()
|
||||
);
|
||||
|
||||
let meta = Meta::default();
|
||||
let serialized_meta_size = serialized_size(&meta)? as usize;
|
||||
let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
|
||||
let blobs = Blob::recv_from(sock)?;
|
||||
for blob in blobs {
|
||||
let msgs: Vec<Packet> = {
|
||||
let r_blob = blob.read().unwrap();
|
||||
let r_blob = blob.read().unwrap();
|
||||
let data = {
|
||||
let msg_size = r_blob.size();
|
||||
deserialize(&r_blob.data()[..msg_size])?
|
||||
&r_blob.data()[..msg_size]
|
||||
};
|
||||
s.send(Arc::new(RwLock::new(Packets::new(msgs))))?;
|
||||
|
||||
let packets =
|
||||
deserialize_packets_in_blob(data, serialized_packet_size, serialized_meta_size);
|
||||
|
||||
if packets.is_err() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let packets = packets?;
|
||||
s.send(Arc::new(RwLock::new(Packets::new(packets))))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -182,9 +221,10 @@ pub fn blob_packet_receiver(
|
|||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
|
||||
use crate::streamer::PacketReceiver;
|
||||
use super::*;
|
||||
use crate::packet::{Blob, Meta, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
|
||||
use crate::streamer::{receiver, responder};
|
||||
use rand::Rng;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::net::UdpSocket;
|
||||
|
@ -246,4 +286,36 @@ mod test {
|
|||
t_receiver.join().expect("join");
|
||||
t_responder.join().expect("join");
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn streamer_test_deserialize_packets_in_blob() {
|
||||
let meta = Meta::default();
|
||||
let serialized_meta_size = serialized_size(&meta).unwrap() as usize;
|
||||
let serialized_packet_size = serialized_meta_size + PACKET_DATA_SIZE;
|
||||
let num_packets = 10;
|
||||
let mut rng = rand::thread_rng();
|
||||
let packets: Vec<_> = (0..num_packets)
|
||||
.map(|_| {
|
||||
let mut packet = Packet::default();
|
||||
for i in 0..packet.meta.addr.len() {
|
||||
packet.meta.addr[i] = rng.gen_range(1, std::u16::MAX);
|
||||
}
|
||||
for i in 0..packet.data.len() {
|
||||
packet.data[i] = rng.gen_range(1, std::u8::MAX);
|
||||
}
|
||||
packet
|
||||
})
|
||||
.collect();
|
||||
|
||||
let mut blob = Blob::default();
|
||||
blob.store_packets(&packets[..]).unwrap();
|
||||
let result = deserialize_packets_in_blob(
|
||||
&blob.data()[..blob.size()],
|
||||
serialized_packet_size,
|
||||
serialized_meta_size,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result, packets);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue