Add poh verification before processing entries

- Replicate stage now verifies entries delivered by the window
- Minor refactor of entries_from_blobs
Sagar Dhawan 2018-11-12 12:41:19 -08:00
parent 66e9d30fda
commit 729d28d910
9 changed files with 85 additions and 61 deletions
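The core check this commit introduces lives in src/replicate_stage.rs below: before a batch of entries is processed, entries.as_slice().verify(last_entry_id) must confirm that the batch hash-chains from the last entry id the node accepted. A minimal standalone sketch of that chaining rule follows, assuming the sha2 crate and tick-only entries; the repo's Block::verify() also folds each entry's transactions into the hash.

// Minimal sketch, not the repo's code: PoH verification for tick-only
// entries. An entry's id must equal the previous id hashed num_hashes
// times, so a whole batch must chain from last_entry_id.
use sha2::{Digest, Sha256};

type Hash = [u8; 32];

struct Entry {
    num_hashes: u64,
    id: Hash,
}

// Re-derive the id that num_hashes hash iterations from prev should produce.
fn next_hash(prev: &Hash, num_hashes: u64) -> Hash {
    let mut h = *prev;
    for _ in 0..num_hashes {
        h = Sha256::digest(h).into();
    }
    h
}

// The check this commit adds before entries are processed: every entry
// must extend the chain, starting from the last accepted entry id.
fn verify(entries: &[Entry], last_entry_id: &Hash) -> bool {
    let mut prev = *last_entry_id;
    for entry in entries {
        if entry.id != next_hash(&prev, entry.num_hashes) {
            return false;
        }
        prev = entry.id;
    }
    true
}

fn main() {
    let genesis: Hash = [0; 32];
    let e0 = Entry { num_hashes: 2, id: next_hash(&genesis, 2) };
    let e1 = Entry { num_hashes: 3, id: next_hash(&e0.id, 3) };
    assert!(verify(&[e0, e1], &genesis));

    // A forged entry that does not chain from the previous id is rejected.
    let forged = Entry { num_hashes: 1, id: [42; 32] };
    assert!(!verify(&[forged], &genesis));
}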

.gitignore

@@ -19,4 +19,4 @@ log-*.txt
 # intellij files
 /.idea/
-/solana.iml
+/solana.iml

benches/ledger.rs

@@ -2,8 +2,9 @@
 extern crate solana;
 extern crate test;
+use solana::entry::reconstruct_entries_from_blobs;
 use solana::hash::{hash, Hash};
-use solana::ledger::{next_entries, reconstruct_entries_from_blobs, Block};
+use solana::ledger::{next_entries, Block};
 use solana::signature::{Keypair, KeypairUtil};
 use solana::system_transaction::SystemTransaction;
 use solana::transaction::Transaction;
@@ -20,6 +21,6 @@ fn bench_block_to_blobs_to_block(bencher: &mut Bencher) {
     bencher.iter(|| {
         let blobs = entries.to_blobs();
-        assert_eq!(reconstruct_entries_from_blobs(blobs).unwrap(), entries);
+        assert_eq!(reconstruct_entries_from_blobs(blobs).unwrap().0, entries);
     });
 }

src/entry.rs

@@ -2,10 +2,11 @@
 //! unique ID that is the hash of the Entry before it, plus the hash of the
 //! transactions within it. Entries cannot be reordered, and its field `num_hashes`
 //! represents an approximate amount of time since the last Entry was created.
-use bincode::{serialize_into, serialized_size};
+use bincode::{deserialize, serialize_into, serialized_size};
 use hash::Hash;
 use packet::{SharedBlob, BLOB_DATA_SIZE};
 use poh::Poh;
+use result::Result;
 use solana_sdk::pubkey::Pubkey;
 use std::io::Cursor;
 use std::mem::size_of;
@@ -249,6 +250,25 @@ pub fn next_entry(prev_id: &Hash, num_hashes: u64, transactions: Vec<Transaction
     }
 }
 
+pub fn reconstruct_entries_from_blobs(blobs: Vec<SharedBlob>) -> Result<(Vec<Entry>, u64)> {
+    let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());
+    let mut num_ticks = 0;
+
+    for blob in blobs {
+        let entry: Entry = {
+            let msg = blob.read().unwrap();
+            let msg_size = msg.size()?;
+            deserialize(&msg.data()[..msg_size]).expect("Error reconstructing entry")
+        };
+
+        if entry.is_tick() {
+            num_ticks += 1
+        }
+        entries.push(entry)
+    }
+    Ok((entries, num_ticks))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
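The relocated helper above also changes shape: it now returns Result<(Vec<Entry>, u64)>, tallying ticks in the same pass over the blobs so callers (see src/window.rs below) no longer rescan the batch. A self-contained sketch of that pattern, using hypothetical simplified stand-ins for SharedBlob and the repo's Entry, assuming the bincode and serde crates:

// Sketch only: Blob and this Entry are simplified stand-ins, not the
// repo's types. Deserialize each blob and count ticks in one pass,
// mirroring the (Vec<Entry>, num_ticks) return shape introduced above.
use serde::{Deserialize, Serialize};

type Blob = Vec<u8>;

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Entry {
    num_hashes: u64,
    id: [u8; 32],
    transactions: Vec<u64>, // stand-in for real transactions
}

impl Entry {
    // An entry carrying no transactions is a tick.
    fn is_tick(&self) -> bool {
        self.transactions.is_empty()
    }
}

fn reconstruct_entries_from_blobs(blobs: Vec<Blob>) -> bincode::Result<(Vec<Entry>, u64)> {
    let mut entries = Vec::with_capacity(blobs.len());
    let mut num_ticks = 0;
    for blob in blobs {
        let entry: Entry = bincode::deserialize(&blob)?;
        if entry.is_tick() {
            num_ticks += 1;
        }
        entries.push(entry);
    }
    Ok((entries, num_ticks))
}

fn main() {
    let tick = Entry { num_hashes: 1, id: [0; 32], transactions: vec![] };
    let blob = bincode::serialize(&tick).unwrap();
    let (entries, num_ticks) = reconstruct_entries_from_blobs(vec![blob]).unwrap();
    assert_eq!((entries.len(), num_ticks), (1, 1));
}

One behavioral trade visible in the diff: the old ledger.rs version surfaced malformed blobs as Error::Serialize, while the new entry.rs version panics via expect("Error reconstructing entry").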

src/fullnode.rs

@@ -140,7 +140,7 @@ impl Fullnode {
         info!("creating bank...");
 
-        let (bank, entry_height, last_id) =
+        let (bank, entry_height, last_entry_id) =
             Self::new_bank_from_ledger(ledger_path, leader_scheduler);
 
         info!("creating networking stack...");
@@ -161,7 +161,7 @@
             vote_account_keypair,
             bank,
             entry_height,
-            &last_id,
+            &last_entry_id,
             node,
             leader_info.as_ref(),
             ledger_path,
@@ -191,7 +191,7 @@
         vote_account_keypair: Arc<Keypair>,
         bank: Bank,
         entry_height: u64,
-        last_id: &Hash,
+        last_entry_id: &Hash,
         mut node: Node,
         bootstrap_leader_info_option: Option<&NodeInfo>,
         ledger_path: &str,
@@ -249,6 +249,7 @@
             vote_account_keypair.clone(),
             &bank,
             entry_height,
+            *last_entry_id,
             cluster_info.clone(),
             shared_window.clone(),
             node.sockets
@@ -285,7 +286,7 @@
                 ledger_path,
                 sigverify_disabled,
                 max_tick_height,
-                last_id,
+                last_entry_id,
             );
 
             let broadcast_stage = BroadcastStage::new(
@@ -395,6 +396,7 @@
             self.vote_account_keypair.clone(),
             &self.bank,
             entry_height,
+            last_entry_id,
             self.cluster_info.clone(),
             self.shared_window.clone(),
             self.replicate_socket
@@ -521,11 +523,11 @@
             .map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err)));
 
         info!("processing ledger...");
-        let (entry_height, last_id) = bank.process_ledger(entries).expect("process_ledger");
+        let (entry_height, last_entry_id) = bank.process_ledger(entries).expect("process_ledger");
         // entry_height is the network-wide agreed height of the ledger.
         // initialize it from the input ledger
         info!("processed {} ledger...", entry_height);
-        (bank, entry_height, last_id)
+        (bank, entry_height, last_entry_id)
     }
 
     pub fn get_leader_scheduler(&self) -> &Arc<RwLock<LeaderScheduler>> {

src/ledger.rs

@@ -2,7 +2,7 @@
 //! Proof of History ledger as well as iterative read, append write, and random
 //! access read to a persistent file-based ledger.
 
-use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_size};
+use bincode::{self, deserialize_from, serialize_into, serialized_size};
 #[cfg(test)]
 use budget_transaction::BudgetTransaction;
 #[cfg(test)]
@@ -15,7 +15,6 @@ use log::Level::Trace;
 use mint::Mint;
 use packet::{SharedBlob, BLOB_DATA_SIZE};
 use rayon::prelude::*;
-use result::{Error, Result};
 use signature::{Keypair, KeypairUtil};
 use solana_sdk::pubkey::Pubkey;
 use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
@@ -502,28 +501,6 @@ impl Block for [Entry] {
     }
 }
 
-// TODO: move this to the right file, entry.rs?
-pub fn reconstruct_entries_from_blobs(blobs: Vec<SharedBlob>) -> Result<Vec<Entry>> {
-    let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());
-
-    for blob in blobs {
-        let entry = {
-            let blob = blob.read().unwrap();
-            let blob_size = blob.size()?;
-            deserialize(&blob.data()[..blob_size])
-        };
-
-        match entry {
-            Ok(entry) => entries.push(entry),
-            Err(err) => {
-                trace!("reconstruct_entry_from_blobs: {:?}", err);
-                return Err(Error::Serialize(err));
-            }
-        }
-    }
-    Ok(entries)
-}
-
 /// Creates the next entries for given transactions, outputs
 /// updates start_hash to id of last Entry, sets num_hashes to 0
 pub fn next_entries_mut(
@@ -692,9 +669,9 @@ pub fn make_tiny_test_entries(num: usize) -> Vec<Entry> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use bincode::serialized_size;
+    use bincode::{deserialize, serialized_size};
     use budget_transaction::BudgetTransaction;
-    use entry::{next_entry, Entry};
+    use entry::{next_entry, reconstruct_entries_from_blobs, Entry};
     use hash::hash;
     use packet::{to_blobs, BLOB_DATA_SIZE, PACKET_DATA_SIZE};
     use signature::{Keypair, KeypairUtil};
@@ -755,7 +732,7 @@ mod tests {
         let blob_q = entries.to_blobs();
 
-        assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries);
+        assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap().0, entries);
     }
 
     #[test]

src/packet.rs

@@ -152,6 +152,8 @@ impl Default for Blob {
 pub enum BlobError {
     /// the Blob's meta and data are not self-consistent
     BadState,
+    /// Blob verification failed
+    VerificationFailed,
 }
 
 impl Packets {

src/replicate_stage.rs

@@ -6,8 +6,10 @@ use counter::Counter;
 use entry::{EntryReceiver, EntrySender};
 use hash::Hash;
 use influx_db_client as influxdb;
+use ledger::Block;
 use log::Level;
 use metrics;
+use packet::BlobError;
 use result::{Error, Result};
 use service::Service;
 use signature::{Keypair, KeypairUtil};
@@ -20,6 +22,7 @@ use std::thread::{self, Builder, JoinHandle};
 use std::time::Duration;
 use std::time::Instant;
 use streamer::{responder, BlobSender};
+use timing::duration_as_ms;
 use vote_stage::send_validator_vote;
 
 #[derive(Debug, PartialEq, Eq, Clone)]
@@ -61,7 +64,7 @@
         vote_blob_sender: Option<&BlobSender>,
         ledger_entry_sender: &EntrySender,
         entry_height: &mut u64,
-        last_entry_id: &mut Option<Hash>,
+        last_entry_id: &mut Hash,
     ) -> Result<()> {
         let timer = Duration::new(1, 0);
         //coalesce all the available entries into a single vote
@@ -78,6 +81,15 @@
         let mut res = Ok(());
         let mut num_entries_to_write = entries.len();
         let now = Instant::now();
+
+        if !entries.as_slice().verify(last_entry_id) {
+            inc_new_counter_info!("replicate_stage-verify-fail", entries.len());
+            return Err(Error::BlobError(BlobError::VerificationFailed));
+        }
+        inc_new_counter_info!(
+            "replicate_stage-verify-duration",
+            duration_as_ms(&now.elapsed()) as usize
+        );
         let (current_leader, _) = bank
             .get_current_leader()
             .expect("Scheduled leader id should never be unknown while processing entries");
@@ -110,12 +122,10 @@
         // If leader rotation happened, only write the entries up to leader rotation.
         entries.truncate(num_entries_to_write);
 
-        *last_entry_id = Some(
-            entries
-                .last()
-                .expect("Entries cannot be empty at this point")
-                .id,
-        );
+        *last_entry_id = entries
+            .last()
+            .expect("Entries cannot be empty at this point")
+            .id;
 
         inc_new_counter_info!(
             "replicate-transactions",
@@ -148,6 +158,7 @@
         window_receiver: EntryReceiver,
         exit: Arc<AtomicBool>,
         entry_height: u64,
+        last_entry_id: Hash,
     ) -> (Self, EntryReceiver) {
         let (vote_blob_sender, vote_blob_receiver) = channel();
         let (ledger_entry_sender, ledger_entry_receiver) = channel();
@@ -163,7 +174,7 @@
                 let now = Instant::now();
                 let mut next_vote_secs = 1;
                 let mut entry_height_ = entry_height;
-                let mut last_entry_id = None;
+                let mut last_entry_id = last_entry_id;
                 loop {
                     let (leader_id, _) = bank
                         .get_current_leader()
@@ -177,7 +188,7 @@
                             // rotation (Fullnode should automatically transition on startup if it detects
                             // are no longer a validator. Hence we can assume that some entry must have
                             // triggered leader rotation
-                            last_entry_id.expect("Must exist an entry that triggered rotation"),
+                            last_entry_id,
                         ));
                     }
 
@@ -307,7 +318,8 @@
             Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
 
         // Set up the bank
-        let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
+        let (bank, _, last_entry_id) =
+            Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
 
         // Set up the replicate stage
         let (entry_sender, entry_receiver) = channel();
@@ -320,6 +332,7 @@
             entry_receiver,
             exit.clone(),
             initial_entry_len,
+            last_entry_id,
         );
 
         // Send enough ticks to trigger leader rotation
@@ -392,7 +405,8 @@
         let initial_entry_len = genesis_entries.len();
 
         // Set up the bank
-        let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
+        let (bank, _, last_entry_id) =
+            Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
 
         // Set up the cluster info
        let cluster_info_me = Arc::new(RwLock::new(
@@ -412,6 +426,7 @@
             entry_receiver,
             exit.clone(),
             initial_entry_len as u64,
+            last_entry_id,
         );
 
         // Vote sender should error because no leader contact info is found in the
@@ -504,7 +519,8 @@
             Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
 
         // Set up the bank
-        let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
+        let (bank, _, last_entry_id) =
+            Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler);
 
         // Set up the cluster info
         let cluster_info_me = Arc::new(RwLock::new(
@@ -524,6 +540,7 @@
             entry_receiver,
             exit.clone(),
             initial_entry_len as u64,
+            last_entry_id,
         );
 
         // Vote sender should error because no leader contact info is found in the
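Taken together, these changes turn last_entry_id into state threaded through the stage: it is seeded from Fullnode::new_bank_from_ledger() rather than starting as None, each incoming batch must verify against it, and it only advances once a batch is accepted. A standalone sketch of that control flow, using toy stand-ins (an integer id and a +1 chaining rule) instead of real PoH hashes:

// Toy model of the replicate-stage flow after this commit. The integer
// Hash and the +1 chaining rule are stand-ins for 32-byte PoH hashes.
type Hash = u64;

struct Entry {
    id: Hash,
}

#[derive(Debug)]
enum ReplicateError {
    VerificationFailed,
}

// Stand-in for Block::verify(): each entry must chain from the prior id.
fn verify(entries: &[Entry], start_id: &Hash) -> bool {
    let mut prev = *start_id;
    entries.iter().all(|e| {
        let ok = e.id == prev + 1;
        prev = e.id;
        ok
    })
}

// Verify first, process second; last_entry_id advances only on success.
fn process_entries(entries: &[Entry], last_entry_id: &mut Hash) -> Result<(), ReplicateError> {
    if !verify(entries, last_entry_id) {
        // Reject the whole batch and leave last_entry_id untouched, so a
        // correct retransmission of the same range can still verify later.
        return Err(ReplicateError::VerificationFailed);
    }
    if let Some(last) = entries.last() {
        *last_entry_id = last.id;
    }
    Ok(())
}

fn main() {
    let mut last_entry_id: Hash = 0; // seeded from the ledger in the real code
    assert!(process_entries(&[Entry { id: 1 }, Entry { id: 2 }], &mut last_entry_id).is_ok());
    assert_eq!(last_entry_id, 2);

    assert!(process_entries(&[Entry { id: 9 }], &mut last_entry_id).is_err());
    assert_eq!(last_entry_id, 2); // unchanged after rejection
}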

src/tvu.rs

@@ -60,6 +60,7 @@ impl Tvu {
         vote_account_keypair: Arc<Keypair>,
         bank: &Arc<Bank>,
         entry_height: u64,
+        last_entry_id: Hash,
         cluster_info: Arc<RwLock<ClusterInfo>>,
         window: SharedWindow,
         replicate_sockets: Vec<UdpSocket>,
@@ -97,6 +98,7 @@
             blob_window_receiver,
             exit.clone(),
             entry_height,
+            last_entry_id,
         );
 
         let (ledger_write_stage, storage_entry_receiver) =
@@ -165,7 +167,7 @@ pub mod tests {
     use bincode::serialize;
     use cluster_info::{ClusterInfo, Node};
     use entry::Entry;
-    use hash::{hash, Hash};
+    use hash::Hash;
     use leader_scheduler::LeaderScheduler;
     use logger;
     use mint::Mint;
@@ -258,11 +260,13 @@
         let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone());
 
         let vote_account_keypair = Arc::new(Keypair::new());
+        let mut cur_hash = Hash::default();
         let tvu = Tvu::new(
             Arc::new(target1_keypair),
             vote_account_keypair,
             &bank,
             0,
+            cur_hash,
             cref1,
             dr_1.1,
             target1.sockets.replicate,
@@ -273,15 +277,16 @@
         let mut alice_ref_balance = starting_balance;
         let mut msgs = Vec::new();
-        let mut cur_hash = Hash::default();
         let mut blob_idx = 0;
         let num_transfers = 10;
         let transfer_amount = 501;
         let bob_keypair = Keypair::new();
 
         for i in 0..num_transfers {
             let entry0 = Entry::new(&cur_hash, i, vec![]);
+            cur_hash = entry0.id;
             bank.register_tick(&cur_hash);
-            cur_hash = hash(&cur_hash.as_ref());
+            let entry_tick0 = Entry::new(&cur_hash, i + 1, vec![]);
+            cur_hash = entry_tick0.id;
 
             let tx0 = Transaction::system_new(
                 &mint.keypair(),
@@ -290,14 +295,16 @@
                 cur_hash,
             );
             bank.register_tick(&cur_hash);
-            cur_hash = hash(&cur_hash.as_ref());
+            let entry_tick1 = Entry::new(&cur_hash, i + 1, vec![]);
+            cur_hash = entry_tick1.id;
             let entry1 = Entry::new(&cur_hash, i + num_transfers, vec![tx0]);
-            bank.register_tick(&cur_hash);
-            cur_hash = hash(&cur_hash.as_ref());
+            bank.register_tick(&entry1.id);
+            let entry_tick2 = Entry::new(&entry1.id, i + 1, vec![]);
+            cur_hash = entry_tick2.id;
 
             alice_ref_balance -= transfer_amount;
 
-            for entry in vec![entry0, entry1] {
+            for entry in vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2] {
                 let mut b = SharedBlob::default();
                 {
                     let mut w = b.write().unwrap();

src/window.rs

@@ -2,11 +2,11 @@
 //!
 use cluster_info::ClusterInfo;
 use counter::Counter;
+use entry::reconstruct_entries_from_blobs;
 use entry::Entry;
 #[cfg(feature = "erasure")]
 use erasure;
 use leader_scheduler::LeaderScheduler;
-use ledger::reconstruct_entries_from_blobs;
 use log::Level;
 use packet::SharedBlob;
 use solana_sdk::pubkey::Pubkey;
@@ -352,11 +352,9 @@ impl WindowUtil for Window {
             // Check that we can get the entries from this blob
             match reconstruct_entries_from_blobs(vec![k_data_blob]) {
-                Ok(entries) => {
-                    for entry in &entries {
-                        *tick_height += entry.is_tick() as u64;
-                    }
+                Ok((entries, num_ticks)) => {
                     consume_queue.extend(entries);
+                    *tick_height += num_ticks;
                 }
                 Err(_) => {
                     // If the blob can't be deserialized, then remove it from the