diff --git a/Cargo.toml b/Cargo.toml index 0d1f39f3b..ecfb321ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,6 @@ unstable = [] ipv6 = [] cuda = [] erasure = [] -sigverify_cpu_disable = [] [dependencies] rayon = "1.0.0" diff --git a/src/drone.rs b/src/drone.rs index f5099f46f..b3f8d345c 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -283,6 +283,7 @@ mod tests { leader, exit.clone(), sink(), + false, ); //TODO: this seems unstable sleep(Duration::from_millis(900)); diff --git a/src/fullnode.rs b/src/fullnode.rs index 15b6ea5d2..0ed72b956 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -58,12 +58,13 @@ impl Config { } impl FullNode { - pub fn new( + fn new_internal( mut node: TestNode, leader: bool, ledger: LedgerFile, keypair_for_validator: Option, network_entry_for_validator: Option, + sigverify_disabled: bool, ) -> FullNode { info!("creating bank..."); let bank = Bank::default(); @@ -112,6 +113,7 @@ impl FullNode { node, &network_entry_point, exit.clone(), + sigverify_disabled, ); info!( "validator ready... local request address: {} (advertising {}) connected to: {}", @@ -130,6 +132,7 @@ impl FullNode { node, exit.clone(), outfile, + sigverify_disabled, ); info!( "leader ready... local request address: {} (advertising {})", @@ -139,6 +142,40 @@ impl FullNode { } } + pub fn new( + node: TestNode, + leader: bool, + ledger: LedgerFile, + keypair_for_validator: Option, + network_entry_for_validator: Option, + ) -> FullNode { + FullNode::new_internal( + node, + leader, + ledger, + keypair_for_validator, + network_entry_for_validator, + false, + ) + } + + pub fn new_without_sigverify( + node: TestNode, + leader: bool, + ledger: LedgerFile, + keypair_for_validator: Option, + network_entry_for_validator: Option, + ) -> FullNode { + FullNode::new_internal( + node, + leader, + ledger, + keypair_for_validator, + network_entry_for_validator, + true, + ) + } + fn new_window( ledger_tail: Option>, entry_height: u64, @@ -191,6 +228,7 @@ impl FullNode { node: TestNode, exit: Arc, writer: W, + sigverify_disabled: bool, ) -> Self { let bank = Arc::new(bank); let mut thread_hdls = vec![]; @@ -214,6 +252,7 @@ impl FullNode { &blob_recycler, exit.clone(), writer, + sigverify_disabled, ); thread_hdls.extend(tpu.thread_hdls()); let ncp = Ncp::new( @@ -275,6 +314,7 @@ impl FullNode { node: TestNode, entry_point: &NodeInfo, exit: Arc, + _sigverify_disabled: bool, ) -> Self { let bank = Arc::new(bank); let mut thread_hdls = vec![]; @@ -359,7 +399,7 @@ mod tests { let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); let entry = tn.data.clone(); - let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit); + let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, false); v.exit(); v.join().unwrap(); } @@ -373,7 +413,7 @@ mod tests { let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); let entry = tn.data.clone(); - FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit) + FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, false) }) .collect(); //each validator can exit in parallel to speed many sequential calls to `join` diff --git a/src/nat.rs b/src/nat.rs old mode 100755 new mode 100644 diff --git a/src/sigverify.rs b/src/sigverify.rs old mode 100755 new mode 100644 index d6951f864..107eea547 --- a/src/sigverify.rs +++ b/src/sigverify.rs @@ -6,13 +6,8 @@ use counter::Counter; use packet::{Packet, SharedPackets}; - -#[cfg(not(feature = "sigverify_cpu_disable"))] use std::mem::size_of; - use std::sync::atomic::AtomicUsize; - -#[cfg(not(feature = "sigverify_cpu_disable"))] use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET}; pub const TX_OFFSET: usize = 0; @@ -47,7 +42,6 @@ pub fn init() { } #[cfg(not(feature = "cuda"))] -#[cfg(not(feature = "sigverify_cpu_disable"))] fn verify_packet(packet: &Packet) -> u8 { use ring::signature; use signature::{PublicKey, Signature}; @@ -72,10 +66,9 @@ fn verify_packet(packet: &Packet) -> u8 { ).is_ok() as u8 } -#[cfg(feature = "sigverify_cpu_disable")] -fn verify_packet(_packet: &Packet) -> u8 { +fn verify_packet_disabled(_packet: &Packet) -> u8 { warn!("signature verification is disabled"); - return 1; + 1 } fn batch_size(batches: &[SharedPackets]) -> usize { @@ -106,6 +99,26 @@ pub fn ed25519_verify(batches: &Vec) -> Vec> { rv } +#[cfg_attr(feature = "cargo-clippy", allow(ptr_arg))] +pub fn ed25519_verify_disabled(batches: &Vec) -> Vec> { + use rayon::prelude::*; + let count = batch_size(batches); + info!("CPU ECDSA for {}", batch_size(batches)); + let rv = batches + .into_par_iter() + .map(|p| { + p.read() + .expect("'p' read lock in ed25519_verify") + .packets + .par_iter() + .map(verify_packet_disabled) + .collect() + }) + .collect(); + inc_new_counter!("ed25519_verify", count); + rv +} + #[cfg(feature = "cuda")] pub fn init() { unsafe { diff --git a/src/sigverify_stage.rs b/src/sigverify_stage.rs index 7c82a61c3..e965a82b5 100644 --- a/src/sigverify_stage.rs +++ b/src/sigverify_stage.rs @@ -24,21 +24,30 @@ pub struct SigVerifyStage { } impl SigVerifyStage { - pub fn new(packet_receiver: Receiver) -> (Self, Receiver) { + pub fn new( + packet_receiver: Receiver, + sigverify_disabled: bool, + ) -> (Self, Receiver) { sigverify::init(); let (verified_sender, verified_receiver) = channel(); - let thread_hdls = Self::verifier_services(packet_receiver, verified_sender); + let thread_hdls = + Self::verifier_services(packet_receiver, verified_sender, sigverify_disabled); (SigVerifyStage { thread_hdls }, verified_receiver) } - fn verify_batch(batch: Vec) -> VerifiedPackets { - let r = sigverify::ed25519_verify(&batch); + fn verify_batch(batch: Vec, sigverify_disabled: bool) -> VerifiedPackets { + let r = if sigverify_disabled { + sigverify::ed25519_verify_disabled(&batch) + } else { + sigverify::ed25519_verify(&batch) + }; batch.into_iter().zip(r).collect() } fn verifier( recvr: &Arc>, sendr: &Arc>>, + sigverify_disabled: bool, ) -> Result<()> { let (batch, len) = streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; @@ -53,7 +62,7 @@ impl SigVerifyStage { rand_id ); - let verified_batch = Self::verify_batch(batch); + let verified_batch = Self::verify_batch(batch, sigverify_disabled); sendr .lock() .expect("lock in fn verify_batch in tpu") @@ -76,9 +85,10 @@ impl SigVerifyStage { fn verifier_service( packet_receiver: Arc>, verified_sender: Arc>>, + sigverify_disabled: bool, ) -> JoinHandle<()> { spawn(move || loop { - if let Err(e) = Self::verifier(&packet_receiver, &verified_sender) { + if let Err(e) = Self::verifier(&packet_receiver, &verified_sender, sigverify_disabled) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), @@ -91,11 +101,12 @@ impl SigVerifyStage { fn verifier_services( packet_receiver: PacketReceiver, verified_sender: Sender, + sigverify_disabled: bool, ) -> Vec> { let sender = Arc::new(Mutex::new(verified_sender)); let receiver = Arc::new(Mutex::new(packet_receiver)); (0..4) - .map(|_| Self::verifier_service(receiver.clone(), sender.clone())) + .map(|_| Self::verifier_service(receiver.clone(), sender.clone(), sigverify_disabled)) .collect() } } diff --git a/src/thin_client.rs b/src/thin_client.rs index 131764759..abc776473 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -313,6 +313,7 @@ mod tests { leader, exit.clone(), sink(), + false, ); sleep(Duration::from_millis(900)); @@ -356,6 +357,7 @@ mod tests { leader, exit.clone(), sink(), + false, ); //TODO: remove this sleep, or add a retry so CI is stable sleep(Duration::from_millis(300)); @@ -410,6 +412,7 @@ mod tests { leader, exit.clone(), sink(), + false, ); sleep(Duration::from_millis(300)); diff --git a/src/tpu.rs b/src/tpu.rs index d9b4eec29..dd79e97b5 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -59,13 +59,15 @@ impl Tpu { blob_recycler: &BlobRecycler, exit: Arc, writer: W, + sigverify_disabled: bool, ) -> (Self, BlobReceiver) { let packet_recycler = PacketRecycler::default(); let (fetch_stage, packet_receiver) = FetchStage::new(transactions_socket, exit, &packet_recycler); - let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver); + let (sigverify_stage, verified_receiver) = + SigVerifyStage::new(packet_receiver, sigverify_disabled); let (banking_stage, signal_receiver) = BankingStage::new(bank.clone(), verified_receiver, packet_recycler.clone()); diff --git a/tests/multinode.rs b/tests/multinode.rs index 9bfe9185e..598d39a8e 100755 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -392,7 +392,7 @@ fn test_multi_node_dynamic_network() { let (alice, ledger_path) = genesis(10_000_000); let alice_arc = Arc::new(RwLock::new(alice)); let leader_data = leader.data.clone(); - let server = FullNode::new( + let server = FullNode::new_without_sigverify( leader, true, LedgerFile::Path(ledger_path.clone()), @@ -460,7 +460,7 @@ fn test_multi_node_dynamic_network() { let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); let rd = validator.data.clone(); info!("starting {:8x} {:x}", keypair.pubkey(), rd.debug_id()); - let val = FullNode::new( + let val = FullNode::new_without_sigverify( validator, false, LedgerFile::Path(ledger_path.clone()), @@ -519,7 +519,7 @@ fn test_multi_node_dynamic_network() { if distance > max_distance_increase { info!("Node {:x} is behind by {}", server.0.debug_id(), distance); max_distance_increase = distance; - if max_distance_increase > purge_lag as i64 { + if max_distance_increase as u64 > purge_lag as u64 { server.1.exit(); info!("Node {:x} is exiting", server.0.debug_id()); retain_me = false;