diff --git a/Cargo.toml b/Cargo.toml index 839eb8152..d9138fc4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,10 @@ path = "src/bin/bench-tps.rs" name = "solana-drone" path = "src/bin/drone.rs" +[[bin]] +name = "solana-replicator" +path = "src/bin/replicator.rs" + [[bin]] name = "solana-fullnode" path = "src/bin/fullnode.rs" diff --git a/src/bank.rs b/src/bank.rs index 60cccadd1..153aec2c2 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -21,6 +21,7 @@ use std::result; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::RwLock; use std::time::Instant; +use storage_program::StorageProgram; use system_program::SystemProgram; use timing::{duration_as_us, timestamp}; use transaction::Transaction; @@ -359,6 +360,8 @@ impl Bank { // TODO: the runtime should be checking read/write access to memory // we are trusting the hard coded contracts not to clobber or allocate BudgetState::process_transaction(&tx, accounts) + } else if StorageProgram::check_id(&tx.program_id) { + StorageProgram::process_transaction(&tx, accounts) } else { return Err(BankError::UnknownContractId(tx.program_id)); } diff --git a/src/bin/replicator.rs b/src/bin/replicator.rs new file mode 100644 index 000000000..cadde6ba9 --- /dev/null +++ b/src/bin/replicator.rs @@ -0,0 +1,89 @@ +#[macro_use] +extern crate clap; +extern crate getopts; +extern crate serde_json; +#[macro_use] +extern crate solana; + +use clap::{App, Arg}; +use solana::crdt::Node; +use solana::fullnode::Config; +use solana::logger; +use solana::replicator::Replicator; +use solana::signature::{Keypair, KeypairUtil}; +use std::fs::File; +use std::net::{Ipv4Addr, SocketAddr}; +use std::process::exit; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; + +fn main() { + logger::setup(); + let matches = App::new("replicator") + .version(crate_version!()) + .arg( + Arg::with_name("identity") + .short("i") + .long("identity") + .value_name("PATH") + .takes_value(true) + .help("Run with the identity found in FILE"), + ).arg( + Arg::with_name("network") + .short("n") + .long("network") + .value_name("HOST:PORT") + .takes_value(true) + .help("Rendezvous with the network at this gossip entry point"), + ).arg( + Arg::with_name("ledger") + .short("l") + .long("ledger") + .value_name("DIR") + .takes_value(true) + .required(true) + .help("use DIR as persistent ledger location"), + ).get_matches(); + + let ledger_path = matches.value_of("ledger"); + + let (keypair, ncp) = if let Some(i) = matches.value_of("identity") { + let path = i.to_string(); + if let Ok(file) = File::open(path.clone()) { + let parse: serde_json::Result = serde_json::from_reader(file); + if let Ok(data) = parse { + (data.keypair(), data.node_info.contact_info.ncp) + } else { + eprintln!("failed to parse {}", path); + exit(1); + } + } else { + eprintln!("failed to read {}", path); + exit(1); + } + } else { + (Keypair::new(), socketaddr!([127, 0, 0, 1], 8700)) + }; + + let node = Node::new_with_external_ip(keypair.pubkey(), &ncp); + + println!( + "replicating the data with keypair: {:?} ncp:{:?}", + keypair.pubkey(), + ncp + ); + println!("my node: {:?}", node); + + let exit = Arc::new(AtomicBool::new(false)); + + let network_addr = matches + .value_of("network") + .map(|network| network.parse().expect("failed to parse network address")); + + // TODO: ask network what slice we should store + let entry_height = 0; + + let replicator = Replicator::new(entry_height, &exit, ledger_path, node, network_addr); + + replicator.join(); +} diff --git a/src/crdt.rs b/src/crdt.rs index aadf1191e..18211c4f1 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -82,6 +82,8 @@ pub struct ContactInfo { pub rpu: SocketAddr, /// transactions address pub tpu: SocketAddr, + /// storage data address + pub storage_addr: SocketAddr, /// if this struture changes update this value as well /// Always update `NodeInfo` version too /// This separate version for addresses allows us to use the `Vote` @@ -116,6 +118,7 @@ impl NodeInfo { tvu: SocketAddr, rpu: SocketAddr, tpu: SocketAddr, + storage_addr: SocketAddr, ) -> Self { NodeInfo { id, @@ -125,6 +128,7 @@ impl NodeInfo { tvu, rpu, tpu, + storage_addr, version: 0, }, leader_id: Pubkey::default(), @@ -134,19 +138,30 @@ impl NodeInfo { } } + pub fn new_localhost(id: Pubkey) -> Self { + Self::new( + id, + socketaddr!("127.0.0.1:1234"), + socketaddr!("127.0.0.1:1235"), + socketaddr!("127.0.0.1:1236"), + socketaddr!("127.0.0.1:1237"), + socketaddr!("127.0.0.1:1238"), + ) + } + #[cfg(test)] /// NodeInfo with unspecified addresses for adversarial testing. pub fn new_unspecified() -> Self { let addr = socketaddr!(0, 0); assert!(addr.ip().is_unspecified()); - Self::new(Keypair::new().pubkey(), addr, addr, addr, addr) + Self::new(Keypair::new().pubkey(), addr, addr, addr, addr, addr) } #[cfg(test)] /// NodeInfo with multicast addresses for adversarial testing. pub fn new_multicast() -> Self { let addr = socketaddr!("224.0.1.255:1000"); assert!(addr.ip().is_multicast()); - Self::new(Keypair::new().pubkey(), addr, addr, addr, addr) + Self::new(Keypair::new().pubkey(), addr, addr, addr, addr, addr) } fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr { let mut nxt_addr = *addr; @@ -164,6 +179,7 @@ impl NodeInfo { replicate_addr, requests_addr, transactions_addr, + "0.0.0.0:0".parse().unwrap(), ) } pub fn new_with_socketaddr(bind_addr: &SocketAddr) -> Self { @@ -173,7 +189,7 @@ impl NodeInfo { // pub fn new_entry_point(gossip_addr: &SocketAddr) -> Self { let daddr: SocketAddr = socketaddr!("0.0.0.0:0"); - NodeInfo::new(Pubkey::default(), *gossip_addr, daddr, daddr, daddr) + NodeInfo::new(Pubkey::default(), *gossip_addr, daddr, daddr, daddr, daddr) } } @@ -1238,11 +1254,12 @@ impl Crdt { let pubkey = Keypair::new().pubkey(); let daddr = socketaddr_any!(); - let node = NodeInfo::new(pubkey, daddr, daddr, daddr, daddr); + let node = NodeInfo::new(pubkey, daddr, daddr, daddr, daddr, daddr); (node, gossip_socket) } } +#[derive(Debug)] pub struct Sockets { pub gossip: UdpSocket, pub requests: UdpSocket, @@ -1254,6 +1271,7 @@ pub struct Sockets { pub retransmit: UdpSocket, } +#[derive(Debug)] pub struct Node { pub info: NodeInfo, pub sockets: Sockets, @@ -1274,12 +1292,14 @@ impl Node { let respond = UdpSocket::bind("0.0.0.0:0").unwrap(); let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); + let storage = UdpSocket::bind("0.0.0.0:0").unwrap(); let info = NodeInfo::new( pubkey, gossip.local_addr().unwrap(), replicate.local_addr().unwrap(), requests.local_addr().unwrap(), transaction.local_addr().unwrap(), + storage.local_addr().unwrap(), ); Node { info, @@ -1317,6 +1337,7 @@ impl Node { let (_, repair) = bind(); let (_, broadcast) = bind(); let (_, retransmit) = bind(); + let (storage_port, _) = bind(); // Responses are sent from the same Udp port as requests are received // from, in hopes that a NAT sitting in the middle will route the @@ -1329,6 +1350,7 @@ impl Node { SocketAddr::new(ncp.ip(), replicate_port), SocketAddr::new(ncp.ip(), requests_port), SocketAddr::new(ncp.ip(), transaction_port), + SocketAddr::new(ncp.ip(), storage_port), ); trace!("new NodeInfo: {:?}", info); @@ -1380,13 +1402,7 @@ mod tests { #[test] fn insert_test() { - let mut d = NodeInfo::new( - Keypair::new().pubkey(), - socketaddr!("127.0.0.1:1234"), - socketaddr!("127.0.0.1:1235"), - socketaddr!("127.0.0.1:1236"), - socketaddr!("127.0.0.1:1237"), - ); + let mut d = NodeInfo::new_localhost(Keypair::new().pubkey()); assert_eq!(d.version, 0); let mut crdt = Crdt::new(d.clone()).unwrap(); assert_eq!(crdt.table[&d.id].version, 0); @@ -1486,27 +1502,9 @@ mod tests { } #[test] fn update_test() { - let d1 = NodeInfo::new( - Keypair::new().pubkey(), - socketaddr!("127.0.0.1:1234"), - socketaddr!("127.0.0.1:1235"), - socketaddr!("127.0.0.1:1236"), - socketaddr!("127.0.0.1:1237"), - ); - let d2 = NodeInfo::new( - Keypair::new().pubkey(), - socketaddr!("127.0.0.1:1234"), - socketaddr!("127.0.0.1:1235"), - socketaddr!("127.0.0.1:1236"), - socketaddr!("127.0.0.1:1237"), - ); - let d3 = NodeInfo::new( - Keypair::new().pubkey(), - socketaddr!("127.0.0.1:1234"), - socketaddr!("127.0.0.1:1235"), - socketaddr!("127.0.0.1:1236"), - socketaddr!("127.0.0.1:1237"), - ); + let d1 = NodeInfo::new_localhost(Keypair::new().pubkey()); + let d2 = NodeInfo::new_localhost(Keypair::new().pubkey()); + let d3 = NodeInfo::new_localhost(Keypair::new().pubkey()); let mut crdt = Crdt::new(d1.clone()).expect("Crdt::new"); let (key, ix, ups) = crdt.get_updates_since(0); assert_eq!(key, d1.id); @@ -1542,13 +1540,7 @@ mod tests { } #[test] fn window_index_request() { - let me = NodeInfo::new( - Keypair::new().pubkey(), - socketaddr!([127, 0, 0, 1], 1234), - socketaddr!([127, 0, 0, 1], 1235), - socketaddr!([127, 0, 0, 1], 1236), - socketaddr!([127, 0, 0, 1], 1237), - ); + let me = NodeInfo::new_localhost(Keypair::new().pubkey()); let mut crdt = Crdt::new(me).expect("Crdt::new"); let rv = crdt.window_index_request(0); assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); @@ -1560,6 +1552,7 @@ mod tests { socketaddr!([127, 0, 0, 1], 1235), socketaddr!([127, 0, 0, 1], 1236), socketaddr!([127, 0, 0, 1], 1237), + socketaddr!([127, 0, 0, 1], 1238), ); crdt.insert(&nxt); let rv = crdt.window_index_request(0).unwrap(); @@ -1573,6 +1566,7 @@ mod tests { socketaddr!([127, 0, 0, 1], 1235), socketaddr!([127, 0, 0, 1], 1236), socketaddr!([127, 0, 0, 1], 1237), + socketaddr!([127, 0, 0, 1], 1238), ); crdt.insert(&nxt); let mut one = false; @@ -1598,6 +1592,7 @@ mod tests { socketaddr!("127.0.0.1:127"), socketaddr!("127.0.0.1:127"), socketaddr!("127.0.0.1:127"), + socketaddr!("127.0.0.1:127"), ); let mut crdt = Crdt::new(me).expect("Crdt::new"); @@ -1616,23 +1611,11 @@ mod tests { /// test that gossip requests are eventually generated for all nodes #[test] fn gossip_request() { - let me = NodeInfo::new( - Keypair::new().pubkey(), - socketaddr!("127.0.0.1:1234"), - socketaddr!("127.0.0.1:1235"), - socketaddr!("127.0.0.1:1236"), - socketaddr!("127.0.0.1:1237"), - ); + let me = NodeInfo::new_localhost(Keypair::new().pubkey()); let mut crdt = Crdt::new(me.clone()).expect("Crdt::new"); let rv = crdt.gossip_request(); assert_matches!(rv, Err(Error::CrdtError(CrdtError::NoPeers))); - let nxt1 = NodeInfo::new( - Keypair::new().pubkey(), - socketaddr!("127.0.0.2:1234"), - socketaddr!("127.0.0.1:1235"), - socketaddr!("127.0.0.1:1236"), - socketaddr!("127.0.0.1:1237"), - ); + let nxt1 = NodeInfo::new_localhost(Keypair::new().pubkey()); crdt.insert(&nxt1); @@ -1754,6 +1737,7 @@ mod tests { socketaddr!("127.0.0.1:1235"), socketaddr!("127.0.0.1:1236"), socketaddr!("127.0.0.1:1237"), + socketaddr!("127.0.0.1:1238"), ); let recycler = BlobRecycler::default(); let rv = Crdt::run_window_request( @@ -1922,6 +1906,7 @@ mod tests { socketaddr_any!(), socketaddr!("127.0.0.1:1236"), socketaddr_any!(), + socketaddr_any!(), ); leader3.ledger_state.last_id = hash(b"3"); let mut crdt = Crdt::new(leader0.clone()).expect("Crdt::new"); @@ -2011,13 +1996,7 @@ mod tests { #[test] fn test_default_leader() { logger::setup(); - let node_info = NodeInfo::new( - Keypair::new().pubkey(), - socketaddr!("127.0.0.1:1234"), - socketaddr!("127.0.0.1:1235"), - socketaddr!("127.0.0.1:1236"), - socketaddr!("127.0.0.1:1237"), - ); + let node_info = NodeInfo::new_localhost(Keypair::new().pubkey()); let mut crdt = Crdt::new(node_info).unwrap(); let network_entry_point = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1239")); crdt.insert(&network_entry_point); diff --git a/src/erasure.rs b/src/erasure.rs index 0ba8233a5..7233fa8ae 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -737,13 +737,7 @@ mod test { blobs.push(b_); } - let d = crdt::NodeInfo::new( - Keypair::new().pubkey(), - "127.0.0.1:1234".parse().unwrap(), - "127.0.0.1:1235".parse().unwrap(), - "127.0.0.1:1236".parse().unwrap(), - "127.0.0.1:1237".parse().unwrap(), - ); + let d = crdt::NodeInfo::new_localhost(Keypair::new().pubkey()); assert!(index_blobs(&d, &blobs, &mut (offset as u64)).is_ok()); for b in blobs { let idx = b.read().get_index().unwrap() as usize % WINDOW_SIZE; diff --git a/src/lib.rs b/src/lib.rs index 566a7014a..d7bed5c68 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,6 +43,7 @@ pub mod recorder; pub mod recvmmsg; pub mod recycler; pub mod replicate_stage; +pub mod replicator; pub mod request; pub mod request_processor; pub mod request_stage; @@ -54,6 +55,8 @@ pub mod service; pub mod signature; pub mod sigverify; pub mod sigverify_stage; +pub mod storage_program; +pub mod store_ledger_stage; pub mod streamer; pub mod system_program; pub mod thin_client; diff --git a/src/replicator.rs b/src/replicator.rs new file mode 100644 index 000000000..edaa8bd02 --- /dev/null +++ b/src/replicator.rs @@ -0,0 +1,181 @@ +use blob_fetch_stage::BlobFetchStage; +use crdt::{Crdt, Node, NodeInfo}; +use ncp::Ncp; +use service::Service; +use std::net::SocketAddr; +use std::net::UdpSocket; +use std::sync::atomic::AtomicBool; +use std::sync::mpsc::channel; +use std::sync::{Arc, RwLock}; +use std::thread::JoinHandle; +use store_ledger_stage::StoreLedgerStage; +use streamer::BlobReceiver; +use window; +use window_service::window_service; + +pub struct Replicator { + ncp: Ncp, + fetch_stage: BlobFetchStage, + store_ledger_stage: StoreLedgerStage, + t_window: JoinHandle<()>, + pub retransmit_receiver: BlobReceiver, +} + +impl Replicator { + pub fn new( + entry_height: u64, + exit: &Arc, + ledger_path: Option<&str>, + node: Node, + network_addr: Option, + ) -> Replicator { + let window = window::new_window_from_entries(&[], entry_height, &node.info); + let shared_window = Arc::new(RwLock::new(window)); + + let crdt = Arc::new(RwLock::new(Crdt::new(node.info).expect("Crdt::new"))); + + let leader_info = network_addr.map(|i| NodeInfo::new_entry_point(&i)); + + if let Some(leader_info) = leader_info.as_ref() { + crdt.write().unwrap().insert(leader_info); + } else { + panic!("No leader info!"); + } + + let repair_socket = Arc::new(node.sockets.repair); + let mut blob_sockets: Vec> = + node.sockets.replicate.into_iter().map(Arc::new).collect(); + blob_sockets.push(repair_socket.clone()); + let (fetch_stage, blob_fetch_receiver) = + BlobFetchStage::new_multi_socket(blob_sockets, exit.clone()); + + let (blob_window_sender, blob_window_receiver) = channel(); + // todo: pull blobs off the retransmit_receiver and recycle them? + let (retransmit_sender, retransmit_receiver) = channel(); + let t_window = window_service( + crdt.clone(), + shared_window.clone(), + entry_height, + blob_fetch_receiver, + blob_window_sender, + retransmit_sender, + repair_socket, + ); + + let store_ledger_stage = StoreLedgerStage::new(blob_window_receiver, ledger_path); + + let ncp = Ncp::new( + &crdt, + shared_window.clone(), + ledger_path, + node.sockets.gossip, + exit.clone(), + ); + + Replicator { + ncp, + fetch_stage, + store_ledger_stage, + t_window, + retransmit_receiver, + } + } + + pub fn join(self) { + self.ncp.join().unwrap(); + self.t_window.join().unwrap(); + self.fetch_stage.join().unwrap(); + self.store_ledger_stage.join().unwrap(); + } +} + +#[cfg(test)] +mod tests { + use client::mk_client; + use crdt::Node; + use fullnode::Fullnode; + use ledger::{genesis, read_ledger}; + use logger; + use replicator::Replicator; + use signature::{Keypair, KeypairUtil}; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + use std::thread::sleep; + use std::time::Duration; + + #[test] + fn test_replicator_startup() { + logger::setup(); + info!("starting replicator test"); + let entry_height = 0; + let replicator_ledger_path = "replicator_test_replicator_ledger"; + + let exit = Arc::new(AtomicBool::new(false)); + + let leader_ledger_path = "replicator_test_leader_ledger"; + let (mint, leader_ledger_path) = genesis(leader_ledger_path, 100); + + info!("starting leader node"); + let leader_keypair = Keypair::new(); + let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); + let network_addr = leader_node.sockets.gossip.local_addr().unwrap(); + let leader_info = leader_node.info.clone(); + let leader_rotation_interval = 20; + let leader = Fullnode::new( + leader_node, + &leader_ledger_path, + leader_keypair, + None, + false, + Some(leader_rotation_interval), + ); + + let mut leader_client = mk_client(&leader_info); + + let bob = Keypair::new(); + + let last_id = leader_client.get_last_id(); + leader_client + .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) + .unwrap(); + + let replicator_keypair = Keypair::new(); + + info!("starting replicator node"); + let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey()); + let replicator = Replicator::new( + entry_height, + &exit, + Some(replicator_ledger_path), + replicator_node, + Some(network_addr), + ); + + let mut num_entries = 0; + for _ in 0..10 { + match read_ledger(replicator_ledger_path, true) { + Ok(entries) => { + for _ in entries { + num_entries += 1; + } + info!("{} entries", num_entries); + if num_entries > 0 { + break; + } + } + Err(e) => { + info!("error reading ledger: {:?}", e); + } + } + sleep(Duration::new(1, 0)); + let last_id = leader_client.get_last_id(); + leader_client + .transfer(1, &mint.keypair(), bob.pubkey(), &last_id) + .unwrap(); + } + assert!(num_entries > 0); + exit.store(true, Ordering::Relaxed); + replicator.join(); + leader.exit(); + } +} diff --git a/src/storage_program.rs b/src/storage_program.rs new file mode 100644 index 000000000..3923bfa45 --- /dev/null +++ b/src/storage_program.rs @@ -0,0 +1,42 @@ +//! storage program +//! Receive mining proofs from miners, validate the answers +//! and give reward for good proofs. + +use bank::Account; +use bincode::deserialize; +use signature::Pubkey; +use transaction::Transaction; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum StorageProgram { + SubmitMiningProof { sha_state: [u8; 32] }, +} + +pub const STORAGE_PROGRAM_ID: [u8; 32] = [1u8; 32]; + +impl StorageProgram { + pub fn check_id(program_id: &Pubkey) -> bool { + program_id.as_ref() == STORAGE_PROGRAM_ID + } + + pub fn id() -> Pubkey { + Pubkey::new(&STORAGE_PROGRAM_ID) + } + + pub fn get_balance(account: &Account) -> i64 { + account.tokens + } + + pub fn process_transaction(tx: &Transaction, _accounts: &mut [Account]) { + let syscall: StorageProgram = deserialize(&tx.userdata).unwrap(); + match syscall { + StorageProgram::SubmitMiningProof { sha_state } => { + info!("Mining proof submitted with state {}", sha_state[0]); + return; + } + } + } +} + +#[cfg(test)] +mod test {} diff --git a/src/store_ledger_stage.rs b/src/store_ledger_stage.rs new file mode 100644 index 000000000..e18f1d882 --- /dev/null +++ b/src/store_ledger_stage.rs @@ -0,0 +1,73 @@ +//! The `store_ledger` stores the ledger from received blobs for storage nodes + +use counter::Counter; +use ledger::{reconstruct_entries_from_blobs, LedgerWriter}; +use log::Level; +use result::{Error, Result}; +use service::Service; +use std::sync::atomic::AtomicUsize; +use std::sync::mpsc::RecvTimeoutError; +use std::thread::{self, Builder, JoinHandle}; +use std::time::Duration; +use streamer::BlobReceiver; + +pub struct StoreLedgerStage { + thread_hdls: Vec>, +} + +impl StoreLedgerStage { + /// Process entry blobs, already in order + fn store_requests( + window_receiver: &BlobReceiver, + ledger_writer: Option<&mut LedgerWriter>, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let mut blobs = window_receiver.recv_timeout(timer)?; + while let Ok(mut more) = window_receiver.try_recv() { + blobs.append(&mut more); + } + let entries = reconstruct_entries_from_blobs(blobs.clone())?; + + inc_new_counter_info!( + "store-transactions", + entries.iter().map(|x| x.transactions.len()).sum() + ); + + if let Some(ledger_writer) = ledger_writer { + ledger_writer.write_entries(entries)?; + } + + Ok(()) + } + + pub fn new(window_receiver: BlobReceiver, ledger_path: Option<&str>) -> Self { + let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, true).unwrap()); + + let t_store_requests = Builder::new() + .name("solana-store-ledger-stage".to_string()) + .spawn(move || loop { + if let Err(e) = Self::store_requests(&window_receiver, ledger_writer.as_mut()) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => error!("{:?}", e), + } + } + }).unwrap(); + + let thread_hdls = vec![t_store_requests]; + + StoreLedgerStage { thread_hdls } + } +} + +impl Service for StoreLedgerStage { + type JoinReturnType = (); + + fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls { + thread_hdl.join()?; + } + Ok(()) + } +} diff --git a/src/write_stage.rs b/src/write_stage.rs index ad3b14a30..781b1fe17 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -133,7 +133,7 @@ impl WriteStage { let start = Instant::now(); for _ in 0..ventries.len() { let entries = ventries.pop().unwrap(); - for e in entries.iter() { + for e in &entries { num_txs += e.transactions.len(); } let crdt_votes_start = Instant::now();