diff --git a/src/bin/genesis.rs b/src/bin/genesis.rs index 35d7afa0de..2f4c8427d1 100644 --- a/src/bin/genesis.rs +++ b/src/bin/genesis.rs @@ -54,7 +54,7 @@ fn main() -> Result<(), Box> { let mint = Mint::new_with_pkcs8(tokens, pkcs8); let mut ledger_writer = LedgerWriter::open(&ledger_path, true)?; - ledger_writer.write_entries(mint.create_entries())?; + ledger_writer.write_entries(&mint.create_entries())?; Ok(()) } diff --git a/src/chacha_cuda.rs b/src/chacha_cuda.rs index d02eda8076..c8585d673a 100644 --- a/src/chacha_cuda.rs +++ b/src/chacha_cuda.rs @@ -104,7 +104,7 @@ mod tests { let ledger_path = get_tmp_ledger_path(ledger_dir); { let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(entries.clone()).unwrap(); + writer.write_entries(&entries).unwrap(); } let out_path = Path::new("test_chacha_encrypt_file_many_keys_output.txt.enc"); diff --git a/src/cluster_info.rs b/src/cluster_info.rs index eed0ba16e8..44f6433c04 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -1823,7 +1823,7 @@ mod tests { let zero = Hash::default(); let one = hash(&zero.as_ref()); writer - .write_entries(vec![Entry::new_tick(0, &zero), Entry::new_tick(0, &one)].to_vec()) + .write_entries(&vec![Entry::new_tick(0, &zero), Entry::new_tick(0, &one)].to_vec()) .unwrap(); path } diff --git a/src/fullnode.rs b/src/fullnode.rs index d16a19519c..553c26c94c 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -817,7 +817,7 @@ mod tests { .iter() .fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64) + num_ending_ticks as u64; - ledger_writer.write_entries(active_set_entries).unwrap(); + ledger_writer.write_entries(&active_set_entries).unwrap(); // Create the common leader scheduling configuration let num_slots_per_epoch = 3; @@ -913,7 +913,7 @@ mod tests { let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height; let active_set_entries_len = active_set_entries.len() as u64; last_id = active_set_entries.last().unwrap().id; - ledger_writer.write_entries(active_set_entries).unwrap(); + ledger_writer.write_entries(&active_set_entries).unwrap(); let ledger_initial_len = genesis_entries.len() as u64 + active_set_entries_len; // Set the leader scheduler for the validator diff --git a/src/write_stage.rs b/src/leader_vote_stage.rs similarity index 63% rename from src/write_stage.rs rename to src/leader_vote_stage.rs index ef4b85e8f7..7eae6b273c 100644 --- a/src/write_stage.rs +++ b/src/leader_vote_stage.rs @@ -1,12 +1,12 @@ -//! The `write_stage` module implements the TPU's write stage. It -//! writes entries to the given writer, which is typically a file or -//! stdout, and then sends the Entry to its output channel. +//! The `leader_vote_stage` module implements the TPU's vote stage. It +//! computes and notes the votes for the entries, and then sends the +//! Entry to its output channel. use bank::Bank; use cluster_info::ClusterInfo; use counter::Counter; use entry::Entry; -use ledger::{Block, LedgerWriter}; +use ledger::Block; use log::Level; use result::{Error, Result}; use service::Service; @@ -18,20 +18,19 @@ use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; use streamer::responder; -use timing::{duration_as_ms, duration_as_s}; +use timing::duration_as_ms; use vote_stage::send_leader_vote; -pub struct WriteStage { +pub struct LeaderVoteStage { thread_hdls: Vec>, - write_thread: JoinHandle<()>, + vote_thread: JoinHandle<()>, } -impl WriteStage { +impl LeaderVoteStage { /// Process any Entry items that have been published by the RecordStage. /// continuosly send entries out - pub fn write_and_send_entries( + pub fn compute_vote_and_send_entries( cluster_info: &Arc>, - ledger_writer: &mut LedgerWriter, entry_sender: &Sender>, entry_receiver: &Receiver>, ) -> Result<()> { @@ -39,7 +38,6 @@ impl WriteStage { let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; let now = Instant::now(); let mut num_new_entries = 0; - let mut num_txs = 0; loop { num_new_entries += received_entries.len(); @@ -51,84 +49,60 @@ impl WriteStage { break; } } - inc_new_counter_info!("write_stage-entries_received", num_new_entries); + inc_new_counter_info!("leader_vote_stage-entries_received", num_new_entries); + debug!("leader_vote_stage entries: {}", num_new_entries); - debug!("write_stage entries: {}", num_new_entries); - - let mut entries_send_total = 0; - let mut cluster_info_votes_total = 0; - - let start = Instant::now(); for entries in ventries { - let cluster_info_votes_start = Instant::now(); let votes = &entries.votes(); cluster_info.write().unwrap().insert_votes(&votes); - cluster_info_votes_total += duration_as_ms(&cluster_info_votes_start.elapsed()); - for e in &entries { - num_txs += e.transactions.len(); - ledger_writer.write_entry_noflush(&e)?; - } - - inc_new_counter_info!("write_stage-write_entries", entries.len()); + inc_new_counter_info!("leader_vote_stage-write_entries", entries.len()); //TODO(anatoly): real stake based voting needs to change this //leader simply votes if the current set of validators have voted //on a valid last id trace!("New entries? {}", entries.len()); - let entries_send_start = Instant::now(); if !entries.is_empty() { - inc_new_counter_info!("write_stage-recv_vote", votes.len()); - inc_new_counter_info!("write_stage-entries_sent", entries.len()); + inc_new_counter_info!("leader_vote_stage-recv_vote", votes.len()); + inc_new_counter_info!("leader_vote_stage-entries_sent", entries.len()); trace!("broadcasting {}", entries.len()); entry_sender.send(entries)?; } - - entries_send_total += duration_as_ms(&entries_send_start.elapsed()); } - ledger_writer.flush()?; inc_new_counter_info!( - "write_stage-time_ms", + "leader_vote_stage-time_ms", duration_as_ms(&now.elapsed()) as usize ); - debug!("done write_stage txs: {} time {} ms txs/s: {} entries_send_total: {} cluster_info_votes_total: {}", - num_txs, duration_as_ms(&start.elapsed()), - num_txs as f32 / duration_as_s(&start.elapsed()), - entries_send_total, - cluster_info_votes_total); Ok(()) } - /// Create a new WriteStage for writing and broadcasting entries. + /// Create a new LeaderVoteStage for voting and broadcasting entries. pub fn new( keypair: Arc, bank: Arc, cluster_info: Arc>, - ledger_path: &str, entry_receiver: Receiver>, ) -> (Self, Receiver>) { let (vote_blob_sender, vote_blob_receiver) = channel(); let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); let t_responder = responder( - "write_stage_vote_sender", + "leader_vote_stage_vote_sender", Arc::new(send), vote_blob_receiver, ); let (entry_sender, entry_receiver_forward) = channel(); - let mut ledger_writer = LedgerWriter::recover(ledger_path).unwrap(); - let write_thread = Builder::new() + let vote_thread = Builder::new() .name("solana-writer".to_string()) .spawn(move || { let mut last_vote = 0; let mut last_valid_validator_timestamp = 0; let id = cluster_info.read().unwrap().id; loop { - if let Err(e) = Self::write_and_send_entries( + if let Err(e) = Self::compute_vote_and_send_entries( &cluster_info, - &mut ledger_writer, &entry_sender, &entry_receiver, ) { @@ -139,7 +113,7 @@ impl WriteStage { Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), _ => { inc_new_counter_info!( - "write_stage-write_and_send_entries-error", + "leader_vote_stage-compute_vote_and_send_entries-error", 1 ); error!("{:?}", e); @@ -155,7 +129,7 @@ impl WriteStage { &mut last_vote, &mut last_valid_validator_timestamp, ) { - inc_new_counter_info!("write_stage-leader_vote-error", 1); + inc_new_counter_info!("leader_vote_stage-leader_vote-error", 1); error!("{:?}", e); } } @@ -163,8 +137,8 @@ impl WriteStage { let thread_hdls = vec![t_responder]; ( - WriteStage { - write_thread, + LeaderVoteStage { + vote_thread, thread_hdls, }, entry_receiver_forward, @@ -172,7 +146,7 @@ impl WriteStage { } } -impl Service for WriteStage { +impl Service for LeaderVoteStage { type JoinReturnType = (); fn join(self) -> thread::Result<()> { @@ -180,6 +154,6 @@ impl Service for WriteStage { thread_hdl.join()?; } - self.write_thread.join() + self.vote_thread.join() } } diff --git a/src/ledger.rs b/src/ledger.rs index 49d625e63d..a18076413c 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -372,7 +372,7 @@ impl LedgerWriter { Ok(LedgerWriter { index, data }) } - pub fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> { + fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> { let len = serialized_size(&entry).map_err(err_bincode_to_io)?; serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?; @@ -399,25 +399,23 @@ impl LedgerWriter { Ok(()) } - pub fn flush(&mut self) -> io::Result<()> { + pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { + self.write_entry_noflush(&entry)?; self.index.flush()?; self.data.flush()?; Ok(()) } - pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { - self.write_entry_noflush(&entry)?; - self.flush() - } - - pub fn write_entries(&mut self, entries: I) -> io::Result<()> + pub fn write_entries<'a, I>(&mut self, entries: I) -> io::Result<()> where - I: IntoIterator, + I: IntoIterator, { for entry in entries { self.write_entry_noflush(&entry)?; } - self.flush() + self.index.flush()?; + self.data.flush()?; + Ok(()) } } @@ -616,7 +614,7 @@ pub fn create_tmp_ledger_with_mint(name: &str, mint: &Mint) -> String { let path = get_tmp_ledger_path(name); let mut writer = LedgerWriter::open(&path, true).unwrap(); - writer.write_entries(mint.create_entries()).unwrap(); + writer.write_entries(&mint.create_entries()).unwrap(); path } @@ -653,7 +651,7 @@ pub fn create_tmp_sample_ledger( genesis.extend(ticks); let mut writer = LedgerWriter::open(&path, true).unwrap(); - writer.write_entries(genesis.clone()).unwrap(); + writer.write_entries(&genesis.clone()).unwrap(); (mint, path, genesis) } @@ -830,7 +828,7 @@ mod tests { { let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(entries.clone()).unwrap(); + writer.write_entries(&entries.clone()).unwrap(); // drops writer, flushes buffers } verify_ledger(&ledger_path).unwrap(); @@ -862,7 +860,7 @@ mod tests { fn truncated_last_entry(ledger_path: &str, entries: Vec) { let len = { let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(entries).unwrap(); + writer.write_entries(&entries).unwrap(); writer.data.seek(SeekFrom::Current(0)).unwrap() }; verify_ledger(&ledger_path).unwrap(); @@ -876,7 +874,7 @@ mod tests { fn garbage_on_data(ledger_path: &str, entries: Vec) { let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(entries).unwrap(); + writer.write_entries(&entries).unwrap(); writer.data.write_all(b"hi there!").unwrap(); } @@ -959,7 +957,7 @@ mod tests { let ledger_path = get_tmp_ledger_path("test_verify_ledger"); { let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(entries.clone()).unwrap(); + writer.write_entries(&entries).unwrap(); } // TODO more cases that make ledger_verify() fail // assert!(verify_ledger(&ledger_path).is_err()); @@ -976,7 +974,7 @@ mod tests { let ledger_path = get_tmp_ledger_path("test_raw_entries"); { let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(entries.clone()).unwrap(); + writer.write_entries(&entries).unwrap(); } let mut window = LedgerWindow::open(&ledger_path).unwrap(); diff --git a/src/ledger_write_stage.rs b/src/ledger_write_stage.rs new file mode 100644 index 0000000000..6703c905cd --- /dev/null +++ b/src/ledger_write_stage.rs @@ -0,0 +1,87 @@ +//! The `ledger_write_stage` module implements the ledger write stage. It +//! writes entries to the given writer, which is typically a file + +use counter::Counter; +use entry::{EntryReceiver, EntrySender}; +use ledger::LedgerWriter; +use log::Level; +use result::{Error, Result}; +use service::Service; +use std::sync::atomic::AtomicUsize; +use std::sync::mpsc::RecvTimeoutError; +use std::thread::{self, Builder, JoinHandle}; +use std::time::Duration; + +pub struct LedgerWriteStage { + write_thread: JoinHandle<()>, +} + +impl LedgerWriteStage { + pub fn write( + ledger_writer: Option<&mut LedgerWriter>, + entry_receiver: &EntryReceiver, + forwarder: &Option, + ) -> Result<()> { + let mut ventries = Vec::new(); + let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; + + loop { + ventries.push(received_entries); + + if let Ok(n) = entry_receiver.try_recv() { + received_entries = n; + } else { + break; + } + } + + if let Some(ledger_writer) = ledger_writer { + ledger_writer.write_entries(ventries.iter().flatten())?; + } + + if let Some(forwarder) = forwarder { + for entries in ventries { + forwarder.send(entries)?; + } + } + Ok(()) + } + + pub fn new( + ledger_path: Option<&str>, + entry_receiver: EntryReceiver, + forwarder: Option, + ) -> Self { + let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap()); + + let write_thread = Builder::new() + .name("solana-ledger-writer".to_string()) + .spawn(move || loop { + if let Err(e) = Self::write(ledger_writer.as_mut(), &entry_receiver, &forwarder) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { + break; + } + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => { + inc_new_counter_info!( + "ledger-write_stage-write_and_send_entries-error", + 1 + ); + error!("{:?}", e); + } + } + }; + }).unwrap(); + + LedgerWriteStage { write_thread } + } +} + +impl Service for LedgerWriteStage { + type JoinReturnType = (); + + fn join(self) -> thread::Result<()> { + self.write_thread.join() + } +} diff --git a/src/lib.rs b/src/lib.rs index c4754c84f4..bc8b0146db 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,7 +34,9 @@ pub mod fetch_stage; pub mod fullnode; pub mod hash; pub mod leader_scheduler; +pub mod leader_vote_stage; pub mod ledger; +pub mod ledger_write_stage; pub mod loader_transaction; pub mod logger; pub mod metrics; @@ -80,7 +82,6 @@ pub mod vote_stage; pub mod wallet; pub mod window; pub mod window_service; -pub mod write_stage; extern crate bincode; extern crate bs58; extern crate byteorder; diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 9a3a5278d7..2a758043c9 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -3,11 +3,11 @@ use bank::Bank; use cluster_info::ClusterInfo; use counter::Counter; -use entry::EntryReceiver; +use entry::{EntryReceiver, EntrySender}; use hash::Hash; use influx_db_client as influxdb; use leader_scheduler::LeaderScheduler; -use ledger::{Block, LedgerWriter}; +use ledger::Block; use log::Level; use metrics; use result::{Error, Result}; @@ -58,9 +58,9 @@ impl ReplicateStage { bank: &Arc, cluster_info: &Arc>, window_receiver: &EntryReceiver, - ledger_writer: Option<&mut LedgerWriter>, keypair: &Arc, vote_blob_sender: Option<&BlobSender>, + ledger_entry_sender: &EntrySender, tick_height: &mut u64, entry_height: &mut u64, leader_scheduler: &Arc>, @@ -149,8 +149,8 @@ impl ReplicateStage { // TODO: In line with previous behavior, this will write all the entries even if // an error occurred processing one of the entries (causing the rest of the entries to // not be processed). - if let Some(ledger_writer) = ledger_writer { - ledger_writer.write_entries(entries)?; + if entries_len != 0 { + ledger_entry_sender.send(entries)?; } *entry_height += entries_len; @@ -163,17 +163,16 @@ impl ReplicateStage { bank: Arc, cluster_info: Arc>, window_receiver: EntryReceiver, - ledger_path: Option<&str>, exit: Arc, tick_height: u64, entry_height: u64, leader_scheduler: Arc>, - ) -> Self { + ) -> (Self, EntryReceiver) { let (vote_blob_sender, vote_blob_receiver) = channel(); + let (ledger_entry_sender, ledger_entry_receiver) = channel(); let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); let t_responder = responder("replicate_stage", Arc::new(send), vote_blob_receiver); - let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap()); let keypair = Arc::new(keypair); let t_replicate = Builder::new() @@ -215,9 +214,9 @@ impl ReplicateStage { &bank, &cluster_info, &window_receiver, - ledger_writer.as_mut(), &keypair, vote_sender, + &ledger_entry_sender, &mut tick_height_, &mut entry_height_, &leader_scheduler, @@ -234,10 +233,13 @@ impl ReplicateStage { None }).unwrap(); - ReplicateStage { - t_responder, - t_replicate, - } + ( + ReplicateStage { + t_responder, + t_replicate, + }, + ledger_entry_receiver, + ) } } @@ -301,7 +303,7 @@ mod test { let active_set_entries_len = active_set_entries.len() as u64; let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height; let initial_entry_len = genesis_entries.len() as u64 + active_set_entries_len; - ledger_writer.write_entries(active_set_entries).unwrap(); + ledger_writer.write_entries(&active_set_entries).unwrap(); // Set up the LeaderScheduler so that this this node becomes the leader at // bootstrap_height = num_bootstrap_slots * leader_rotation_interval @@ -328,12 +330,11 @@ mod test { // Set up the replicate stage let (entry_sender, entry_receiver) = channel(); let exit = Arc::new(AtomicBool::new(false)); - let replicate_stage = ReplicateStage::new( + let (replicate_stage, _ledger_writer_recv) = ReplicateStage::new( Arc::new(my_keypair), Arc::new(bank), Arc::new(RwLock::new(cluster_info_me)), entry_receiver, - Some(&my_ledger_path), exit.clone(), initial_tick_height, initial_entry_len, @@ -381,11 +382,6 @@ mod test { .expect("RwLock for LeaderScheduler is still locked"); leader_scheduler.reset(); - let (_, tick_height, entry_height, _) = - Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler); - - assert_eq!(tick_height, bootstrap_height); - assert_eq!(entry_height, expected_entry_height); let _ignored = remove_dir_all(&my_ledger_path); } } diff --git a/src/store_ledger_stage.rs b/src/store_ledger_stage.rs index 4f4a23f064..4c618e0cf2 100644 --- a/src/store_ledger_stage.rs +++ b/src/store_ledger_stage.rs @@ -33,7 +33,7 @@ impl StoreLedgerStage { ); if let Some(ledger_writer) = ledger_writer { - ledger_writer.write_entries(entries)?; + ledger_writer.write_entries(&entries)?; } Ok(()) diff --git a/src/tpu.rs b/src/tpu.rs index dd890e62c8..2656dc7cf4 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -31,15 +31,17 @@ use cluster_info::ClusterInfo; use entry::Entry; use fetch_stage::FetchStage; use hash::Hash; +use leader_vote_stage::LeaderVoteStage; +use ledger_write_stage::LedgerWriteStage; use service::Service; use signature::Keypair; use sigverify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::channel; use std::sync::mpsc::Receiver; use std::sync::{Arc, RwLock}; use std::thread; -use write_stage::WriteStage; pub enum TpuReturnType { LeaderRotation, @@ -49,7 +51,8 @@ pub struct Tpu { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, banking_stage: BankingStage, - write_stage: WriteStage, + leader_vote_stage: LeaderVoteStage, + ledger_write_stage: LedgerWriteStage, exit: Arc, } @@ -83,19 +86,22 @@ impl Tpu { max_tick_height, ); - let (write_stage, entry_forwarder) = WriteStage::new( - keypair, - bank.clone(), - cluster_info.clone(), - ledger_path, - entry_receiver, + let (leader_vote_stage, ledger_entry_receiver) = + LeaderVoteStage::new(keypair, bank.clone(), cluster_info.clone(), entry_receiver); + + let (ledger_entry_sender, entry_forwarder) = channel(); + let ledger_write_stage = LedgerWriteStage::new( + Some(ledger_path), + ledger_entry_receiver, + Some(ledger_entry_sender), ); let tpu = Tpu { fetch_stage, sigverify_stage, banking_stage, - write_stage, + leader_vote_stage, + ledger_write_stage, exit: exit.clone(), }; (tpu, entry_forwarder, exit) @@ -121,7 +127,8 @@ impl Service for Tpu { fn join(self) -> thread::Result<(Option)> { self.fetch_stage.join()?; self.sigverify_stage.join()?; - self.write_stage.join()?; + self.leader_vote_stage.join()?; + self.ledger_write_stage.join()?; match self.banking_stage.join()? { Some(BankingStageReturnType::LeaderRotation) => Ok(Some(TpuReturnType::LeaderRotation)), _ => Ok(None), diff --git a/src/tvu.rs b/src/tvu.rs index 244ecc09bb..950ccca04e 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -41,6 +41,7 @@ use blob_fetch_stage::BlobFetchStage; use cluster_info::ClusterInfo; use hash::Hash; use leader_scheduler::LeaderScheduler; +use ledger_write_stage::LedgerWriteStage; use replicate_stage::{ReplicateStage, ReplicateStageReturnType}; use retransmit_stage::RetransmitStage; use service::Service; @@ -60,6 +61,7 @@ pub struct Tvu { replicate_stage: ReplicateStage, fetch_stage: BlobFetchStage, retransmit_stage: RetransmitStage, + ledger_write_stage: LedgerWriteStage, exit: Arc, } @@ -111,22 +113,24 @@ impl Tvu { leader_scheduler.clone(), ); - let replicate_stage = ReplicateStage::new( + let (replicate_stage, ledger_entry_receiver) = ReplicateStage::new( keypair, bank.clone(), cluster_info, blob_window_receiver, - ledger_path, exit.clone(), tick_height, entry_height, leader_scheduler, ); + let ledger_write_stage = LedgerWriteStage::new(ledger_path, ledger_entry_receiver, None); + Tvu { replicate_stage, fetch_stage, retransmit_stage, + ledger_write_stage, exit, } } @@ -151,6 +155,7 @@ impl Service for Tvu { fn join(self) -> thread::Result> { self.retransmit_stage.join()?; self.fetch_stage.join()?; + self.ledger_write_stage.join()?; match self.replicate_stage.join()? { Some(ReplicateStageReturnType::LeaderRotation( tick_height, diff --git a/tests/multinode.rs b/tests/multinode.rs index 7d279747f2..02d61cdb66 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -129,7 +129,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { let entries = make_tiny_test_entries(alice.last_id(), WINDOW_SIZE as usize); let mut writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); - writer.write_entries(entries).unwrap(); + writer.write_entries(&entries).unwrap(); } let leader = Fullnode::new( @@ -802,7 +802,7 @@ fn test_leader_to_validator_transition() { let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); let bootstrap_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); - ledger_writer.write_entries(bootstrap_entries).unwrap(); + ledger_writer.write_entries(&bootstrap_entries).unwrap(); // Start the leader node let bootstrap_height = leader_rotation_interval; @@ -932,7 +932,7 @@ fn test_leader_validator_basic() { let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); let active_set_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); - ledger_writer.write_entries(active_set_entries).unwrap(); + ledger_writer.write_entries(&active_set_entries).unwrap(); // Create the leader scheduler config let num_bootstrap_slots = 2; @@ -1100,7 +1100,7 @@ fn test_dropped_handoff_recovery() { // Write the entries let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); - ledger_writer.write_entries(active_set_entries).unwrap(); + ledger_writer.write_entries(&active_set_entries).unwrap(); let next_leader_ledger_path = tmp_copy_ledger( &bootstrap_leader_ledger_path, @@ -1263,7 +1263,7 @@ fn test_full_leader_validator_network() { .last() .expect("expected at least one genesis entry") .id; - ledger_writer.write_entries(bootstrap_entries).unwrap(); + ledger_writer.write_entries(&bootstrap_entries).unwrap(); } // Create the common leader scheduling configuration