From d3068c3918c2bb4101fc75f7ecac6ef5903dbfbb Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 17 Oct 2019 11:37:30 -0600 Subject: [PATCH] Remove circular dependencies in core (#6408) * Remove core::result dependency from blocktree * Remove core::result dependency from shred * Move Packet from core::packet to sdk::packet This way we don't need to split perf_libs yet. * Disable packet when compiling BPF programs --- core/src/blocktree.rs | 33 ++++++-- core/src/blocktree/db.rs | 4 +- core/src/blocktree/rocks.rs | 9 +- core/src/blocktree/rooted_slot_iterator.rs | 2 +- core/src/packet.rs | 98 +--------------------- core/src/perf_libs.rs | 2 +- core/src/replay_stage.rs | 4 +- core/src/shred.rs | 18 ++-- sdk/src/lib.rs | 3 +- sdk/src/packet.rs | 97 +++++++++++++++++++++ 10 files changed, 148 insertions(+), 122 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 0a2cc266e..314a41037 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -3,7 +3,6 @@ //! access read to a persistent file-based ledger. use crate::entry::Entry; use crate::erasure::ErasureConfig; -use crate::result::{Error, Result}; use crate::shred::{Shred, Shredder}; use bincode::deserialize; @@ -23,6 +22,7 @@ use std::cmp; use std::fs; use std::path::{Path, PathBuf}; use std::rc::Rc; +use std::result; use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError}; use std::sync::{Arc, RwLock}; use std::time::Instant; @@ -70,6 +70,29 @@ pub enum BlocktreeError { InvalidShredData(Box), RocksDb(rocksdb::Error), SlotNotRooted, + IO(std::io::Error), + Serialize(std::boxed::Box), +} +pub type Result = result::Result; + +impl std::error::Error for BlocktreeError {} + +impl std::fmt::Display for BlocktreeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "blocktree error") + } +} + +impl std::convert::From for BlocktreeError { + fn from(e: std::io::Error) -> BlocktreeError { + BlocktreeError::IO(e) + } +} + +impl std::convert::From> for BlocktreeError { + fn from(e: std::boxed::Box) -> BlocktreeError { + BlocktreeError::Serialize(e) + } } // ledger window @@ -616,8 +639,8 @@ impl Blocktree { // `insert_coding_shred` is called if shred_index < u64::from(pos) { error!("Due to earlier validation, shred index must be >= pos"); - return Err(Error::BlocktreeError(BlocktreeError::InvalidShredData( - Box::new(bincode::ErrorKind::Custom("shred index < pos".to_string())), + return Err(BlocktreeError::InvalidShredData(Box::new( + bincode::ErrorKind::Custom("shred index < pos".to_string()), ))); } @@ -1035,11 +1058,11 @@ impl Blocktree { debug!("{:?} shreds in last FEC set", shred_chunk.len(),); let entries: Vec = bincode::deserialize(&deshred_payload).map_err(|_| { - Error::BlocktreeError(BlocktreeError::InvalidShredData(Box::new( + BlocktreeError::InvalidShredData(Box::new( bincode::ErrorKind::Custom( "could not construct entries".to_string(), ), - ))) + )) })?; return Ok((entries, shred_chunk.len())); } else { diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs index 5b0f3fdf6..87f41acfa 100644 --- a/core/src/blocktree/db.rs +++ b/core/src/blocktree/db.rs @@ -1,4 +1,4 @@ -use crate::result::{Error, Result}; +use crate::blocktree::{BlocktreeError, Result}; use bincode::{deserialize, serialize}; @@ -64,7 +64,7 @@ pub trait Backend: Sized + Send + Sync { type Cursor: DbCursor; type Iter: Iterator, Box<[u8]>)>; type WriteBatch: IWriteBatch; - type Error: Into; + type Error: Into; fn open(path: &Path) -> Result; diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs index a4d31cea6..020f998c4 100644 --- a/core/src/blocktree/rocks.rs +++ b/core/src/blocktree/rocks.rs @@ -2,8 +2,7 @@ use crate::blocktree::db::columns as cf; use crate::blocktree::db::{ Backend, Column, DbCursor, IWriteBatch, IteratorDirection, IteratorMode, TypedColumn, }; -use crate::blocktree::BlocktreeError; -use crate::result::{Error, Result}; +use crate::blocktree::{BlocktreeError, Result}; use solana_sdk::clock::Slot; use byteorder::{BigEndian, ByteOrder}; @@ -412,9 +411,9 @@ impl IWriteBatch for RWriteBatch { } } -impl std::convert::From for Error { - fn from(e: rocksdb::Error) -> Error { - Error::BlocktreeError(BlocktreeError::RocksDb(e)) +impl std::convert::From for BlocktreeError { + fn from(e: rocksdb::Error) -> BlocktreeError { + BlocktreeError::RocksDb(e) } } diff --git a/core/src/blocktree/rooted_slot_iterator.rs b/core/src/blocktree/rooted_slot_iterator.rs index f3b691c68..d0f52be51 100644 --- a/core/src/blocktree/rooted_slot_iterator.rs +++ b/core/src/blocktree/rooted_slot_iterator.rs @@ -13,7 +13,7 @@ impl<'a> RootedSlotIterator<'a> { blocktree, }) } else { - Err(Error::BlocktreeError(BlocktreeError::SlotNotRooted)) + Err(BlocktreeError::SlotNotRooted) } } } diff --git a/core/src/packet.rs b/core/src/packet.rs index 6dc966f34..8f46fb30a 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -8,7 +8,7 @@ use bincode; use byteorder::{ByteOrder, LittleEndian}; use serde::Serialize; use solana_metrics::inc_new_counter_debug; -pub use solana_sdk::packet::PACKET_DATA_SIZE; +pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Signable; use solana_sdk::signature::Signature; @@ -19,7 +19,7 @@ use std::io; use std::io::Cursor; use std::mem; use std::mem::size_of; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; +use std::net::{SocketAddr, UdpSocket}; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, RwLock}; use std::time::Instant; @@ -36,100 +36,6 @@ pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; pub const PACKETS_PER_BATCH: usize = 256; pub const PACKETS_BATCH_SIZE: usize = (PACKETS_PER_BATCH * PACKET_DATA_SIZE); -#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] -#[repr(C)] -pub struct Meta { - pub size: usize, - pub forward: bool, - pub repair: bool, - pub addr: [u16; 8], - pub port: u16, - pub v6: bool, - pub seed: [u8; 32], - pub slot: u64, -} - -#[derive(Clone)] -#[repr(C)] -pub struct Packet { - pub data: [u8; PACKET_DATA_SIZE], - pub meta: Meta, -} - -impl Packet { - pub fn new(data: [u8; PACKET_DATA_SIZE], meta: Meta) -> Self { - Self { data, meta } - } -} - -impl fmt::Debug for Packet { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "Packet {{ size: {:?}, addr: {:?} }}", - self.meta.size, - self.meta.addr() - ) - } -} - -impl Default for Packet { - fn default() -> Packet { - Packet { - data: unsafe { std::mem::MaybeUninit::uninit().assume_init() }, - meta: Meta::default(), - } - } -} - -impl PartialEq for Packet { - fn eq(&self, other: &Packet) -> bool { - let self_data: &[u8] = self.data.as_ref(); - let other_data: &[u8] = other.data.as_ref(); - self.meta == other.meta && self_data[..self.meta.size] == other_data[..other.meta.size] - } -} - -impl Meta { - pub fn addr(&self) -> SocketAddr { - if !self.v6 { - let addr = [ - self.addr[0] as u8, - self.addr[1] as u8, - self.addr[2] as u8, - self.addr[3] as u8, - ]; - let ipv4: Ipv4Addr = From::<[u8; 4]>::from(addr); - SocketAddr::new(IpAddr::V4(ipv4), self.port) - } else { - let ipv6: Ipv6Addr = From::<[u16; 8]>::from(self.addr); - SocketAddr::new(IpAddr::V6(ipv6), self.port) - } - } - - pub fn set_addr(&mut self, a: &SocketAddr) { - match *a { - SocketAddr::V4(v4) => { - let ip = v4.ip().octets(); - self.addr[0] = u16::from(ip[0]); - self.addr[1] = u16::from(ip[1]); - self.addr[2] = u16::from(ip[2]); - self.addr[3] = u16::from(ip[3]); - self.addr[4] = 0; - self.addr[5] = 0; - self.addr[6] = 0; - self.addr[7] = 0; - self.v6 = false; - } - SocketAddr::V6(v6) => { - self.addr = v6.ip().segments(); - self.v6 = true; - } - } - self.port = a.port(); - } -} - #[derive(Debug, Clone)] pub struct Packets { pub packets: PinnedVec, diff --git a/core/src/perf_libs.rs b/core/src/perf_libs.rs index 0337f12ec..1c42474d6 100644 --- a/core/src/perf_libs.rs +++ b/core/src/perf_libs.rs @@ -1,6 +1,6 @@ -use crate::packet::Packet; use core::ffi::c_void; use dlopen::symbor::{Container, SymBorApi, Symbol}; +use solana_sdk::packet::Packet; use std::env; use std::ffi::OsStr; use std::fs; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 5c57a0b33..5468426d6 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -743,7 +743,9 @@ impl ReplayStage { bank_progress: &mut ForkProgress, ) -> Result<(Vec, usize, u64, u64)> { let bank_slot = bank.slot(); - blocktree.get_slot_entries_with_shred_count(bank_slot, bank_progress.num_shreds as u64) + let entries_and_count = blocktree + .get_slot_entries_with_shred_count(bank_slot, bank_progress.num_shreds as u64)?; + Ok(entries_and_count) } fn replay_entries_into_bank( diff --git a/core/src/shred.rs b/core/src/shred.rs index 8f93f3558..5b6a3efc6 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -3,8 +3,6 @@ use crate::blocktree::BlocktreeError; use crate::entry::create_ticks; use crate::entry::Entry; use crate::erasure::Session; -use crate::result; -use crate::result::Error; use bincode::serialized_size; use core::cell::RefCell; use lazy_static::lazy_static; @@ -147,7 +145,7 @@ impl Shred { Self::new(header, shred_buf) } - pub fn new_from_serialized_shred(shred_buf: Vec) -> result::Result { + pub fn new_from_serialized_shred(shred_buf: Vec) -> Result { let shred_type: ShredType = bincode::deserialize(&shred_buf[..*SIZE_OF_SHRED_TYPE])?; let mut header = if shred_type == ShredType(CODING_SHRED) { let start = *SIZE_OF_SHRED_TYPE; @@ -162,8 +160,8 @@ impl Shred { header.data_header = bincode::deserialize(&shred_buf[start..end])?; header } else { - return Err(Error::BlocktreeError(BlocktreeError::InvalidShredData( - Box::new(bincode::ErrorKind::Custom("Invalid shred type".to_string())), + return Err(BlocktreeError::InvalidShredData(Box::new( + bincode::ErrorKind::Custom("Invalid shred type".to_string()), ))); }; header.shred_type = shred_type; @@ -314,23 +312,23 @@ impl Shredder { parent_slot: u64, fec_rate: f32, keypair: Arc, - ) -> result::Result { + ) -> Result { if fec_rate > 1.0 || fec_rate < 0.0 { - Err(Error::IO(IOError::new( + Err(IOError::new( ErrorKind::Other, format!( "FEC rate {:?} must be more than 0.0 and less than 1.0", fec_rate ), - ))) + )) } else if slot < parent_slot || slot - parent_slot > u64::from(std::u16::MAX) { - Err(Error::IO(IOError::new( + Err(IOError::new( ErrorKind::Other, format!( "Current slot {:?} must be > Parent slot {:?}, but the difference must not be > {:?}", slot, parent_slot, std::u16::MAX ), - ))) + )) } else { Ok(Shredder { slot, diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index 41398446b..63f58c7ac 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -12,7 +12,6 @@ pub mod loader_instruction; pub mod message; pub mod native_loader; pub mod native_token; -pub mod packet; pub mod poh_config; pub mod pubkey; pub mod rent_calculator; @@ -37,6 +36,8 @@ pub mod client; #[cfg(not(feature = "program"))] pub mod genesis_block; #[cfg(not(feature = "program"))] +pub mod packet; +#[cfg(not(feature = "program"))] pub mod signature; #[cfg(not(feature = "program"))] pub mod system_transaction; diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index ba0a9cac9..2a3ddf2fd 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -1,5 +1,102 @@ +use std::fmt; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + /// Maximum over-the-wire size of a Transaction /// 1280 is IPv6 minimum MTU /// 40 bytes is the size of the IPv6 header /// 8 bytes is the size of the fragment header pub const PACKET_DATA_SIZE: usize = 1280 - 40 - 8; + +#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] +#[repr(C)] +pub struct Meta { + pub size: usize, + pub forward: bool, + pub repair: bool, + pub addr: [u16; 8], + pub port: u16, + pub v6: bool, + pub seed: [u8; 32], + pub slot: u64, +} + +#[derive(Clone)] +#[repr(C)] +pub struct Packet { + pub data: [u8; PACKET_DATA_SIZE], + pub meta: Meta, +} + +impl Packet { + pub fn new(data: [u8; PACKET_DATA_SIZE], meta: Meta) -> Self { + Self { data, meta } + } +} + +impl fmt::Debug for Packet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Packet {{ size: {:?}, addr: {:?} }}", + self.meta.size, + self.meta.addr() + ) + } +} + +impl Default for Packet { + fn default() -> Packet { + Packet { + data: unsafe { std::mem::MaybeUninit::uninit().assume_init() }, + meta: Meta::default(), + } + } +} + +impl PartialEq for Packet { + fn eq(&self, other: &Packet) -> bool { + let self_data: &[u8] = self.data.as_ref(); + let other_data: &[u8] = other.data.as_ref(); + self.meta == other.meta && self_data[..self.meta.size] == other_data[..other.meta.size] + } +} + +impl Meta { + pub fn addr(&self) -> SocketAddr { + if !self.v6 { + let addr = [ + self.addr[0] as u8, + self.addr[1] as u8, + self.addr[2] as u8, + self.addr[3] as u8, + ]; + let ipv4: Ipv4Addr = From::<[u8; 4]>::from(addr); + SocketAddr::new(IpAddr::V4(ipv4), self.port) + } else { + let ipv6: Ipv6Addr = From::<[u16; 8]>::from(self.addr); + SocketAddr::new(IpAddr::V6(ipv6), self.port) + } + } + + pub fn set_addr(&mut self, a: &SocketAddr) { + match *a { + SocketAddr::V4(v4) => { + let ip = v4.ip().octets(); + self.addr[0] = u16::from(ip[0]); + self.addr[1] = u16::from(ip[1]); + self.addr[2] = u16::from(ip[2]); + self.addr[3] = u16::from(ip[3]); + self.addr[4] = 0; + self.addr[5] = 0; + self.addr[6] = 0; + self.addr[7] = 0; + self.v6 = false; + } + SocketAddr::V6(v6) => { + self.addr = v6.ip().segments(); + self.v6 = true; + } + } + self.port = a.port(); + } +}