From 8c1b9a0b67d417165824c7359429e6c7e0807d18 Mon Sep 17 00:00:00 2001 From: carllin Date: Wed, 12 Jun 2019 16:43:05 -0700 Subject: [PATCH] Data plane verification (#4639) * Add signature to blob * Change Signable trait to support returning references to signable data * Add signing to broadcast * Verify signatures in window_service * Add testing for signatures to erasure * Add RPC for getting current slot, consume RPC call in test_repairman_catchup for more deterministic results --- client/src/mock_rpc_client_request.rs | 1 + client/src/rpc_client.rs | 19 +++++++++ client/src/rpc_request.rs | 6 +++ client/src/thin_client.rs | 10 +++++ core/src/broadcast_stage.rs | 22 +++++++++- core/src/chacha.rs | 2 +- core/src/cluster.rs | 2 + core/src/cluster_info.rs | 5 ++- core/src/contact_info.rs | 5 ++- core/src/crds_value.rs | 11 ++--- core/src/entry.rs | 2 +- core/src/erasure.rs | 2 + core/src/local_cluster.rs | 11 +++++ core/src/packet.rs | 54 ++++++++++++++++++------ core/src/rpc.rs | 11 +++++ core/src/window_service.rs | 60 ++++++++++++++++++++------- core/tests/local_cluster.rs | 42 +++++++------------ core/tests/tvu.rs | 15 +++++-- install/src/update_manifest.rs | 5 ++- runtime/src/bank_client.rs | 8 +++- sdk/src/client.rs | 7 +++- sdk/src/signature.rs | 10 +++-- 22 files changed, 229 insertions(+), 81 deletions(-) diff --git a/client/src/mock_rpc_client_request.rs b/client/src/mock_rpc_client_request.rs index df9ef0b20c..6b3eb2e86e 100644 --- a/client/src/mock_rpc_client_request.rs +++ b/client/src/mock_rpc_client_request.rs @@ -60,6 +60,7 @@ impl GenericRpcClientRequest for MockRpcClientRequest { serde_json::to_value(response).unwrap() } RpcRequest::GetTransactionCount => Value::Number(Number::from(1234)), + RpcRequest::GetSlot => Value::Number(Number::from(0)), RpcRequest::SendTransaction => Value::String(SIGNATURE.to_string()), _ => Value::Null, }; diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index f1646638c8..ee1aa73ea9 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -75,6 +75,25 @@ impl RpcClient { Ok(result) } + pub fn get_slot(&self) -> io::Result { + let response = self + .client + .send(&RpcRequest::GetSlot, None, 0) + .map_err(|err| { + io::Error::new( + io::ErrorKind::Other, + format!("GetSlot request failure: {:?}", err), + ) + })?; + + serde_json::from_value(response).map_err(|err| { + io::Error::new( + io::ErrorKind::Other, + format!("GetSlot parse failure: {}", err), + ) + }) + } + pub fn send_and_confirm_transaction( &self, transaction: &mut Transaction, diff --git a/client/src/rpc_request.rs b/client/src/rpc_request.rs index 0c9850927a..321858c1bc 100644 --- a/client/src/rpc_request.rs +++ b/client/src/rpc_request.rs @@ -12,6 +12,7 @@ pub enum RpcRequest { GetNumBlocksSinceSignatureConfirmation, GetRecentBlockhash, GetSignatureStatus, + GetSlot, GetSlotLeader, GetEpochVoteAccounts, GetStorageBlockhash, @@ -39,6 +40,7 @@ impl RpcRequest { } RpcRequest::GetRecentBlockhash => "getRecentBlockhash", RpcRequest::GetSignatureStatus => "getSignatureStatus", + RpcRequest::GetSlot => "getSlot", RpcRequest::GetSlotLeader => "getSlotLeader", RpcRequest::GetEpochVoteAccounts => "getEpochVoteAccounts", RpcRequest::GetStorageBlockhash => "getStorageBlockhash", @@ -104,6 +106,10 @@ mod tests { let request = test_request.build_request_json(1, None); assert_eq!(request["method"], "getRecentBlockhash"); + let test_request = RpcRequest::GetSlot; + let request = test_request.build_request_json(1, None); + assert_eq!(request["method"], "getSlot"); + let test_request = RpcRequest::GetTransactionCount; let request = test_request.build_request_json(1, None); assert_eq!(request["method"], "getTransactionCount"); diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 3602500217..8f281b6630 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -327,6 +327,16 @@ impl SyncClient for ThinClient { Ok(status) } + fn get_slot(&self) -> TransportResult { + let slot = self.rpc_client().get_slot().map_err(|err| { + io::Error::new( + io::ErrorKind::Other, + format!("send_transaction failed with error {:?}", err), + ) + })?; + Ok(slot) + } + fn get_recent_blockhash(&self) -> TransportResult<(Hash, FeeCalculator)> { let index = self.optimizer.experiment(); let now = Instant::now(); diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index dd763dbbb0..42eb9b70e3 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -4,7 +4,7 @@ use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, ClusterInfoError}; use crate::entry::EntrySlice; use crate::erasure::CodingGenerator; -use crate::packet::index_blobs_with_genesis; +use crate::packet::index_blobs; use crate::poh_recorder::WorkingBankEntries; use crate::result::{Error, Result}; use crate::service::Service; @@ -15,6 +15,7 @@ use solana_metrics::{ datapoint, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_info, }; use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signable; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -104,7 +105,7 @@ impl Broadcast { .map(|meta| meta.consumed) .unwrap_or(0); - index_blobs_with_genesis( + index_blobs( &blobs, &self.id, blob_index, @@ -116,10 +117,27 @@ impl Broadcast { blobs.last().unwrap().write().unwrap().set_is_last_in_slot(); } + // Make sure not to modify the blob header or data after signing it here + self.thread_pool.install(|| { + blobs.par_iter().for_each(|b| { + b.write() + .unwrap() + .sign(&cluster_info.read().unwrap().keypair); + }) + }); + blocktree.write_shared_blobs(&blobs)?; let coding = self.coding_generator.next(&blobs); + self.thread_pool.install(|| { + coding.par_iter().for_each(|c| { + c.write() + .unwrap() + .sign(&cluster_info.read().unwrap().keypair); + }) + }); + let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); let broadcast_start = Instant::now(); diff --git a/core/src/chacha.rs b/core/src/chacha.rs index 7bb957ef8d..6ad2a948d1 100644 --- a/core/src/chacha.rs +++ b/core/src/chacha.rs @@ -133,7 +133,7 @@ mod tests { hasher.hash(&buf[..size]); // golden needs to be updated if blob stuff changes.... - let golden: Hash = "HZJWPVZcLtdQg34ov1vq9fjeqbgagHyhn4weLcvFsFnY" + let golden: Hash = "E2HZjSC6VgH4nmEiTbMDATTeBcFjwSYz7QYvU7doGNhD" .parse() .unwrap(); diff --git a/core/src/cluster.rs b/core/src/cluster.rs index ed587e8672..7d0ff34117 100644 --- a/core/src/cluster.rs +++ b/core/src/cluster.rs @@ -1,6 +1,8 @@ +use solana_client::thin_client::ThinClient; use solana_sdk::pubkey::Pubkey; pub trait Cluster { fn get_node_pubkeys(&self) -> Vec; + fn get_validator_client(&self, pubkey: &Pubkey) -> Option; fn restart_node(&mut self, pubkey: Pubkey); } diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 51d5bf1a31..088f3d7d1c 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -44,6 +44,7 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::timing::{duration_as_ms, timestamp}; use solana_sdk::transaction::Transaction; +use std::borrow::Cow; use std::cmp::min; use std::collections::{BTreeSet, HashMap}; use std::fmt; @@ -126,7 +127,7 @@ impl Signable for PruneData { self.pubkey } - fn signable_data(&self) -> Vec { + fn signable_data(&self) -> Cow<[u8]> { #[derive(Serialize)] struct SignData { pubkey: Pubkey, @@ -140,7 +141,7 @@ impl Signable for PruneData { destination: self.destination, wallclock: self.wallclock, }; - serialize(&data).expect("serialize PruneData") + Cow::Owned(serialize(&data).expect("serialize PruneData")) } fn get_signature(&self) -> Signature { diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index c11fa6e43b..361a5cb40e 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -6,6 +6,7 @@ use solana_sdk::rpc_port; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Signable, Signature}; use solana_sdk::timing::timestamp; +use std::borrow::Cow; use std::cmp::{Ord, Ordering, PartialEq, PartialOrd}; use std::net::{IpAddr, SocketAddr}; @@ -225,7 +226,7 @@ impl Signable for ContactInfo { self.id } - fn signable_data(&self) -> Vec { + fn signable_data(&self) -> Cow<[u8]> { #[derive(Serialize)] struct SignData { id: Pubkey, @@ -251,7 +252,7 @@ impl Signable for ContactInfo { rpc_pubsub: me.rpc_pubsub, wallclock: me.wallclock, }; - serialize(&data).expect("failed to serialize ContactInfo") + Cow::Owned(serialize(&data).expect("failed to serialize ContactInfo")) } fn get_signature(&self) -> Signature { diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index 0fa14cc56c..eda1242cd4 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -3,6 +3,7 @@ use bincode::serialize; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, Signable, Signature}; use solana_sdk::transaction::Transaction; +use std::borrow::Cow; use std::collections::BTreeSet; use std::fmt; @@ -43,7 +44,7 @@ impl Signable for EpochSlots { self.from } - fn signable_data(&self) -> Vec { + fn signable_data(&self) -> Cow<[u8]> { #[derive(Serialize)] struct SignData<'a> { root: u64, @@ -55,7 +56,7 @@ impl Signable for EpochSlots { slots: &self.slots, wallclock: self.wallclock, }; - serialize(&data).expect("unable to serialize EpochSlots") + Cow::Owned(serialize(&data).expect("unable to serialize EpochSlots")) } fn get_signature(&self) -> Signature { @@ -91,7 +92,7 @@ impl Signable for Vote { self.from } - fn signable_data(&self) -> Vec { + fn signable_data(&self) -> Cow<[u8]> { #[derive(Serialize)] struct SignData<'a> { transaction: &'a Transaction, @@ -101,7 +102,7 @@ impl Signable for Vote { transaction: &self.transaction, wallclock: self.wallclock, }; - serialize(&data).expect("unable to serialize Vote") + Cow::Owned(serialize(&data).expect("unable to serialize Vote")) } fn get_signature(&self) -> Signature { @@ -215,7 +216,7 @@ impl Signable for CrdsValue { } } - fn signable_data(&self) -> Vec { + fn signable_data(&self) -> Cow<[u8]> { unimplemented!() } diff --git a/core/src/entry.rs b/core/src/entry.rs index 76349bb592..29e1f9c9e2 100644 --- a/core/src/entry.rs +++ b/core/src/entry.rs @@ -41,7 +41,7 @@ pub type EntryReceiver = Receiver>; /// a Verifiable Delay Function (VDF) and a Proof of Work (not to be confused with Proof of /// Work consensus!) -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct Entry { /// The number of hashes since the previous Entry ID. pub num_hashes: u64, diff --git a/core/src/erasure.rs b/core/src/erasure.rs index baa963b69d..99784fa053 100644 --- a/core/src/erasure.rs +++ b/core/src/erasure.rs @@ -281,6 +281,7 @@ pub mod test { use crate::blocktree::Blocktree; use crate::packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE}; use solana_sdk::pubkey::Pubkey; + use solana_sdk::signature::Signable; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::borrow::Borrow; @@ -739,6 +740,7 @@ pub mod test { let mut blob = Blob::default(); blob.data_mut()[..data.len()].copy_from_slice(&data); blob.set_size(data.len()); + blob.sign(&Keypair::new()); Arc::new(RwLock::new(blob)) }) .collect(); diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs index c72468ea7c..9f6d3ec6ec 100644 --- a/core/src/local_cluster.rs +++ b/core/src/local_cluster.rs @@ -34,6 +34,7 @@ pub struct ValidatorInfo { pub voting_keypair: Arc, pub storage_keypair: Arc, pub ledger_path: String, + pub contact_info: ContactInfo, } pub struct ReplicatorInfo { @@ -173,6 +174,7 @@ impl LocalCluster { voting_keypair: leader_voting_keypair, storage_keypair: leader_storage_keypair, ledger_path: leader_ledger_path, + contact_info: leader_contact_info.clone(), }, ); @@ -247,6 +249,7 @@ impl LocalCluster { let storage_keypair = Arc::new(Keypair::new()); let validator_pubkey = validator_keypair.pubkey(); let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey()); + let contact_info = validator_node.info.clone(); let ledger_path = tmp_copy_blocktree!(&self.genesis_ledger_path); if validator_config.voting_disabled { @@ -299,6 +302,7 @@ impl LocalCluster { voting_keypair, storage_keypair, ledger_path, + contact_info, }, ); } else { @@ -309,6 +313,7 @@ impl LocalCluster { voting_keypair, storage_keypair, ledger_path, + contact_info, }, ); } @@ -513,6 +518,12 @@ impl Cluster for LocalCluster { self.fullnodes.keys().cloned().collect() } + fn get_validator_client(&self, pubkey: &Pubkey) -> Option { + self.fullnode_infos + .get(pubkey) + .map(|f| create_client(f.contact_info.client_facing_addr(), FULLNODE_PORT_RANGE)) + } + fn restart_node(&mut self, pubkey: Pubkey) { // Shut down the fullnode let node = self.fullnodes.remove(&pubkey).unwrap(); diff --git a/core/src/packet.rs b/core/src/packet.rs index ba58d2a4e9..d61c9bff91 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -7,7 +7,10 @@ use serde::Serialize; use solana_metrics::inc_new_counter_debug; pub use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signable; +use solana_sdk::signature::Signature; use std::borrow::Borrow; +use std::borrow::Cow; use std::cmp; use std::fmt; use std::io; @@ -335,12 +338,13 @@ macro_rules! range { }; } -const PARENT_RANGE: std::ops::Range = range!(0, u64); +const SIGNATURE_RANGE: std::ops::Range = range!(0, Signature); +const FORWARDED_RANGE: std::ops::Range = range!(SIGNATURE_RANGE.end, bool); +const PARENT_RANGE: std::ops::Range = range!(FORWARDED_RANGE.end, u64); const SLOT_RANGE: std::ops::Range = range!(PARENT_RANGE.end, u64); const INDEX_RANGE: std::ops::Range = range!(SLOT_RANGE.end, u64); const ID_RANGE: std::ops::Range = range!(INDEX_RANGE.end, Pubkey); -const FORWARDED_RANGE: std::ops::Range = range!(ID_RANGE.end, bool); -const FLAGS_RANGE: std::ops::Range = range!(FORWARDED_RANGE.end, u32); +const FLAGS_RANGE: std::ops::Range = range!(ID_RANGE.end, u32); const SIZE_RANGE: std::ops::Range = range!(FLAGS_RANGE.end, u64); macro_rules! align { @@ -350,6 +354,7 @@ macro_rules! align { } pub const BLOB_HEADER_SIZE: usize = align!(SIZE_RANGE.end, BLOB_DATA_ALIGN); // make sure data() is safe for erasure +pub const SIGNABLE_START: usize = PARENT_RANGE.start; pub const BLOB_FLAG_IS_LAST_IN_SLOT: u32 = 0x2; @@ -593,21 +598,29 @@ impl Blob { } } -pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, blob_index: u64, slot: u64, parent: u64) { - index_blobs_with_genesis(blobs, id, blob_index, slot, parent) +impl Signable for Blob { + fn pubkey(&self) -> Pubkey { + self.id() + } + + fn signable_data(&self) -> Cow<[u8]> { + let end = cmp::max(SIGNABLE_START, self.data_size() as usize); + Cow::Borrowed(&self.data[SIGNABLE_START..end]) + } + + fn get_signature(&self) -> Signature { + Signature::new(&self.data[SIGNATURE_RANGE]) + } + + fn set_signature(&mut self, signature: Signature) { + self.data[SIGNATURE_RANGE].copy_from_slice(signature.as_ref()) + } } -pub fn index_blobs_with_genesis( - blobs: &[SharedBlob], - id: &Pubkey, - mut blob_index: u64, - slot: u64, - parent: u64, -) { +pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot: u64, parent: u64) { // enumerate all the blobs, those are the indices for blob in blobs.iter() { let mut blob = blob.write().unwrap(); - blob.set_index(blob_index); blob.set_slot(slot); blob.set_parent(parent); @@ -828,4 +841,19 @@ mod tests { p2.data[1] = 4; assert!(p1 != p2); } + + #[test] + fn test_sign_blob() { + let mut b = Blob::default(); + let k = Keypair::new(); + let p = k.pubkey(); + b.set_id(&p); + b.sign(&k); + assert!(b.verify()); + + // Set a bigger chunk of data to sign + b.set_size(80); + b.sign(&k); + assert!(b.verify()); + } } diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 9b0592b8d2..fc0440d782 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -96,6 +96,10 @@ impl JsonRpcRequestProcessor { self.bank().get_signature_confirmation_status(&signature) } + fn get_slot(&self) -> Result { + Ok(self.bank().slot()) + } + fn get_transaction_count(&self) -> Result { Ok(self.bank().transaction_count() as u64) } @@ -218,6 +222,9 @@ pub trait RpcSol { _: String, ) -> Result>>; + #[rpc(meta, name = "getSlot")] + fn get_slot(&self, _: Self::Metadata) -> Result; + #[rpc(meta, name = "getTransactionCount")] fn get_transaction_count(&self, _: Self::Metadata) -> Result; @@ -334,6 +341,10 @@ impl RpcSol for RpcSolImpl { .map(|res| res.map(|x| x.1)) } + fn get_slot(&self, meta: Self::Metadata) -> Result { + meta.request_processor.read().unwrap().get_slot() + } + fn get_num_blocks_since_signature_confirmation( &self, meta: Self::Metadata, diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 78fcb80310..657bdee006 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -9,9 +9,12 @@ use crate::repair_service::{RepairService, RepairStrategy}; use crate::result::{Error, Result}; use crate::service::Service; use crate::streamer::{BlobReceiver, BlobSender}; +use rayon::prelude::*; +use rayon::ThreadPool; use solana_metrics::{inc_new_counter_debug, inc_new_counter_error}; use solana_runtime::bank::Bank; use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::Signable; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -20,6 +23,8 @@ use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; +pub const NUM_THREADS: u32 = 10; + fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) -> Result<()> { let mut retransmit_queue: Vec = Vec::new(); for blob in blobs { @@ -86,7 +91,10 @@ pub fn should_retransmit_and_persist( Some(bank) => leader_schedule_cache.slot_leader_at(blob.slot(), Some(&bank)), }; - if blob.id() == *my_pubkey { + if !blob.verify() { + inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1); + false + } else if blob.id() == *my_pubkey { inc_new_counter_debug!("streamer-recv_window-circular_transmission", 1); false } else if slot_leader_pubkey == None { @@ -108,9 +116,11 @@ fn recv_window( r: &BlobReceiver, retransmit: &BlobSender, blob_filter: F, + thread_pool: &ThreadPool, ) -> Result<()> where F: Fn(&Blob) -> bool, + F: Sync, { let timer = Duration::from_millis(200); let mut blobs = r.recv_timeout(timer)?; @@ -121,7 +131,12 @@ where let now = Instant::now(); inc_new_counter_debug!("streamer-recv_window-recv", blobs.len(), 0, 1000); - blobs.retain(|blob| blob_filter(&blob.read().unwrap())); + let blobs: Vec<_> = thread_pool.install(|| { + blobs + .into_par_iter() + .filter(|b| blob_filter(&b.read().unwrap())) + .collect() + }); retransmit_blobs(&blobs, retransmit, my_pubkey)?; @@ -200,20 +215,31 @@ impl WindowService { let _exit = Finalizer::new(exit.clone()); let id = cluster_info.read().unwrap().id(); trace!("{}: RECV_WINDOW started", id); + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) + .build() + .unwrap(); loop { if exit.load(Ordering::Relaxed) { break; } - if let Err(e) = recv_window(&blocktree, &id, &r, &retransmit, |blob| { - blob_filter( - &id, - blob, - bank_forks - .as_ref() - .map(|bank_forks| bank_forks.read().unwrap().working_bank()), - ) - }) { + if let Err(e) = recv_window( + &blocktree, + &id, + &r, + &retransmit, + |blob| { + blob_filter( + &id, + blob, + bank_forks + .as_ref() + .map(|bank_forks| bank_forks.read().unwrap().working_bank()), + ) + }, + &thread_pool, + ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), @@ -249,13 +275,14 @@ mod test { use crate::bank_forks::BankForks; use crate::blocktree::{get_tmp_ledger_path, Blocktree}; use crate::cluster_info::{ClusterInfo, Node}; - use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, EntrySlice}; + use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, Entry, EntrySlice}; use crate::genesis_utils::create_genesis_block_with_leader; - use crate::packet::{index_blobs, Blob}; + use crate::packet::index_blobs; use crate::service::Service; use crate::streamer::{blob_receiver, responder}; use solana_runtime::epoch_schedule::MINIMUM_SLOT_LENGTH; use solana_sdk::hash::Hash; + use solana_sdk::signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; @@ -289,14 +316,17 @@ mod test { #[test] fn test_should_retransmit_and_persist() { let me_id = Pubkey::new_rand(); - let leader_pubkey = Pubkey::new_rand(); + let leader_keypair = Keypair::new(); + let leader_pubkey = leader_keypair.pubkey(); let bank = Arc::new(Bank::new( &create_genesis_block_with_leader(100, &leader_pubkey, 10).genesis_block, )); let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); - let mut blob = Blob::default(); + let entry = Entry::default(); + let mut blob = entry.to_blob(); blob.set_id(&leader_pubkey); + blob.sign(&leader_keypair); // without a Bank and blobs not from me, blob gets thrown out assert_eq!( diff --git a/core/tests/local_cluster.rs b/core/tests/local_cluster.rs index e7b5759793..68bc328cb0 100644 --- a/core/tests/local_cluster.rs +++ b/core/tests/local_cluster.rs @@ -1,6 +1,5 @@ extern crate solana; -use crate::solana::blocktree::Blocktree; use hashbrown::HashSet; use log::*; use solana::cluster::Cluster; @@ -9,8 +8,10 @@ use solana::gossip_service::discover_cluster; use solana::local_cluster::{ClusterConfig, LocalCluster}; use solana::validator::ValidatorConfig; use solana_runtime::epoch_schedule::{EpochSchedule, MINIMUM_SLOT_LENGTH}; +use solana_sdk::client::SyncClient; use solana_sdk::poh_config::PohConfig; use solana_sdk::timing; +use std::thread::sleep; use std::time::Duration; #[test] @@ -265,11 +266,11 @@ fn run_repairman_catchup(num_repairmen: u64) { let repairman_pubkeys: HashSet<_> = cluster.get_node_pubkeys().into_iter().collect(); let epoch_schedule = EpochSchedule::new(num_slots_per_epoch, stakers_slot_offset, true); - let num_warmup_epochs = (epoch_schedule.get_stakers_epoch(0) + 1) as f64; + let num_warmup_epochs = epoch_schedule.get_stakers_epoch(0) + 1; // Sleep for longer than the first N warmup epochs, with a one epoch buffer for timing issues cluster_tests::sleep_n_epochs( - num_warmup_epochs + 1.0, + num_warmup_epochs as f64 + 1.0, &cluster.genesis_block.poh_config, num_ticks_per_slot, num_slots_per_epoch, @@ -278,7 +279,6 @@ fn run_repairman_catchup(num_repairmen: u64) { // Start up a new node, wait for catchup. Backwards repair won't be sufficient because the // leader is sending blobs past this validator's first two confirmed epochs. Thus, the repairman // protocol will have to kick in for this validator to repair. - cluster.add_validator(&validator_config, repairee_stake); let all_pubkeys = cluster.get_node_pubkeys(); @@ -288,28 +288,18 @@ fn run_repairman_catchup(num_repairmen: u64) { .unwrap(); // Wait for repairman protocol to catch this validator up - cluster_tests::sleep_n_epochs( - num_warmup_epochs + 1.0, - &cluster.genesis_block.poh_config, - num_ticks_per_slot, - num_slots_per_epoch, - ); + let repairee_client = cluster.get_validator_client(&repairee_id).unwrap(); + let mut current_slot = 0; - cluster.close_preserve_ledgers(); - let validator_ledger_path = cluster.fullnode_infos[&repairee_id].ledger_path.clone(); - - // Expect at least the the first two epochs to have been rooted after waiting 3 epochs. - let num_expected_slots = num_slots_per_epoch * 2; - let validator_ledger = Blocktree::open(&validator_ledger_path).unwrap(); - let validator_rooted_slots: Vec<_> = - validator_ledger.rooted_slot_iterator(0).unwrap().collect(); - - if validator_rooted_slots.len() as u64 <= num_expected_slots { - error!( - "Num expected slots: {}, number of rooted slots: {}", - num_expected_slots, - validator_rooted_slots.len() - ); + // Make sure this validator can get repaired past the first few warmup epochs + let target_slot = (num_warmup_epochs) * num_slots_per_epoch + 1; + while current_slot <= target_slot { + trace!("current_slot: {}", current_slot); + if let Ok(slot) = repairee_client.get_slot() { + current_slot = slot; + } else { + continue; + } + sleep(Duration::from_secs(1)); } - assert!(validator_rooted_slots.len() as u64 > num_expected_slots); } diff --git a/core/tests/tvu.rs b/core/tests/tvu.rs index 1cbca862c6..67dd27e8c2 100644 --- a/core/tests/tvu.rs +++ b/core/tests/tvu.rs @@ -18,6 +18,7 @@ use solana::streamer; use solana::tvu::{Sockets, Tvu}; use solana::validator; use solana_runtime::epoch_schedule::MINIMUM_SLOT_LENGTH; +use solana_sdk::signature::Signable; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction; use std::fs::remove_dir_all; @@ -39,9 +40,12 @@ fn new_gossip( #[test] fn test_replay() { solana_logger::setup(); - let leader = Node::new_localhost(); + let leader_keypair = Keypair::new(); + let leader = Node::new_localhost_with_pubkey(&leader_keypair.pubkey()); + let target1_keypair = Keypair::new(); let target1 = Node::new_localhost_with_pubkey(&target1_keypair.pubkey()); + let target2 = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); @@ -168,9 +172,12 @@ fn test_replay() { let blobs = entries.to_shared_blobs(); index_blobs(&blobs, &leader.info.id, blob_idx, 1, 0); blob_idx += blobs.len() as u64; - blobs - .iter() - .for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr)); + blobs.iter().for_each(|b| { + let mut b_w = b.write().unwrap(); + b_w.set_id(&leader_keypair.pubkey()); + b_w.meta.set_addr(&tvu_addr); + b_w.sign(&leader_keypair); + }); msgs.extend(blobs.into_iter()); } diff --git a/install/src/update_manifest.rs b/install/src/update_manifest.rs index eda2af2537..12666b89c6 100644 --- a/install/src/update_manifest.rs +++ b/install/src/update_manifest.rs @@ -2,6 +2,7 @@ use serde_derive::{Deserialize, Serialize}; use solana_config_api::ConfigState; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Signable, Signature}; +use std::borrow::Cow; use std::error; use std::io; @@ -27,8 +28,8 @@ impl Signable for SignedUpdateManifest { self.account_pubkey } - fn signable_data(&self) -> Vec { - bincode::serialize(&self.manifest).expect("serialize") + fn signable_data(&self) -> Cow<[u8]> { + Cow::Owned(bincode::serialize(&self.manifest).expect("serialize")) } fn get_signature(&self) -> Signature { self.manifest_signature diff --git a/runtime/src/bank_client.rs b/runtime/src/bank_client.rs index b5f9a04861..e24a8d25da 100644 --- a/runtime/src/bank_client.rs +++ b/runtime/src/bank_client.rs @@ -99,6 +99,10 @@ impl SyncClient for BankClient { Ok(self.bank.get_balance(pubkey)) } + fn get_recent_blockhash(&self) -> Result<(Hash, FeeCalculator)> { + Ok(self.bank.last_blockhash_with_fee_calculator()) + } + fn get_signature_status( &self, signature: &Signature, @@ -106,8 +110,8 @@ impl SyncClient for BankClient { Ok(self.bank.get_signature_status(signature)) } - fn get_recent_blockhash(&self) -> Result<(Hash, FeeCalculator)> { - Ok(self.bank.last_blockhash_with_fee_calculator()) + fn get_slot(&self) -> Result { + Ok(self.bank.slot()) } fn get_transaction_count(&self) -> Result { diff --git a/sdk/src/client.rs b/sdk/src/client.rs index b7aa3bf300..f2f34d590e 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -40,14 +40,17 @@ pub trait SyncClient { /// Get account balance or 0 if not found. fn get_balance(&self, pubkey: &Pubkey) -> Result; + /// Get recent blockhash + fn get_recent_blockhash(&self) -> Result<(Hash, FeeCalculator)>; + /// Get signature status. fn get_signature_status( &self, signature: &Signature, ) -> Result>>; - /// Get recent blockhash - fn get_recent_blockhash(&self) -> Result<(Hash, FeeCalculator)>; + /// Get last known slot + fn get_slot(&self) -> Result; /// Get transaction count fn get_transaction_count(&self) -> Result; diff --git a/sdk/src/signature.rs b/sdk/src/signature.rs index 7b73d4ec73..2c2833e0ec 100644 --- a/sdk/src/signature.rs +++ b/sdk/src/signature.rs @@ -7,6 +7,8 @@ use generic_array::GenericArray; use rand::rngs::OsRng; use serde_json; use solana_ed25519_dalek as ed25519_dalek; +use std::borrow::Borrow; +use std::borrow::Cow; use std::error; use std::fmt; use std::fs::{self, File}; @@ -40,16 +42,16 @@ impl Signature { pub trait Signable { fn sign(&mut self, keypair: &Keypair) { - let data = self.signable_data(); - self.set_signature(keypair.sign_message(&data)); + let signature = keypair.sign_message(self.signable_data().borrow()); + self.set_signature(signature); } fn verify(&self) -> bool { self.get_signature() - .verify(&self.pubkey().as_ref(), &self.signable_data()) + .verify(&self.pubkey().as_ref(), self.signable_data().borrow()) } fn pubkey(&self) -> Pubkey; - fn signable_data(&self) -> Vec; + fn signable_data(&self) -> Cow<[u8]>; fn get_signature(&self) -> Signature; fn set_signature(&mut self, signature: Signature); }