Converted sigverify disable flag to runtime check instead of "cfg" (#799)

This commit is contained in:
pgarg66 2018-07-31 16:54:24 -07:00 committed by GitHub
parent 821e3bc3ca
commit 7c5172a65e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 93 additions and 24 deletions

View File

@ -57,7 +57,6 @@ unstable = []
ipv6 = [] ipv6 = []
cuda = [] cuda = []
erasure = [] erasure = []
sigverify_cpu_disable = []
[dependencies] [dependencies]
rayon = "1.0.0" rayon = "1.0.0"

View File

@ -283,6 +283,7 @@ mod tests {
leader, leader,
exit.clone(), exit.clone(),
sink(), sink(),
false,
); );
//TODO: this seems unstable //TODO: this seems unstable
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));

View File

@ -58,12 +58,13 @@ impl Config {
} }
impl FullNode { impl FullNode {
pub fn new( fn new_internal(
mut node: TestNode, mut node: TestNode,
leader: bool, leader: bool,
ledger: LedgerFile, ledger: LedgerFile,
keypair_for_validator: Option<KeyPair>, keypair_for_validator: Option<KeyPair>,
network_entry_for_validator: Option<SocketAddr>, network_entry_for_validator: Option<SocketAddr>,
sigverify_disabled: bool,
) -> FullNode { ) -> FullNode {
info!("creating bank..."); info!("creating bank...");
let bank = Bank::default(); let bank = Bank::default();
@ -112,6 +113,7 @@ impl FullNode {
node, node,
&network_entry_point, &network_entry_point,
exit.clone(), exit.clone(),
sigverify_disabled,
); );
info!( info!(
"validator ready... local request address: {} (advertising {}) connected to: {}", "validator ready... local request address: {} (advertising {}) connected to: {}",
@ -130,6 +132,7 @@ impl FullNode {
node, node,
exit.clone(), exit.clone(),
outfile, outfile,
sigverify_disabled,
); );
info!( info!(
"leader ready... local request address: {} (advertising {})", "leader ready... local request address: {} (advertising {})",
@ -139,6 +142,40 @@ impl FullNode {
} }
} }
pub fn new(
node: TestNode,
leader: bool,
ledger: LedgerFile,
keypair_for_validator: Option<KeyPair>,
network_entry_for_validator: Option<SocketAddr>,
) -> FullNode {
FullNode::new_internal(
node,
leader,
ledger,
keypair_for_validator,
network_entry_for_validator,
false,
)
}
pub fn new_without_sigverify(
node: TestNode,
leader: bool,
ledger: LedgerFile,
keypair_for_validator: Option<KeyPair>,
network_entry_for_validator: Option<SocketAddr>,
) -> FullNode {
FullNode::new_internal(
node,
leader,
ledger,
keypair_for_validator,
network_entry_for_validator,
true,
)
}
fn new_window( fn new_window(
ledger_tail: Option<Vec<Entry>>, ledger_tail: Option<Vec<Entry>>,
entry_height: u64, entry_height: u64,
@ -191,6 +228,7 @@ impl FullNode {
node: TestNode, node: TestNode,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
writer: W, writer: W,
sigverify_disabled: bool,
) -> Self { ) -> Self {
let bank = Arc::new(bank); let bank = Arc::new(bank);
let mut thread_hdls = vec![]; let mut thread_hdls = vec![];
@ -214,6 +252,7 @@ impl FullNode {
&blob_recycler, &blob_recycler,
exit.clone(), exit.clone(),
writer, writer,
sigverify_disabled,
); );
thread_hdls.extend(tpu.thread_hdls()); thread_hdls.extend(tpu.thread_hdls());
let ncp = Ncp::new( let ncp = Ncp::new(
@ -275,6 +314,7 @@ impl FullNode {
node: TestNode, node: TestNode,
entry_point: &NodeInfo, entry_point: &NodeInfo,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
_sigverify_disabled: bool,
) -> Self { ) -> Self {
let bank = Arc::new(bank); let bank = Arc::new(bank);
let mut thread_hdls = vec![]; let mut thread_hdls = vec![];
@ -359,7 +399,7 @@ mod tests {
let bank = Bank::new(&alice); let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone(); let entry = tn.data.clone();
let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit); let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, false);
v.exit(); v.exit();
v.join().unwrap(); v.join().unwrap();
} }
@ -373,7 +413,7 @@ mod tests {
let bank = Bank::new(&alice); let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone(); let entry = tn.data.clone();
FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit) FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, false)
}) })
.collect(); .collect();
//each validator can exit in parallel to speed many sequential calls to `join` //each validator can exit in parallel to speed many sequential calls to `join`

0
src/nat.rs Executable file → Normal file
View File

31
src/sigverify.rs Executable file → Normal file
View File

@ -6,13 +6,8 @@
use counter::Counter; use counter::Counter;
use packet::{Packet, SharedPackets}; use packet::{Packet, SharedPackets};
#[cfg(not(feature = "sigverify_cpu_disable"))]
use std::mem::size_of; use std::mem::size_of;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
#[cfg(not(feature = "sigverify_cpu_disable"))]
use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET}; use transaction::{PUB_KEY_OFFSET, SIGNED_DATA_OFFSET, SIG_OFFSET};
pub const TX_OFFSET: usize = 0; pub const TX_OFFSET: usize = 0;
@ -47,7 +42,6 @@ pub fn init() {
} }
#[cfg(not(feature = "cuda"))] #[cfg(not(feature = "cuda"))]
#[cfg(not(feature = "sigverify_cpu_disable"))]
fn verify_packet(packet: &Packet) -> u8 { fn verify_packet(packet: &Packet) -> u8 {
use ring::signature; use ring::signature;
use signature::{PublicKey, Signature}; use signature::{PublicKey, Signature};
@ -72,10 +66,9 @@ fn verify_packet(packet: &Packet) -> u8 {
).is_ok() as u8 ).is_ok() as u8
} }
#[cfg(feature = "sigverify_cpu_disable")] fn verify_packet_disabled(_packet: &Packet) -> u8 {
fn verify_packet(_packet: &Packet) -> u8 {
warn!("signature verification is disabled"); warn!("signature verification is disabled");
return 1; 1
} }
fn batch_size(batches: &[SharedPackets]) -> usize { fn batch_size(batches: &[SharedPackets]) -> usize {
@ -106,6 +99,26 @@ pub fn ed25519_verify(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
rv rv
} }
#[cfg_attr(feature = "cargo-clippy", allow(ptr_arg))]
pub fn ed25519_verify_disabled(batches: &Vec<SharedPackets>) -> Vec<Vec<u8>> {
use rayon::prelude::*;
let count = batch_size(batches);
info!("CPU ECDSA for {}", batch_size(batches));
let rv = batches
.into_par_iter()
.map(|p| {
p.read()
.expect("'p' read lock in ed25519_verify")
.packets
.par_iter()
.map(verify_packet_disabled)
.collect()
})
.collect();
inc_new_counter!("ed25519_verify", count);
rv
}
#[cfg(feature = "cuda")] #[cfg(feature = "cuda")]
pub fn init() { pub fn init() {
unsafe { unsafe {

View File

@ -24,21 +24,30 @@ pub struct SigVerifyStage {
} }
impl SigVerifyStage { impl SigVerifyStage {
pub fn new(packet_receiver: Receiver<SharedPackets>) -> (Self, Receiver<VerifiedPackets>) { pub fn new(
packet_receiver: Receiver<SharedPackets>,
sigverify_disabled: bool,
) -> (Self, Receiver<VerifiedPackets>) {
sigverify::init(); sigverify::init();
let (verified_sender, verified_receiver) = channel(); let (verified_sender, verified_receiver) = channel();
let thread_hdls = Self::verifier_services(packet_receiver, verified_sender); let thread_hdls =
Self::verifier_services(packet_receiver, verified_sender, sigverify_disabled);
(SigVerifyStage { thread_hdls }, verified_receiver) (SigVerifyStage { thread_hdls }, verified_receiver)
} }
fn verify_batch(batch: Vec<SharedPackets>) -> VerifiedPackets { fn verify_batch(batch: Vec<SharedPackets>, sigverify_disabled: bool) -> VerifiedPackets {
let r = sigverify::ed25519_verify(&batch); let r = if sigverify_disabled {
sigverify::ed25519_verify_disabled(&batch)
} else {
sigverify::ed25519_verify(&batch)
};
batch.into_iter().zip(r).collect() batch.into_iter().zip(r).collect()
} }
fn verifier( fn verifier(
recvr: &Arc<Mutex<PacketReceiver>>, recvr: &Arc<Mutex<PacketReceiver>>,
sendr: &Arc<Mutex<Sender<VerifiedPackets>>>, sendr: &Arc<Mutex<Sender<VerifiedPackets>>>,
sigverify_disabled: bool,
) -> Result<()> { ) -> Result<()> {
let (batch, len) = let (batch, len) =
streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
@ -53,7 +62,7 @@ impl SigVerifyStage {
rand_id rand_id
); );
let verified_batch = Self::verify_batch(batch); let verified_batch = Self::verify_batch(batch, sigverify_disabled);
sendr sendr
.lock() .lock()
.expect("lock in fn verify_batch in tpu") .expect("lock in fn verify_batch in tpu")
@ -76,9 +85,10 @@ impl SigVerifyStage {
fn verifier_service( fn verifier_service(
packet_receiver: Arc<Mutex<PacketReceiver>>, packet_receiver: Arc<Mutex<PacketReceiver>>,
verified_sender: Arc<Mutex<Sender<VerifiedPackets>>>, verified_sender: Arc<Mutex<Sender<VerifiedPackets>>>,
sigverify_disabled: bool,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
spawn(move || loop { spawn(move || loop {
if let Err(e) = Self::verifier(&packet_receiver, &verified_sender) { if let Err(e) = Self::verifier(&packet_receiver, &verified_sender, sigverify_disabled) {
match e { match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@ -91,11 +101,12 @@ impl SigVerifyStage {
fn verifier_services( fn verifier_services(
packet_receiver: PacketReceiver, packet_receiver: PacketReceiver,
verified_sender: Sender<VerifiedPackets>, verified_sender: Sender<VerifiedPackets>,
sigverify_disabled: bool,
) -> Vec<JoinHandle<()>> { ) -> Vec<JoinHandle<()>> {
let sender = Arc::new(Mutex::new(verified_sender)); let sender = Arc::new(Mutex::new(verified_sender));
let receiver = Arc::new(Mutex::new(packet_receiver)); let receiver = Arc::new(Mutex::new(packet_receiver));
(0..4) (0..4)
.map(|_| Self::verifier_service(receiver.clone(), sender.clone())) .map(|_| Self::verifier_service(receiver.clone(), sender.clone(), sigverify_disabled))
.collect() .collect()
} }
} }

View File

@ -313,6 +313,7 @@ mod tests {
leader, leader,
exit.clone(), exit.clone(),
sink(), sink(),
false,
); );
sleep(Duration::from_millis(900)); sleep(Duration::from_millis(900));
@ -356,6 +357,7 @@ mod tests {
leader, leader,
exit.clone(), exit.clone(),
sink(), sink(),
false,
); );
//TODO: remove this sleep, or add a retry so CI is stable //TODO: remove this sleep, or add a retry so CI is stable
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));
@ -410,6 +412,7 @@ mod tests {
leader, leader,
exit.clone(), exit.clone(),
sink(), sink(),
false,
); );
sleep(Duration::from_millis(300)); sleep(Duration::from_millis(300));

View File

@ -59,13 +59,15 @@ impl Tpu {
blob_recycler: &BlobRecycler, blob_recycler: &BlobRecycler,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
writer: W, writer: W,
sigverify_disabled: bool,
) -> (Self, BlobReceiver) { ) -> (Self, BlobReceiver) {
let packet_recycler = PacketRecycler::default(); let packet_recycler = PacketRecycler::default();
let (fetch_stage, packet_receiver) = let (fetch_stage, packet_receiver) =
FetchStage::new(transactions_socket, exit, &packet_recycler); FetchStage::new(transactions_socket, exit, &packet_recycler);
let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver); let (sigverify_stage, verified_receiver) =
SigVerifyStage::new(packet_receiver, sigverify_disabled);
let (banking_stage, signal_receiver) = let (banking_stage, signal_receiver) =
BankingStage::new(bank.clone(), verified_receiver, packet_recycler.clone()); BankingStage::new(bank.clone(), verified_receiver, packet_recycler.clone());

View File

@ -392,7 +392,7 @@ fn test_multi_node_dynamic_network() {
let (alice, ledger_path) = genesis(10_000_000); let (alice, ledger_path) = genesis(10_000_000);
let alice_arc = Arc::new(RwLock::new(alice)); let alice_arc = Arc::new(RwLock::new(alice));
let leader_data = leader.data.clone(); let leader_data = leader.data.clone();
let server = FullNode::new( let server = FullNode::new_without_sigverify(
leader, leader,
true, true,
LedgerFile::Path(ledger_path.clone()), LedgerFile::Path(ledger_path.clone()),
@ -460,7 +460,7 @@ fn test_multi_node_dynamic_network() {
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let rd = validator.data.clone(); let rd = validator.data.clone();
info!("starting {:8x} {:x}", keypair.pubkey(), rd.debug_id()); info!("starting {:8x} {:x}", keypair.pubkey(), rd.debug_id());
let val = FullNode::new( let val = FullNode::new_without_sigverify(
validator, validator,
false, false,
LedgerFile::Path(ledger_path.clone()), LedgerFile::Path(ledger_path.clone()),
@ -519,7 +519,7 @@ fn test_multi_node_dynamic_network() {
if distance > max_distance_increase { if distance > max_distance_increase {
info!("Node {:x} is behind by {}", server.0.debug_id(), distance); info!("Node {:x} is behind by {}", server.0.debug_id(), distance);
max_distance_increase = distance; max_distance_increase = distance;
if max_distance_increase > purge_lag as i64 { if max_distance_increase as u64 > purge_lag as u64 {
server.1.exit(); server.1.exit();
info!("Node {:x} is exiting", server.0.debug_id()); info!("Node {:x} is exiting", server.0.debug_id());
retain_me = false; retain_me = false;