From 729d28d910edfbab7a7e2720471671b055bf5688 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Mon, 12 Nov 2018 12:41:19 -0800 Subject: [PATCH] Add poh verification before processing entries - Replicate stage now verifies entries delivered by the window - Minor refactor of entries_from_blobs --- .gitignore | 2 +- benches/ledger.rs | 5 +++-- src/entry.rs | 22 +++++++++++++++++++++- src/fullnode.rs | 14 ++++++++------ src/ledger.rs | 31 ++++--------------------------- src/packet.rs | 2 ++ src/replicate_stage.rs | 41 +++++++++++++++++++++++++++++------------ src/tvu.rs | 21 ++++++++++++++------- src/window.rs | 8 +++----- 9 files changed, 85 insertions(+), 61 deletions(-) diff --git a/.gitignore b/.gitignore index 7d4f47eba..ef0e4c7ba 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,4 @@ log-*.txt # intellij files /.idea/ -/solana.iml \ No newline at end of file +/solana.iml diff --git a/benches/ledger.rs b/benches/ledger.rs index dd885c4ea..c2311b2f9 100644 --- a/benches/ledger.rs +++ b/benches/ledger.rs @@ -2,8 +2,9 @@ extern crate solana; extern crate test; +use solana::entry::reconstruct_entries_from_blobs; use solana::hash::{hash, Hash}; -use solana::ledger::{next_entries, reconstruct_entries_from_blobs, Block}; +use solana::ledger::{next_entries, Block}; use solana::signature::{Keypair, KeypairUtil}; use solana::system_transaction::SystemTransaction; use solana::transaction::Transaction; @@ -20,6 +21,6 @@ fn bench_block_to_blobs_to_block(bencher: &mut Bencher) { bencher.iter(|| { let blobs = entries.to_blobs(); - assert_eq!(reconstruct_entries_from_blobs(blobs).unwrap(), entries); + assert_eq!(reconstruct_entries_from_blobs(blobs).unwrap().0, entries); }); } diff --git a/src/entry.rs b/src/entry.rs index 2ec57c4e4..10b1bc483 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -2,10 +2,11 @@ //! unique ID that is the hash of the Entry before it, plus the hash of the //! transactions within it. Entries cannot be reordered, and its field `num_hashes` //! represents an approximate amount of time since the last Entry was created. -use bincode::{serialize_into, serialized_size}; +use bincode::{deserialize, serialize_into, serialized_size}; use hash::Hash; use packet::{SharedBlob, BLOB_DATA_SIZE}; use poh::Poh; +use result::Result; use solana_sdk::pubkey::Pubkey; use std::io::Cursor; use std::mem::size_of; @@ -249,6 +250,25 @@ pub fn next_entry(prev_id: &Hash, num_hashes: u64, transactions: Vec) -> Result<(Vec, u64)> { + let mut entries: Vec = Vec::with_capacity(blobs.len()); + let mut num_ticks = 0; + + for blob in blobs { + let entry: Entry = { + let msg = blob.read().unwrap(); + let msg_size = msg.size()?; + deserialize(&msg.data()[..msg_size]).expect("Error reconstructing entry") + }; + + if entry.is_tick() { + num_ticks += 1 + } + entries.push(entry) + } + Ok((entries, num_ticks)) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/fullnode.rs b/src/fullnode.rs index 88e8662d0..5d733a905 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -140,7 +140,7 @@ impl Fullnode { info!("creating bank..."); - let (bank, entry_height, last_id) = + let (bank, entry_height, last_entry_id) = Self::new_bank_from_ledger(ledger_path, leader_scheduler); info!("creating networking stack..."); @@ -161,7 +161,7 @@ impl Fullnode { vote_account_keypair, bank, entry_height, - &last_id, + &last_entry_id, node, leader_info.as_ref(), ledger_path, @@ -191,7 +191,7 @@ impl Fullnode { vote_account_keypair: Arc, bank: Bank, entry_height: u64, - last_id: &Hash, + last_entry_id: &Hash, mut node: Node, bootstrap_leader_info_option: Option<&NodeInfo>, ledger_path: &str, @@ -249,6 +249,7 @@ impl Fullnode { vote_account_keypair.clone(), &bank, entry_height, + *last_entry_id, cluster_info.clone(), shared_window.clone(), node.sockets @@ -285,7 +286,7 @@ impl Fullnode { ledger_path, sigverify_disabled, max_tick_height, - last_id, + last_entry_id, ); let broadcast_stage = BroadcastStage::new( @@ -395,6 +396,7 @@ impl Fullnode { self.vote_account_keypair.clone(), &self.bank, entry_height, + last_entry_id, self.cluster_info.clone(), self.shared_window.clone(), self.replicate_socket @@ -521,11 +523,11 @@ impl Fullnode { .map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err))); info!("processing ledger..."); - let (entry_height, last_id) = bank.process_ledger(entries).expect("process_ledger"); + let (entry_height, last_entry_id) = bank.process_ledger(entries).expect("process_ledger"); // entry_height is the network-wide agreed height of the ledger. // initialize it from the input ledger info!("processed {} ledger...", entry_height); - (bank, entry_height, last_id) + (bank, entry_height, last_entry_id) } pub fn get_leader_scheduler(&self) -> &Arc> { diff --git a/src/ledger.rs b/src/ledger.rs index ece67648c..f42042e52 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -2,7 +2,7 @@ //! Proof of History ledger as well as iterative read, append write, and random //! access read to a persistent file-based ledger. -use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_size}; +use bincode::{self, deserialize_from, serialize_into, serialized_size}; #[cfg(test)] use budget_transaction::BudgetTransaction; #[cfg(test)] @@ -15,7 +15,6 @@ use log::Level::Trace; use mint::Mint; use packet::{SharedBlob, BLOB_DATA_SIZE}; use rayon::prelude::*; -use result::{Error, Result}; use signature::{Keypair, KeypairUtil}; use solana_sdk::pubkey::Pubkey; use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; @@ -502,28 +501,6 @@ impl Block for [Entry] { } } -// TODO: move this to the right file, entry.rs? -pub fn reconstruct_entries_from_blobs(blobs: Vec) -> Result> { - let mut entries: Vec = Vec::with_capacity(blobs.len()); - - for blob in blobs { - let entry = { - let blob = blob.read().unwrap(); - let blob_size = blob.size()?; - deserialize(&blob.data()[..blob_size]) - }; - - match entry { - Ok(entry) => entries.push(entry), - Err(err) => { - trace!("reconstruct_entry_from_blobs: {:?}", err); - return Err(Error::Serialize(err)); - } - } - } - Ok(entries) -} - /// Creates the next entries for given transactions, outputs /// updates start_hash to id of last Entry, sets num_hashes to 0 pub fn next_entries_mut( @@ -692,9 +669,9 @@ pub fn make_tiny_test_entries(num: usize) -> Vec { #[cfg(test)] mod tests { use super::*; - use bincode::serialized_size; + use bincode::{deserialize, serialized_size}; use budget_transaction::BudgetTransaction; - use entry::{next_entry, Entry}; + use entry::{next_entry, reconstruct_entries_from_blobs, Entry}; use hash::hash; use packet::{to_blobs, BLOB_DATA_SIZE, PACKET_DATA_SIZE}; use signature::{Keypair, KeypairUtil}; @@ -755,7 +732,7 @@ mod tests { let blob_q = entries.to_blobs(); - assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries); + assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap().0, entries); } #[test] diff --git a/src/packet.rs b/src/packet.rs index 383d0308f..150eee8e4 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -152,6 +152,8 @@ impl Default for Blob { pub enum BlobError { /// the Blob's meta and data are not self-consistent BadState, + /// Blob verification failed + VerificationFailed, } impl Packets { diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index c82437789..14ab6cbc9 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -6,8 +6,10 @@ use counter::Counter; use entry::{EntryReceiver, EntrySender}; use hash::Hash; use influx_db_client as influxdb; +use ledger::Block; use log::Level; use metrics; +use packet::BlobError; use result::{Error, Result}; use service::Service; use signature::{Keypair, KeypairUtil}; @@ -20,6 +22,7 @@ use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use std::time::Instant; use streamer::{responder, BlobSender}; +use timing::duration_as_ms; use vote_stage::send_validator_vote; #[derive(Debug, PartialEq, Eq, Clone)] @@ -61,7 +64,7 @@ impl ReplicateStage { vote_blob_sender: Option<&BlobSender>, ledger_entry_sender: &EntrySender, entry_height: &mut u64, - last_entry_id: &mut Option, + last_entry_id: &mut Hash, ) -> Result<()> { let timer = Duration::new(1, 0); //coalesce all the available entries into a single vote @@ -78,6 +81,15 @@ impl ReplicateStage { let mut res = Ok(()); let mut num_entries_to_write = entries.len(); + let now = Instant::now(); + if !entries.as_slice().verify(last_entry_id) { + inc_new_counter_info!("replicate_stage-verify-fail", entries.len()); + return Err(Error::BlobError(BlobError::VerificationFailed)); + } + inc_new_counter_info!( + "replicate_stage-verify-duration", + duration_as_ms(&now.elapsed()) as usize + ); let (current_leader, _) = bank .get_current_leader() .expect("Scheduled leader id should never be unknown while processing entries"); @@ -110,12 +122,10 @@ impl ReplicateStage { // If leader rotation happened, only write the entries up to leader rotation. entries.truncate(num_entries_to_write); - *last_entry_id = Some( - entries - .last() - .expect("Entries cannot be empty at this point") - .id, - ); + *last_entry_id = entries + .last() + .expect("Entries cannot be empty at this point") + .id; inc_new_counter_info!( "replicate-transactions", @@ -148,6 +158,7 @@ impl ReplicateStage { window_receiver: EntryReceiver, exit: Arc, entry_height: u64, + last_entry_id: Hash, ) -> (Self, EntryReceiver) { let (vote_blob_sender, vote_blob_receiver) = channel(); let (ledger_entry_sender, ledger_entry_receiver) = channel(); @@ -163,7 +174,7 @@ impl ReplicateStage { let now = Instant::now(); let mut next_vote_secs = 1; let mut entry_height_ = entry_height; - let mut last_entry_id = None; + let mut last_entry_id = last_entry_id; loop { let (leader_id, _) = bank .get_current_leader() @@ -177,7 +188,7 @@ impl ReplicateStage { // rotation (Fullnode should automatically transition on startup if it detects // are no longer a validator. Hence we can assume that some entry must have // triggered leader rotation - last_entry_id.expect("Must exist an entry that triggered rotation"), + last_entry_id, )); } @@ -307,7 +318,8 @@ mod test { Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config))); // Set up the bank - let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler); + let (bank, _, last_entry_id) = + Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler); // Set up the replicate stage let (entry_sender, entry_receiver) = channel(); @@ -320,6 +332,7 @@ mod test { entry_receiver, exit.clone(), initial_entry_len, + last_entry_id, ); // Send enough ticks to trigger leader rotation @@ -392,7 +405,8 @@ mod test { let initial_entry_len = genesis_entries.len(); // Set up the bank - let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler); + let (bank, _, last_entry_id) = + Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler); // Set up the cluster info let cluster_info_me = Arc::new(RwLock::new( @@ -412,6 +426,7 @@ mod test { entry_receiver, exit.clone(), initial_entry_len as u64, + last_entry_id, ); // Vote sender should error because no leader contact info is found in the @@ -504,7 +519,8 @@ mod test { Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config))); // Set up the bank - let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler); + let (bank, _, last_entry_id) = + Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler); // Set up the cluster info let cluster_info_me = Arc::new(RwLock::new( @@ -524,6 +540,7 @@ mod test { entry_receiver, exit.clone(), initial_entry_len as u64, + last_entry_id, ); // Vote sender should error because no leader contact info is found in the diff --git a/src/tvu.rs b/src/tvu.rs index f0aa65f5d..6a61db845 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -60,6 +60,7 @@ impl Tvu { vote_account_keypair: Arc, bank: &Arc, entry_height: u64, + last_entry_id: Hash, cluster_info: Arc>, window: SharedWindow, replicate_sockets: Vec, @@ -97,6 +98,7 @@ impl Tvu { blob_window_receiver, exit.clone(), entry_height, + last_entry_id, ); let (ledger_write_stage, storage_entry_receiver) = @@ -165,7 +167,7 @@ pub mod tests { use bincode::serialize; use cluster_info::{ClusterInfo, Node}; use entry::Entry; - use hash::{hash, Hash}; + use hash::Hash; use leader_scheduler::LeaderScheduler; use logger; use mint::Mint; @@ -258,11 +260,13 @@ pub mod tests { let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()); let vote_account_keypair = Arc::new(Keypair::new()); + let mut cur_hash = Hash::default(); let tvu = Tvu::new( Arc::new(target1_keypair), vote_account_keypair, &bank, 0, + cur_hash, cref1, dr_1.1, target1.sockets.replicate, @@ -273,15 +277,16 @@ pub mod tests { let mut alice_ref_balance = starting_balance; let mut msgs = Vec::new(); - let mut cur_hash = Hash::default(); let mut blob_idx = 0; let num_transfers = 10; let transfer_amount = 501; let bob_keypair = Keypair::new(); for i in 0..num_transfers { let entry0 = Entry::new(&cur_hash, i, vec![]); + cur_hash = entry0.id; bank.register_tick(&cur_hash); - cur_hash = hash(&cur_hash.as_ref()); + let entry_tick0 = Entry::new(&cur_hash, i + 1, vec![]); + cur_hash = entry_tick0.id; let tx0 = Transaction::system_new( &mint.keypair(), @@ -290,14 +295,16 @@ pub mod tests { cur_hash, ); bank.register_tick(&cur_hash); - cur_hash = hash(&cur_hash.as_ref()); + let entry_tick1 = Entry::new(&cur_hash, i + 1, vec![]); + cur_hash = entry_tick1.id; let entry1 = Entry::new(&cur_hash, i + num_transfers, vec![tx0]); - bank.register_tick(&cur_hash); - cur_hash = hash(&cur_hash.as_ref()); + bank.register_tick(&entry1.id); + let entry_tick2 = Entry::new(&entry1.id, i + 1, vec![]); + cur_hash = entry_tick2.id; alice_ref_balance -= transfer_amount; - for entry in vec![entry0, entry1] { + for entry in vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2] { let mut b = SharedBlob::default(); { let mut w = b.write().unwrap(); diff --git a/src/window.rs b/src/window.rs index c81f82daf..50b237e19 100644 --- a/src/window.rs +++ b/src/window.rs @@ -2,11 +2,11 @@ //! use cluster_info::ClusterInfo; use counter::Counter; +use entry::reconstruct_entries_from_blobs; use entry::Entry; #[cfg(feature = "erasure")] use erasure; use leader_scheduler::LeaderScheduler; -use ledger::reconstruct_entries_from_blobs; use log::Level; use packet::SharedBlob; use solana_sdk::pubkey::Pubkey; @@ -352,11 +352,9 @@ impl WindowUtil for Window { // Check that we can get the entries from this blob match reconstruct_entries_from_blobs(vec![k_data_blob]) { - Ok(entries) => { - for entry in &entries { - *tick_height += entry.is_tick() as u64; - } + Ok((entries, num_ticks)) => { consume_queue.extend(entries); + *tick_height += num_ticks; } Err(_) => { // If the blob can't be deserialized, then remove it from the