From fd7db7a954ab400593fc9e9b6bce9b05257576c7 Mon Sep 17 00:00:00 2001 From: carllin Date: Thu, 7 Feb 2019 15:10:54 -0800 Subject: [PATCH] Support multiple forks in the ledger (#2277) * Modify db_ledger to support per_slot metadata, add signal for updates, and add chaining to slots in db_ledger * Modify replay stage to ask db_ledger for updates based on slots * Add repair send/receive metrics * Add repair service, remove old repair code * Fix tmp_copy_ledger and setup for tests to account for multiple slots and tick limits within slots --- Cargo.lock | 2 + src/broadcast_service.rs | 31 +- src/cluster_info.rs | 110 ++- src/db_ledger.rs | 1457 ++++++++++++++++++++++++++++---------- src/db_window.rs | 216 +++++- src/erasure.rs | 4 +- src/fullnode.rs | 135 +++- src/lib.rs | 1 + src/packet.rs | 7 +- src/repair_service.rs | 372 ++++++++++ src/replay_stage.rs | 109 +-- src/replicator.rs | 12 +- src/retransmit_stage.rs | 17 +- src/rpc_mock.rs | 2 +- src/tpu.rs | 8 +- src/tvu.rs | 19 +- src/window.rs | 98 --- src/window_service.rs | 237 +++---- tests/multinode.rs | 42 +- tests/replicator.rs | 6 +- 20 files changed, 2066 insertions(+), 819 deletions(-) create mode 100644 src/repair_service.rs diff --git a/Cargo.lock b/Cargo.lock index d11d2ca5f1..ea97e37f3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,3 +1,5 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. [[package]] name = "MacTypes-sys" version = "1.3.0" diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index f7d8aa4128..0368eb1b33 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -82,7 +82,7 @@ impl Broadcast { .collect(); // TODO: blob_index should be slot-relative... - index_blobs(&blobs, &self.id, self.blob_index, &slots); + index_blobs(&blobs, &self.id, &mut self.blob_index, &slots); let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); @@ -92,9 +92,6 @@ impl Broadcast { blob_sender.send(blobs.clone())?; - // don't count coding blobs in the blob indexes - self.blob_index += blobs.len() as u64; - // Send out data ClusterInfo::broadcast(&self.id, last_tick, &broadcast_table, sock, &blobs)?; @@ -180,11 +177,12 @@ pub struct BroadcastService { } impl BroadcastService { + #[allow(clippy::too_many_arguments)] fn run( bank: &Arc, sock: &UdpSocket, cluster_info: &Arc>, - entry_height: u64, + blob_index: u64, leader_scheduler: &Arc>, receiver: &Receiver>, max_tick_height: u64, @@ -196,7 +194,7 @@ impl BroadcastService { let mut broadcast = Broadcast { id: me.id, max_tick_height, - blob_index: entry_height, + blob_index, #[cfg(feature = "erasure")] coding_generator: CodingGenerator::new(), }; @@ -246,11 +244,12 @@ impl BroadcastService { /// WriteStage is the last stage in the pipeline), which will then close Broadcast service, /// which will then close FetchStage in the Tpu, and then the rest of the Tpu, /// completing the cycle. 
+ #[allow(clippy::too_many_arguments)] pub fn new( bank: Arc, sock: UdpSocket, cluster_info: Arc>, - entry_height: u64, + blob_index: u64, leader_scheduler: Arc>, receiver: Receiver>, max_tick_height: u64, @@ -267,7 +266,7 @@ impl BroadcastService { &bank, &sock, &cluster_info, - entry_height, + blob_index, &leader_scheduler, &receiver, max_tick_height, @@ -315,7 +314,7 @@ mod test { ledger_path: &str, leader_scheduler: Arc>, entry_receiver: Receiver>, - entry_height: u64, + blob_index: u64, max_tick_height: u64, ) -> MockBroadcastService { // Make the database ledger @@ -343,7 +342,7 @@ mod test { bank.clone(), leader_info.sockets.broadcast, cluster_info, - entry_height, + blob_index, leader_scheduler, entry_receiver, max_tick_height, @@ -361,7 +360,7 @@ mod test { #[ignore] //TODO this test won't work since broadcast stage no longer edits the ledger fn test_broadcast_ledger() { - let ledger_path = get_tmp_ledger_path("test_broadcast"); + let ledger_path = get_tmp_ledger_path("test_broadcast_ledger"); { // Create the leader scheduler let leader_keypair = Keypair::new(); @@ -370,8 +369,7 @@ mod test { // Mock the tick height to look like the tick height right after a leader transition leader_scheduler.set_leader_schedule(vec![leader_keypair.pubkey()]); let start_tick_height = 0; - let max_tick_height = start_tick_height + leader_scheduler.ticks_per_epoch; - let entry_height = 2 * start_tick_height; + let max_tick_height = start_tick_height + leader_scheduler.ticks_per_slot; let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); let (entry_sender, entry_receiver) = channel(); @@ -380,7 +378,7 @@ mod test { &ledger_path, leader_scheduler.clone(), entry_receiver, - entry_height, + 0, max_tick_height, ); @@ -395,13 +393,16 @@ mod test { sleep(Duration::from_millis(2000)); let db_ledger = broadcast_service.db_ledger; + let mut blob_index = 0; for i in 0..max_tick_height - start_tick_height { let slot = leader_scheduler .read() .unwrap() .tick_height_to_slot(start_tick_height + i + 1); - let result = db_ledger.get_data_blob(slot, entry_height + i).unwrap(); + let result = db_ledger.get_data_blob(slot, blob_index).unwrap(); + + blob_index += 1; assert!(result.is_some()); } diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 5b030d43d5..1f8ad6fc61 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -145,7 +145,7 @@ enum Protocol { /// Window protocol messages /// TODO: move this message to a different module - RequestWindowIndex(NodeInfo, u64), + RequestWindowIndex(NodeInfo, u64, u64), } impl ClusterInfo { @@ -692,13 +692,17 @@ impl ClusterInfo { orders } - pub fn window_index_request_bytes(&self, ix: u64) -> Result> { - let req = Protocol::RequestWindowIndex(self.my_data().clone(), ix); + pub fn window_index_request_bytes(&self, slot_height: u64, blob_index: u64) -> Result> { + let req = Protocol::RequestWindowIndex(self.my_data().clone(), slot_height, blob_index); let out = serialize(&req)?; Ok(out) } - pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec)> { + pub fn window_index_request( + &self, + slot_height: u64, + blob_index: u64, + ) -> Result<(SocketAddr, Vec)> { // find a peer that appears to be accepting replication, as indicated // by a valid tvu port location let valid: Vec<_> = self.repair_peers(); @@ -707,11 +711,11 @@ impl ClusterInfo { } let n = thread_rng().gen::() % valid.len(); let addr = valid[n].gossip; // send the request to the peer's gossip port - let out = self.window_index_request_bytes(ix)?; + let out = 
self.window_index_request_bytes(slot_height, blob_index)?; submit( influxdb::Point::new("cluster-info") - .add_field("repair-ix", influxdb::Value::Integer(ix as i64)) + .add_field("repair-ix", influxdb::Value::Integer(blob_index as i64)) .to_owned(), ); @@ -835,34 +839,38 @@ impl ClusterInfo { }) .unwrap() } + + // TODO: To support repairing multiple slots, broadcast needs to reset + // blob index for every slot, and window requests should be by slot + index. + // Issue: https://github.com/solana-labs/solana/issues/2440 fn run_window_request( from: &NodeInfo, from_addr: &SocketAddr, db_ledger: Option<&Arc>, me: &NodeInfo, - ix: u64, + slot_height: u64, + blob_index: u64, ) -> Vec { if let Some(db_ledger) = db_ledger { - let meta = db_ledger.meta(); + // Try to find the requested index in one of the slots + let blob = db_ledger.get_data_blob(slot_height, blob_index); - if let Ok(Some(meta)) = meta { - let max_slot = meta.received_slot; - // Try to find the requested index in one of the slots - for i in 0..=max_slot { - let blob = db_ledger.get_data_blob(i, ix); + if let Ok(Some(mut blob)) = blob { + inc_new_counter_info!("cluster_info-window-request-ledger", 1); + blob.meta.set_addr(from_addr); - if let Ok(Some(mut blob)) = blob { - inc_new_counter_info!("cluster_info-window-request-ledger", 1); - blob.meta.set_addr(from_addr); - - return vec![Arc::new(RwLock::new(blob))]; - } - } + return vec![Arc::new(RwLock::new(blob))]; } } inc_new_counter_info!("cluster_info-window-request-fail", 1); - trace!("{}: failed RequestWindowIndex {} {}", me.id, from.id, ix,); + trace!( + "{}: failed RequestWindowIndex {} {} {}", + me.id, + from.id, + slot_height, + blob_index, + ); vec![] } @@ -987,7 +995,8 @@ impl ClusterInfo { me: &Arc>, from: &ContactInfo, db_ledger: Option<&Arc>, - ix: u64, + slot_height: u64, + blob_index: u64, from_addr: &SocketAddr, ) -> Vec { let now = Instant::now(); @@ -999,8 +1008,8 @@ impl ClusterInfo { let self_id = me.read().unwrap().gossip.id; if from.id == me.read().unwrap().gossip.id { warn!( - "{}: Ignored received RequestWindowIndex from ME {} {} ", - self_id, from.id, ix, + "{}: Ignored received RequestWindowIndex from ME {} {} {} ", + self_id, from.id, slot_height, blob_index, ); inc_new_counter_info!("cluster_info-window-request-address-eq", 1); return vec![]; @@ -1010,16 +1019,24 @@ impl ClusterInfo { let my_info = me.read().unwrap().my_data().clone(); inc_new_counter_info!("cluster_info-window-request-recv", 1); trace!( - "{}: received RequestWindowIndex from: {} index: {} ", + "{}: received RequestWindowIndex from: {} slot_height: {}, blob_index: {}", self_id, from.id, - ix, + slot_height, + blob_index, + ); + let res = Self::run_window_request( + &from, + &from_addr, + db_ledger, + &my_info, + slot_height, + blob_index, ); - let res = Self::run_window_request(&from, &from_addr, db_ledger, &my_info, ix); report_time_spent( "RequestWindowIndex", &now.elapsed(), - &format!(" ix: {}", ix), + &format!("slot_height {}, blob_index: {}", slot_height, blob_index), ); res } @@ -1081,8 +1098,15 @@ impl ClusterInfo { } vec![] } - Protocol::RequestWindowIndex(from, ix) => { - Self::handle_request_window_index(me, &from, db_ledger, ix, from_addr) + Protocol::RequestWindowIndex(from, slot_height, blob_index) => { + Self::handle_request_window_index( + me, + &from, + db_ledger, + slot_height, + blob_index, + from_addr, + ) } } } @@ -1330,7 +1354,7 @@ mod tests { fn window_index_request() { let me = NodeInfo::new_localhost(Keypair::new().pubkey(), timestamp()); let mut cluster_info = 
ClusterInfo::new(me); - let rv = cluster_info.window_index_request(0); + let rv = cluster_info.window_index_request(0, 0); assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); let gossip_addr = socketaddr!([127, 0, 0, 1], 1234); @@ -1345,7 +1369,7 @@ mod tests { 0, ); cluster_info.insert_info(nxt.clone()); - let rv = cluster_info.window_index_request(0).unwrap(); + let rv = cluster_info.window_index_request(0, 0).unwrap(); assert_eq!(nxt.gossip, gossip_addr); assert_eq!(rv.0, nxt.gossip); @@ -1365,7 +1389,7 @@ mod tests { let mut two = false; while !one || !two { //this randomly picks an option, so eventually it should pick both - let rv = cluster_info.window_index_request(0).unwrap(); + let rv = cluster_info.window_index_request(0, 0).unwrap(); if rv.0 == gossip_addr { one = true; } @@ -1393,8 +1417,14 @@ mod tests { socketaddr!("127.0.0.1:1239"), 0, ); - let rv = - ClusterInfo::run_window_request(&me, &socketaddr_any!(), Some(&db_ledger), &me, 0); + let rv = ClusterInfo::run_window_request( + &me, + &socketaddr_any!(), + Some(&db_ledger), + &me, + 0, + 0, + ); assert!(rv.is_empty()); let data_size = 1; let blob = SharedBlob::default(); @@ -1410,8 +1440,14 @@ mod tests { .write_shared_blobs(vec![&blob]) .expect("Expect successful ledger write"); - let rv = - ClusterInfo::run_window_request(&me, &socketaddr_any!(), Some(&db_ledger), &me, 1); + let rv = ClusterInfo::run_window_request( + &me, + &socketaddr_any!(), + Some(&db_ledger), + &me, + 2, + 1, + ); assert!(!rv.is_empty()); let v = rv[0].clone(); assert_eq!(v.read().unwrap().index(), 1); diff --git a/src/db_ledger.rs b/src/db_ledger.rs index e3a3c7f880..89f4724d49 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -4,21 +4,28 @@ use crate::entry::Entry; use crate::genesis_block::GenesisBlock; +use crate::leader_scheduler::DEFAULT_TICKS_PER_SLOT; use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use crate::result::{Error, Result}; use bincode::{deserialize, serialize}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; -use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, Options, WriteBatch, DB}; +use hashbrown::HashMap; +use rocksdb::{ + ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, IteratorMode, Options, WriteBatch, DB, +}; use serde::de::DeserializeOwned; use serde::Serialize; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; -use std::borrow::Borrow; +use std::borrow::{Borrow, Cow}; +use std::cell::RefCell; use std::cmp; use std::fs; use std::io; +use std::iter::once; use std::path::Path; +use std::rc::Rc; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::Arc; @@ -115,27 +122,59 @@ pub trait LedgerColumnFamilyRaw { fn db(&self) -> &Arc; } -#[derive(Debug, Default, Deserialize, Serialize, Eq, PartialEq)] +#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] // The Meta column family pub struct SlotMeta { + pub slot_height: u64, // The total number of consecutive blob starting from index 0 // we have received for this slot. pub consumed: u64, // The entry height of the highest blob received for this slot. pub received: u64, - // The slot the blob with index == "consumed" is in - pub consumed_slot: u64, - // The slot the blob with index == "received" is in - pub received_slot: u64, + // The number of ticks in the range [0..consumed]. Used to detect when + // a slot contains all expected ticks, so that components like repair/ReplayStage + // know to look at the next slot. 
+ pub consumed_ticks: u64, + // The number of blocks in this slot + pub num_blocks: u64, + // The list of slots that chains to this slot + pub next_slots: Vec, + // True if every block from 0..slot, where slot is the slot index of this slot + // is full + pub is_trunk: bool, } impl SlotMeta { - fn new() -> Self { + pub fn contains_all_ticks(&self, db_ledger: &DbLedger) -> bool { + if self.num_blocks == 0 { + // A placeholder slot does not contain all the ticks + false + } else { + let num_expected_ticks = { + let num = self.num_expected_ticks(db_ledger); + if self.slot_height == 0 { + num - 1 + } else { + num + } + }; + num_expected_ticks <= self.consumed_ticks + } + } + + pub fn num_expected_ticks(&self, db_ledger: &DbLedger) -> u64 { + db_ledger.ticks_per_slot * self.num_blocks + } + + fn new(slot_height: u64, num_blocks: u64) -> Self { SlotMeta { + slot_height, consumed: 0, received: 0, - consumed_slot: 0, - received_slot: 0, + consumed_ticks: 0, + num_blocks, + next_slots: vec![], + is_trunk: slot_height == 0, } } } @@ -154,6 +193,22 @@ impl MetaCf { BigEndian::write_u64(&mut key[0..8], slot_height); key } + + pub fn get_slot_meta(&self, slot_height: u64) -> Result> { + let key = Self::key(slot_height); + self.get(&key) + } + + pub fn put_slot_meta(&self, slot_height: u64, slot_meta: &SlotMeta) -> Result<()> { + let key = Self::key(slot_height); + self.put(&key, slot_meta) + } + + pub fn index_from_key(key: &[u8]) -> Result { + let mut rdr = io::Cursor::new(&key[..]); + let index = rdr.read_u64::()?; + Ok(index) + } } impl LedgerColumnFamily for MetaCf { @@ -275,6 +330,23 @@ impl LedgerColumnFamilyRaw for ErasureCf { } } +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +pub struct DbLedgerConfig { + pub ticks_per_slot: u64, +} + +impl DbLedgerConfig { + pub fn new(ticks_per_slot: u64) -> Self { + DbLedgerConfig { ticks_per_slot } + } +} + +impl Default for DbLedgerConfig { + fn default() -> Self { + Self::new(DEFAULT_TICKS_PER_SLOT) + } +} + // ledger window pub struct DbLedger { // Underlying database is automatically closed in the Drop implementation of DB @@ -283,6 +355,7 @@ pub struct DbLedger { data_cf: DataCf, erasure_cf: ErasureCf, new_blobs_signals: Vec>, + ticks_per_slot: u64, } // TODO: Once we support a window that knows about different leader @@ -327,12 +400,16 @@ impl DbLedger { // Create the erasure column family let erasure_cf = ErasureCf::new(db.clone()); + // TODO: make these constructor arguments + // Issue: https://github.com/solana-labs/solana/issues/2458 + let ticks_per_slot = DEFAULT_TICKS_PER_SLOT; Ok(DbLedger { db, meta_cf, data_cf, erasure_cf, new_blobs_signals: vec![], + ticks_per_slot, }) } @@ -344,22 +421,26 @@ impl DbLedger { Ok((db_ledger, signal_sender, signal_receiver)) } - /// Returns the entry vector for the slot starting with `blob_start_index` - pub fn get_slot_entries( - &self, - slot_index: u64, - blob_start_index: u64, - max_entries: Option, - ) -> Result> { - trace!("get_slot_entries {} {}", slot_index, blob_start_index); - // Find the next consecutive block of blobs. 
- let consecutive_blobs = - self.get_slot_consecutive_blobs(slot_index, blob_start_index, max_entries)?; - Ok(Self::deserialize_blobs(&consecutive_blobs)) + pub fn open_config(ledger_path: &str, config: DbLedgerConfig) -> Result { + let mut db_ledger = Self::open(ledger_path)?; + db_ledger.ticks_per_slot = config.ticks_per_slot; + Ok(db_ledger) } - pub fn meta(&self) -> Result> { - self.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT)) + pub fn open_with_config_signal( + ledger_path: &str, + config: DbLedgerConfig, + ) -> Result<(Self, SyncSender, Receiver)> { + let mut db_ledger = Self::open(ledger_path)?; + let (signal_sender, signal_receiver) = sync_channel(1); + db_ledger.new_blobs_signals = vec![signal_sender.clone()]; + db_ledger.ticks_per_slot = config.ticks_per_slot; + + Ok((db_ledger, signal_sender, signal_receiver)) + } + + pub fn meta(&self, slot_height: u64) -> Result> { + self.meta_cf.get(&MetaCf::key(slot_height)) } pub fn destroy(ledger_path: &str) -> Result<()> { @@ -370,6 +451,17 @@ impl DbLedger { Ok(()) } + pub fn get_next_slot(&self, slot_height: u64) -> Result> { + let mut db_iterator = self.db.raw_iterator_cf(self.meta_cf.handle())?; + db_iterator.seek(&MetaCf::key(slot_height + 1)); + if !db_iterator.valid() { + Ok(None) + } else { + let key = &db_iterator.key().expect("Expected valid key"); + Ok(Some(MetaCf::index_from_key(&key)?)) + } + } + pub fn write_shared_blobs(&self, shared_blobs: I) -> Result> where I: IntoIterator, @@ -388,14 +480,14 @@ impl DbLedger { Ok(new_entries) } - pub fn write_blobs<'a, I>(&self, blobs: I) -> Result> + pub fn write_blobs(&self, blobs: I) -> Result> where I: IntoIterator, - I::Item: Borrow<&'a Blob>, + I::Item: Borrow, { - let blobs = blobs.into_iter().map(|b| *b.borrow()); - let new_entries = self.insert_data_blobs(blobs)?; - Ok(new_entries) + //let blobs = blobs.into_iter().map(|b| *b.borrow()); + let entries = self.insert_data_blobs(blobs)?; + Ok(entries) } pub fn write_entries(&self, slot: u64, index: u64, entries: I) -> Result> @@ -417,205 +509,101 @@ impl DbLedger { self.write_blobs(&blobs) } - /// Returns the next consumed index and the number of ticks in the new consumed - /// range - fn get_slot_consecutive_blobs( - &self, - slot_index: u64, - mut current_index: u64, - max_blobs: Option, - ) -> Result>> { - let mut blobs: Vec> = vec![]; - loop { - if Some(blobs.len() as u64) == max_blobs { - break; - } - // Try to find the next blob we're looking for in the prev_inserted_blob_datas - if let Some(blob_data) = self.data_cf.get_by_slot_index(slot_index, current_index)? 
{ - // Try to find the next blob we're looking for in the database - blobs.push(blob_data); - } else { - break; - } - - current_index += 1; - } - - Ok(blobs) - } - pub fn insert_data_blobs(&self, new_blobs: I) -> Result> where I: IntoIterator, I::Item: Borrow, { - let mut new_blobs: Vec<_> = new_blobs.into_iter().collect(); + let mut write_batch = WriteBatch::default(); + // A map from slot_height to a 2-tuple of metadata: (working copy, backup copy), + // so we can detect changes to the slot metadata later + let mut slot_meta_working_set = HashMap::new(); + let new_blobs: Vec<_> = new_blobs.into_iter().collect(); + let mut prev_inserted_blob_datas = HashMap::new(); - if new_blobs.is_empty() { - return Ok(vec![]); - } - - new_blobs.sort_unstable_by(|b1, b2| b1.borrow().index().cmp(&b2.borrow().index())); - - let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); - - let mut should_write_meta = false; - - let mut meta = { - if let Some(meta) = self.db.get_cf(self.meta_cf.handle(), &meta_key)? { - deserialize(&meta)? - } else { - should_write_meta = true; - SlotMeta::new() - } - }; - - // TODO: Handle if leader sends different blob for same index when the index > consumed - // The old window implementation would just replace that index. - let lowest_index = new_blobs[0].borrow().index(); - let lowest_slot = new_blobs[0].borrow().slot(); - let highest_index = new_blobs.last().unwrap().borrow().index(); - let highest_slot = new_blobs.last().unwrap().borrow().slot(); - if lowest_index < meta.consumed { - return Err(Error::DbLedgerError(DbLedgerError::BlobForIndexExists)); - } - - // Index is zero-indexed, while the "received" height starts from 1, - // so received = index + 1 for the same blob. - if highest_index >= meta.received { - meta.received = highest_index + 1; - meta.received_slot = highest_slot; - should_write_meta = true; - } - - let mut consumed_queue = vec![]; - - if meta.consumed == lowest_index { - // Find the next consecutive block of blobs. - // TODO: account for consecutive blocks that - // span multiple slots - should_write_meta = true; - let mut index_into_blob = 0; - let mut current_index = lowest_index; - let mut current_slot = lowest_slot; - 'outer: loop { - let entry: Entry = { - // Try to find the next blob we're looking for in the new_blobs - // vector - let mut found_blob = None; - while index_into_blob < new_blobs.len() { - let new_blob = new_blobs[index_into_blob].borrow(); - let index = new_blob.index(); - - // Skip over duplicate blobs with the same index and continue - // until we either find the index we're looking for, or detect - // that the index doesn't exist in the new_blobs vector. - if index > current_index { - break; - } - - index_into_blob += 1; - - if index == current_index { - found_blob = Some(new_blob); - } - } - - // If we found the blob in the new_blobs vector, process it, otherwise, - // look for the blob in the database. - if let Some(next_blob) = found_blob { - current_slot = next_blob.slot(); - let serialized_entry_data = - &next_blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + next_blob.size()]; - // Verify entries can actually be reconstructed - deserialize(serialized_entry_data).expect( - "Blob made it past validation, so must be deserializable at this point", - ) - } else { - let key = DataCf::key(current_slot, current_index); - let blob_data = { - if let Some(blob_data) = self.data_cf.get(&key)? 
{ - blob_data - } else if meta.consumed < meta.received { - let key = DataCf::key(current_slot + 1, current_index); - if let Some(blob_data) = self.data_cf.get(&key)? { - current_slot += 1; - blob_data - } else { - break 'outer; - } - } else { - break 'outer; - } - }; - deserialize(&blob_data[BLOB_HEADER_SIZE..]) - .expect("Blobs in database must be deserializable") - } - }; - - consumed_queue.push(entry); - current_index += 1; - meta.consumed += 1; - meta.consumed_slot = current_slot; - } - } - - // Commit Step: Atomic write both the metadata and the data - let mut batch = WriteBatch::default(); - if should_write_meta { - batch.put_cf(self.meta_cf.handle(), &meta_key, &serialize(&meta)?)?; - } - - for blob in new_blobs { + let mut consecutive_entries = vec![]; + for blob in new_blobs.iter() { let blob = blob.borrow(); - let key = DataCf::key(blob.slot(), blob.index()); - let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()]; - batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?; + let blob_slot = blob.slot(); + + // Check if we've already inserted the slot metadata for this blob's slot + let entry = slot_meta_working_set.entry(blob_slot).or_insert_with(|| { + // Store a 2-tuple of the metadata (working copy, backup copy) + if let Some(mut meta) = self + .meta_cf + .get_slot_meta(blob_slot) + .expect("Expect database get to succeed") + { + // If num_blocks == 0, then this is one of the dummy metadatas inserted + // during the chaining process, see the function find_slot_meta_in_cached_state() + // for details + if meta.num_blocks == 0 { + // TODO: derive num_blocks for this metadata from the blob itself + // Issue: https://github.com/solana-labs/solana/issues/2459. + meta.num_blocks = 1; + // Set backup as None so that all the logic for inserting new slots + // still runs, as this placeholder slot is essentially equivalent to + // inserting a new slot + (Rc::new(RefCell::new(meta.clone())), None) + } else { + (Rc::new(RefCell::new(meta.clone())), Some(meta)) + } + } else { + // TODO: derive num_blocks for this metadata from the blob itself + // Issue: https://github.com/solana-labs/solana/issues/2459 + (Rc::new(RefCell::new(SlotMeta::new(blob_slot, 1))), None) + } + }); + + let slot_meta = &mut entry.0.borrow_mut(); + + // This slot is full, skip the bogus blob + if slot_meta.contains_all_ticks(&self) { + continue; + } + + let entries = self.insert_data_blob( + blob, + &mut prev_inserted_blob_datas, + slot_meta, + &mut write_batch, + ); + if let Ok(entries) = entries { + consecutive_entries.extend(entries); + } } - self.db.write(batch)?; - if !consumed_queue.is_empty() { + // Handle chaining for the working set + self.handle_chaining(&mut write_batch, &slot_meta_working_set)?; + let mut should_signal = false; + + // Check if any metadata was changed, if so, insert the new version of the + // metadata into the write batch + for (slot_height, (meta_copy, meta_backup)) in slot_meta_working_set.iter() { + let meta: &SlotMeta = &RefCell::borrow(&*meta_copy); + // Check if the working copy of the metadata has changed + if Some(meta) != meta_backup.as_ref() { + should_signal = should_signal || Self::slot_has_updates(meta, &meta_backup); + write_batch.put_cf( + self.meta_cf.handle(), + &MetaCf::key(*slot_height), + &serialize(&meta)?, + )?; + } + } + + self.db.write(write_batch)?; + if should_signal { for signal in self.new_blobs_signals.iter() { let _ = signal.try_send(true); } } - Ok(consumed_queue) - } - // Writes a list of sorted, consecutive broadcast blobs to 
the db_ledger - pub fn write_consecutive_blobs(&self, blobs: &[SharedBlob]) -> Result<()> { - assert!(!blobs.is_empty()); - - let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); - - let mut meta = { - if let Some(meta) = self.meta_cf.get(&meta_key)? { - let first = blobs[0].read().unwrap(); - assert_eq!(meta.consumed, first.index()); - meta - } else { - SlotMeta::new() - } - }; - - { - let last = blobs.last().unwrap().read().unwrap(); - meta.consumed = last.index() + 1; - meta.consumed_slot = last.slot(); - meta.received = cmp::max(meta.received, last.index() + 1); - meta.received_slot = cmp::max(meta.received_slot, last.index()); - } - - let mut batch = WriteBatch::default(); - batch.put_cf(self.meta_cf.handle(), &meta_key, &serialize(&meta)?)?; - for blob in blobs { - let blob = blob.read().unwrap(); - let key = DataCf::key(blob.slot(), blob.index()); - let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()]; - batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?; - } - self.db.write(batch)?; - Ok(()) + // TODO: Delete returning these entries and instead have replay_stage query db_ledger + // for updates. Returning these entries is to temporarily support current API as to + // not break functionality in db_window. + // Issue: https://github.com/solana-labs/solana/issues/2444 + Ok(consecutive_entries) } // Fill 'buf' with num_blobs or most number of consecutive @@ -692,6 +680,13 @@ impl DbLedger { }) } + pub fn read_ledger_blobs(&self) -> impl Iterator { + self.db + .iterator_cf(self.data_cf.handle(), IteratorMode::Start) + .unwrap() + .map(|(_, blob_data)| Blob::new(&blob_data)) + } + pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result>> { self.erasure_cf.get_by_slot_index(slot, index) } @@ -705,16 +700,20 @@ impl DbLedger { self.erasure_cf.put_by_slot_index(slot, index, bytes) } + pub fn put_data_raw(&self, key: &[u8], value: &[u8]) -> Result<()> { + self.data_cf.put(key, value) + } + pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> { self.data_cf.put_by_slot_index(slot, index, bytes) } - pub fn get_data_blob(&self, slot: u64, index: u64) -> Result> { - let bytes = self.get_data_blob_bytes(slot, index)?; + pub fn get_data_blob(&self, slot_height: u64, blob_index: u64) -> Result> { + let bytes = self.get_data_blob_bytes(slot_height, blob_index)?; Ok(bytes.map(|bytes| { let blob = Blob::new(&bytes); - assert!(blob.slot() == slot); - assert!(blob.index() == index); + assert!(blob.slot() == slot_height); + assert!(blob.index() == blob_index); blob })) } @@ -730,12 +729,14 @@ impl DbLedger { // Given a start and end entry index, find all the missing // indexes in the ledger in the range [start_index, end_index) + // for the slot with slot_height == slot fn find_missing_indexes( db_iterator: &mut DbLedgerRawIterator, slot: u64, start_index: u64, end_index: u64, key: &dyn Fn(u64, u64) -> Vec, + slot_height_from_key: &dyn Fn(&[u8]) -> Result, index_from_key: &dyn Fn(&[u8]) -> Result, max_missing: usize, ) -> Vec { @@ -761,15 +762,29 @@ impl DbLedger { break; } let current_key = db_iterator.key().expect("Expect a valid key"); - let current_index = index_from_key(¤t_key) - .expect("Expect to be able to parse index from valid key"); + let current_slot = slot_height_from_key(¤t_key) + .expect("Expect to be able to parse slot from valid key"); + let current_index = { + if current_slot > slot { + end_index + } else { + index_from_key(¤t_key) + .expect("Expect to be able to parse index from valid key") + } + }; let 
upper_index = cmp::min(current_index, end_index); + for i in prev_index..upper_index { missing_indexes.push(i); if missing_indexes.len() == max_missing { break 'outer; } } + + if current_slot > slot { + break; + } + if current_index >= end_index { break; } @@ -796,6 +811,7 @@ impl DbLedger { start_index, end_index, &DataCf::key, + &DataCf::slot_height_from_key, &DataCf::index_from_key, max_missing, ) @@ -816,10 +832,46 @@ impl DbLedger { start_index, end_index, &ErasureCf::key, + &ErasureCf::slot_height_from_key, &ErasureCf::index_from_key, max_missing, ) } + /// Returns the entry vector for the slot starting with `blob_start_index` + pub fn get_slot_entries( + &self, + slot_height: u64, + blob_start_index: u64, + max_entries: Option, + ) -> Result> { + // Find the next consecutive block of blobs. + let consecutive_blobs = self.get_slot_consecutive_blobs( + slot_height, + &HashMap::new(), + blob_start_index, + max_entries, + )?; + Ok(Self::deserialize_blobs(&consecutive_blobs)) + } + + // Returns slots connecting to any element of the list `slot_heights`. + pub fn get_slots_since(&self, slot_heights: &[u64]) -> Result> { + // Return error if there was a database error during lookup of any of the + // slot indexes + let slots: Result>> = slot_heights + .into_iter() + .map(|slot_height| self.meta_cf.get_slot_meta(*slot_height)) + .collect(); + + let slots = slots?; + let slots: Vec<_> = slots + .into_iter() + .filter_map(|x| x) + .flat_map(|x| x.next_slots) + .collect(); + + Ok(slots) + } fn deserialize_blobs(blob_datas: &[I]) -> Vec where @@ -856,6 +908,332 @@ impl DbLedger { options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); options } + + fn slot_has_updates(slot_meta: &SlotMeta, slot_meta_backup: &Option) -> bool { + // We should signal that there are updates if we extended the chain of consecutive blocks starting + // from block 0, which is true iff: + // 1) The block with index prev_block_index is itself part of the trunk of consecutive blocks + // starting from block 0, + slot_meta.is_trunk && + // AND either: + // 1) The slot didn't exist in the database before, and now we have a consecutive + // block for that slot + ((slot_meta_backup.is_none() && slot_meta.consumed != 0) || + // OR + // 2) The slot did exist, but now we have a new consecutive block for that slot + (slot_meta_backup.is_some() && slot_meta_backup.as_ref().unwrap().consumed != slot_meta.consumed)) + } + + // Chaining based on latest discussion here: https://github.com/solana-labs/solana/pull/2253 + fn handle_chaining( + &self, + write_batch: &mut WriteBatch, + working_set: &HashMap>, Option)>, + ) -> Result<()> { + let mut new_chained_slots = HashMap::new(); + let working_set_slot_heights: Vec<_> = working_set.iter().map(|s| *s.0).collect(); + for slot_height in working_set_slot_heights { + self.handle_chaining_for_slot(working_set, &mut new_chained_slots, slot_height)?; + } + + // Write all the newly changed slots in new_chained_slots to the write_batch + for (slot_height, meta_copy) in new_chained_slots.iter() { + let meta: &SlotMeta = &RefCell::borrow(&*meta_copy); + write_batch.put_cf( + self.meta_cf.handle(), + &MetaCf::key(*slot_height), + &serialize(meta)?, + )?; + } + Ok(()) + } + + fn handle_chaining_for_slot( + &self, + working_set: &HashMap>, Option)>, + new_chained_slots: &mut HashMap>>, + slot_height: u64, + ) -> Result<()> { + let (meta_copy, meta_backup) = working_set + .get(&slot_height) + .expect("Slot must exist in the working_set hashmap"); + { + let mut slot_meta = 
meta_copy.borrow_mut(); + assert!(slot_meta.num_blocks > 0); + + // If: + // 1) This is a new slot + // 2) slot_height != 0 + // then try to chain this slot to a previous slot + if slot_height != 0 { + let prev_slot_height = slot_height - slot_meta.num_blocks; + + // Check if slot_meta is a new slot + if meta_backup.is_none() { + let prev_slot = self.find_slot_meta_else_create( + working_set, + new_chained_slots, + prev_slot_height, + )?; + + // This is a newly inserted slot so: + // 1) Chain to the previous slot, and also + // 2) Determine whether to set the is_trunk flag + self.chain_new_slot_to_prev_slot( + &mut prev_slot.borrow_mut(), + slot_height, + &mut slot_meta, + ); + } + } + } + + if self.is_newly_completed_slot(&RefCell::borrow(&*meta_copy), meta_backup) + && RefCell::borrow(&*meta_copy).is_trunk + { + // This is a newly inserted slot and slot.is_trunk is true, so go through + // and update all child slots with is_trunk if applicable + let mut next_slots: Vec<(u64, Rc>)> = + vec![(slot_height, meta_copy.clone())]; + while !next_slots.is_empty() { + let (_, current_slot) = next_slots.pop().unwrap(); + current_slot.borrow_mut().is_trunk = true; + + let current_slot = &RefCell::borrow(&*current_slot); + if current_slot.contains_all_ticks(self) { + for next_slot_index in current_slot.next_slots.iter() { + let next_slot = self.find_slot_meta_else_create( + working_set, + new_chained_slots, + *next_slot_index, + )?; + next_slots.push((*next_slot_index, next_slot)); + } + } + } + } + + Ok(()) + } + + fn chain_new_slot_to_prev_slot( + &self, + prev_slot: &mut SlotMeta, + current_slot_height: u64, + current_slot: &mut SlotMeta, + ) { + prev_slot.next_slots.push(current_slot_height); + current_slot.is_trunk = prev_slot.is_trunk && prev_slot.contains_all_ticks(self); + } + + fn is_newly_completed_slot( + &self, + slot_meta: &SlotMeta, + backup_slot_meta: &Option, + ) -> bool { + slot_meta.contains_all_ticks(self) + && (backup_slot_meta.is_none() + || slot_meta.consumed_ticks != backup_slot_meta.as_ref().unwrap().consumed_ticks) + } + + // 1) Find the slot metadata in the cache of dirty slot metadata we've previously touched, + // else: + // 2) Search the database for that slot metadata. If still no luck, then + // 3) Create a dummy placeholder slot in the database + fn find_slot_meta_else_create<'a>( + &self, + working_set: &'a HashMap>, Option)>, + chained_slots: &'a mut HashMap>>, + slot_index: u64, + ) -> Result>> { + let result = self.find_slot_meta_in_cached_state(working_set, chained_slots, slot_index)?; + if let Some(slot) = result { + Ok(slot) + } else { + self.find_slot_meta_in_db_else_create(slot_index, chained_slots) + } + } + + // Search the database for that slot metadata. If still no luck, then + // create a dummy placeholder slot in the database + fn find_slot_meta_in_db_else_create<'a>( + &self, + slot_height: u64, + insert_map: &'a mut HashMap>>, + ) -> Result>> { + if let Some(slot) = self.meta_cf.get_slot_meta(slot_height)? { + insert_map.insert(slot_height, Rc::new(RefCell::new(slot))); + Ok(insert_map.get(&slot_height).unwrap().clone()) + } else { + // If this slot doesn't exist, make a dummy placeholder slot (denoted by passing + // 0 for the num_blocks argument to the SlotMeta constructor). 
This way we + // remember which slots chained to this one when we eventually get a real blob + // for this slot + insert_map.insert( + slot_height, + Rc::new(RefCell::new(SlotMeta::new(slot_height, 0))), + ); + Ok(insert_map.get(&slot_height).unwrap().clone()) + } + } + + // Find the slot metadata in the cache of dirty slot metadata we've previously touched + fn find_slot_meta_in_cached_state<'a>( + &self, + working_set: &'a HashMap>, Option)>, + chained_slots: &'a HashMap>>, + slot_height: u64, + ) -> Result>>> { + if let Some((entry, _)) = working_set.get(&slot_height) { + Ok(Some(entry.clone())) + } else if let Some(entry) = chained_slots.get(&slot_height) { + Ok(Some(entry.clone())) + } else { + Ok(None) + } + } + + /// Insert a blob into ledger, updating the slot_meta if necessary + fn insert_data_blob<'a>( + &self, + blob_to_insert: &'a Blob, + prev_inserted_blob_datas: &mut HashMap<(u64, u64), &'a [u8]>, + slot_meta: &mut SlotMeta, + write_batch: &mut WriteBatch, + ) -> Result> { + let blob_index = blob_to_insert.index(); + let blob_slot = blob_to_insert.slot(); + let blob_size = blob_to_insert.size(); + + if blob_index < slot_meta.consumed + || prev_inserted_blob_datas.contains_key(&(blob_slot, blob_index)) + { + return Err(Error::DbLedgerError(DbLedgerError::BlobForIndexExists)); + } + + let (new_consumed, new_consumed_ticks, blob_datas) = { + if slot_meta.consumed == blob_index { + let blob_datas = self.get_slot_consecutive_blobs( + blob_slot, + prev_inserted_blob_datas, + // Don't start looking for consecutive blobs at blob_index, + // because we haven't inserted/committed the new blob_to_insert + // into the database or prev_inserted_blob_datas hashmap yet. + blob_index + 1, + None, + )?; + + let blob_to_insert = Cow::Borrowed(&blob_to_insert.data[..]); + let mut new_consumed_ticks = 0; + let mut entries = vec![]; + // Check all the consecutive blobs for ticks + for blob_data in once(&blob_to_insert).chain(blob_datas.iter()) { + let serialized_entry_data = &blob_data[BLOB_HEADER_SIZE..]; + let entry: Entry = deserialize(serialized_entry_data).expect( + "Blob made it past validation, so must be deserializable at this point", + ); + if entry.is_tick() { + new_consumed_ticks += 1; + } + entries.push(entry); + } + + ( + // Add one because we skipped this current blob when calling + // get_slot_consecutive_blobs() earlier + slot_meta.consumed + blob_datas.len() as u64 + 1, + new_consumed_ticks, + entries, + ) + } else { + (slot_meta.consumed, 0, vec![]) + } + }; + + let key = DataCf::key(blob_slot, blob_index); + let serialized_blob_data = &blob_to_insert.data[..BLOB_HEADER_SIZE + blob_size]; + + // Commit step: commit all changes to the mutable structures at once, or none at all. + // We don't want only some of these changes going through. + write_batch.put_cf(self.data_cf.handle(), &key, serialized_blob_data)?; + prev_inserted_blob_datas.insert((blob_slot, blob_index), serialized_blob_data); + // Index is zero-indexed, while the "received" height starts from 1, + // so received = index + 1 for the same blob. + slot_meta.received = cmp::max(blob_index + 1, slot_meta.received); + slot_meta.consumed = new_consumed; + slot_meta.consumed_ticks += new_consumed_ticks; + // TODO: Remove returning these entries and instead have replay_stage query db_ledger + // for updates. Returning these entries is to temporarily support current API as to + // not break functionality in db_window. 
+ // Issue: https://github.com/solana-labs/solana/issues/2444 + Ok(blob_datas) + } + + /// Returns the next consumed index and the number of ticks in the new consumed + /// range + fn get_slot_consecutive_blobs<'a>( + &self, + slot_height: u64, + prev_inserted_blob_datas: &HashMap<(u64, u64), &'a [u8]>, + mut current_index: u64, + max_blobs: Option, + ) -> Result>> { + let mut blobs: Vec> = vec![]; + loop { + if Some(blobs.len() as u64) == max_blobs { + break; + } + // Try to find the next blob we're looking for in the prev_inserted_blob_datas + if let Some(prev_blob_data) = + prev_inserted_blob_datas.get(&(slot_height, current_index)) + { + blobs.push(Cow::Borrowed(*prev_blob_data)); + } else if let Some(blob_data) = + self.data_cf.get_by_slot_index(slot_height, current_index)? + { + // Try to find the next blob we're looking for in the database + blobs.push(Cow::Owned(blob_data)); + } else { + break; + } + + current_index += 1; + } + + Ok(blobs) + } + + // Handle special case of writing genesis blobs. For instance, the first two entries + // don't count as ticks, even if they're empty entries + fn write_genesis_blobs(&self, blobs: &[Blob]) -> Result<()> { + // TODO: change bootstrap height to number of slots + let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); + let mut bootstrap_meta = SlotMeta::new(0, 1); + let last = blobs.last().unwrap(); + let num_ending_ticks = blobs.iter().skip(2).fold(0, |tick_count, blob| { + let entry: Entry = deserialize(&blob.data[BLOB_HEADER_SIZE..]) + .expect("Blob has to be deseriablizable"); + tick_count + entry.is_tick() as u64 + }); + bootstrap_meta.consumed = last.index() + 1; + bootstrap_meta.received = last.index() + 1; + bootstrap_meta.is_trunk = true; + bootstrap_meta.consumed_ticks = num_ending_ticks; + + let mut batch = WriteBatch::default(); + batch.put_cf( + self.meta_cf.handle(), + &meta_key, + &serialize(&bootstrap_meta)?, + )?; + for blob in blobs { + let key = DataCf::key(blob.slot(), blob.index()); + let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()]; + batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?; + } + self.db.write(batch)?; + Ok(()) + } } // TODO: all this goes away with Blocktree @@ -928,7 +1306,7 @@ where }) .collect(); - db_ledger.write_blobs(&blobs[..])?; + db_ledger.write_genesis_blobs(&blobs[..])?; Ok(()) } @@ -980,14 +1358,12 @@ pub fn tmp_copy_ledger(from: &str, name: &str) -> String { let path = get_tmp_ledger_path(name); let db_ledger = DbLedger::open(from).unwrap(); - let ledger_entries = db_ledger.read_ledger().unwrap(); + let blobs = db_ledger.read_ledger_blobs(); let genesis_block = GenesisBlock::load(from).unwrap(); DbLedger::destroy(&path).expect("Expected successful database destruction"); let db_ledger = DbLedger::open(&path).unwrap(); - db_ledger - .write_entries(DEFAULT_SLOT_HEIGHT, 0, ledger_entries) - .unwrap(); + db_ledger.write_blobs(blobs).unwrap(); genesis_block.write(&path).unwrap(); path @@ -996,9 +1372,12 @@ pub fn tmp_copy_ledger(from: &str, name: &str) -> String { #[cfg(test)] mod tests { use super::*; - use crate::entry::{make_tiny_test_entries, make_tiny_test_entries_from_id, EntrySlice}; + use crate::entry::{ + create_ticks, make_tiny_test_entries, make_tiny_test_entries_from_id, EntrySlice, + }; use crate::packet::index_blobs; use solana_sdk::hash::Hash; + use std::time::Duration; #[test] fn test_put_get_simple() { @@ -1006,7 +1385,7 @@ mod tests { let ledger = DbLedger::open(&ledger_path).unwrap(); // Test meta column family - let meta = SlotMeta::new(); + let 
meta = SlotMeta::new(DEFAULT_SLOT_HEIGHT, 1); let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); ledger.meta_cf.put(&meta_key, &meta).unwrap(); let result = ledger @@ -1052,14 +1431,14 @@ mod tests { fn test_read_blobs_bytes() { let shared_blobs = make_tiny_test_entries(10).to_shared_blobs(); let slot = DEFAULT_SLOT_HEIGHT; - index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, &[slot; 10]); + index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, &[slot; 10]); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); let ledger_path = get_tmp_ledger_path("test_read_blobs_bytes"); let ledger = DbLedger::open(&ledger_path).unwrap(); - ledger.write_blobs(&blobs).unwrap(); + ledger.write_blobs(blobs.clone()).unwrap(); let mut buf = [0; 1024]; let (num_blobs, bytes) = ledger.read_blobs_bytes(0, 1, &mut buf, slot).unwrap(); @@ -1117,6 +1496,7 @@ mod tests { fn test_insert_data_blobs_basic() { let entries = make_tiny_test_entries(2); let shared_blobs = entries.to_shared_blobs(); + let slot_height = 0; for (i, b) in shared_blobs.iter().enumerate() { b.write().unwrap().set_index(i as u64); @@ -1128,37 +1508,37 @@ mod tests { let ledger_path = get_tmp_ledger_path("test_insert_data_blobs_basic"); let ledger = DbLedger::open(&ledger_path).unwrap(); - // Insert second blob, we're missing the first blob, so should return nothing - let result = ledger.insert_data_blobs(vec![blobs[1]]).unwrap(); - - assert!(result.len() == 0); + // Insert second blob, we're missing the first blob, so no consecutive + // blobs starting from slot 0, index 0 should exist. + ledger.insert_data_blobs(vec![blobs[1]]).unwrap(); assert!(ledger - .get_slot_entries(DEFAULT_SLOT_HEIGHT, 0, None) + .get_slot_entries(slot_height, 0, None) .unwrap() .is_empty()); let meta = ledger .meta_cf - .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT)) + .get(&MetaCf::key(slot_height)) .unwrap() .expect("Expected new metadata object to be created"); assert!(meta.consumed == 0 && meta.received == 2); // Insert first blob, check for consecutive returned entries - let result = ledger.insert_data_blobs(vec![blobs[0]]).unwrap(); - assert_eq!(result, entries); + ledger.insert_data_blobs(vec![blobs[0]]).unwrap(); + let result = ledger.get_slot_entries(slot_height, 0, None).unwrap(); - let result = ledger - .get_slot_entries(DEFAULT_SLOT_HEIGHT, 0, None) - .unwrap(); assert_eq!(result, entries); let meta = ledger .meta_cf - .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT)) + .get(&MetaCf::key(slot_height)) .unwrap() .expect("Expected new metadata object to exist"); - assert!(meta.consumed == 2 && meta.received == 2); + assert_eq!(meta.consumed, 2); + assert_eq!(meta.received, 2); + assert_eq!(meta.consumed_ticks, 0); + assert!(meta.next_slots.is_empty()); + assert!(meta.is_trunk); // Destroying database without closing it first is undefined behavior drop(ledger); @@ -1169,6 +1549,7 @@ mod tests { fn test_insert_data_blobs_multiple() { let num_blobs = 10; let entries = make_tiny_test_entries(num_blobs); + let slot_height = 0; let shared_blobs = entries.to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { b.write().unwrap().set_index(i as u64); @@ -1181,16 +1562,14 @@ mod tests { // Insert blobs in reverse, check for consecutive returned blobs for i in (0..num_blobs).rev() { - let result = ledger.insert_data_blobs(vec![blobs[i]]).unwrap(); - let result_fetch = ledger - .get_slot_entries(DEFAULT_SLOT_HEIGHT, 0, None) - .unwrap(); + 
ledger.insert_data_blobs(vec![blobs[i]]).unwrap(); + let result = ledger.get_slot_entries(slot_height, 0, None).unwrap(); + let meta = ledger .meta_cf - .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT)) + .get(&MetaCf::key(slot_height)) .unwrap() .expect("Expected metadata object to exist"); - assert_eq!(result, result_fetch); if i != 0 { assert_eq!(result.len(), 0); assert!(meta.consumed == 0 && meta.received == num_blobs as u64); @@ -1206,45 +1585,9 @@ mod tests { } #[test] - fn test_insert_data_blobs_slots() { - let num_blobs = 10; - let entries = make_tiny_test_entries(num_blobs); - let shared_blobs = entries.to_shared_blobs(); - for (i, b) in shared_blobs.iter().enumerate() { - b.write().unwrap().set_index(i as u64); - } - let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); - let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); - - let ledger_path = get_tmp_ledger_path("test_insert_data_blobs_slots"); - let ledger = DbLedger::open(&ledger_path).unwrap(); - - // Insert last blob into next slot - let result = ledger - .insert_data_blobs(vec![*blobs.last().unwrap()]) - .unwrap(); - assert_eq!(result.len(), 0); - - // Insert blobs into first slot, check for consecutive blobs - for i in (0..num_blobs - 1).rev() { - let result = ledger.insert_data_blobs(vec![blobs[i]]).unwrap(); - let meta = ledger - .meta_cf - .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT)) - .unwrap() - .expect("Expected metadata object to exist"); - if i != 0 { - assert_eq!(result.len(), 0); - assert!(meta.consumed == 0 && meta.received == num_blobs as u64); - } else { - assert_eq!(result, entries); - assert!(meta.consumed == num_blobs as u64 && meta.received == num_blobs as u64); - } - } - - // Destroying database without closing it first is undefined behavior - drop(ledger); - DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); + fn test_insert_slots() { + test_insert_data_blobs_slots("test_insert_data_blobs_slots_single", false); + test_insert_data_blobs_slots("test_insert_data_blobs_slots_bulk", true); } #[test] @@ -1265,12 +1608,10 @@ mod tests { w_b.set_slot(DEFAULT_SLOT_HEIGHT); } - assert_eq!( - db_ledger - .write_shared_blobs(&shared_blobs) - .expect("Expected successful write of blobs"), - vec![] - ); + db_ledger + .write_shared_blobs(&shared_blobs) + .expect("Expected successful write of blobs"); + let mut db_iterator = db_ledger .db .raw_iterator_cf(db_ledger.data_cf.handle()) @@ -1356,42 +1697,50 @@ mod tests { } #[test] - pub fn test_insert_data_blobs_bulk() { - let db_ledger_path = get_tmp_ledger_path("test_insert_data_blobs_bulk"); + pub fn test_insert_data_blobs_consecutive() { + let db_ledger_path = get_tmp_ledger_path("test_insert_data_blobs_consecutive"); { let db_ledger = DbLedger::open(&db_ledger_path).unwrap(); + let slot = 0; // Write entries let num_entries = 20 as u64; - let original_entries = make_tiny_test_entries(num_entries as usize); + let original_entries = create_ticks(num_entries, Hash::default()); let shared_blobs = original_entries.clone().to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { let mut w_b = b.write().unwrap(); w_b.set_index(i as u64); - w_b.set_slot(i as u64); + w_b.set_slot(slot); } - assert_eq!( - db_ledger - .write_shared_blobs(shared_blobs.iter().skip(1).step_by(2)) - .unwrap(), - vec![] - ); + db_ledger + .write_shared_blobs(shared_blobs.iter().skip(1).step_by(2)) + .unwrap(); - assert_eq!( - db_ledger - .write_shared_blobs(shared_blobs.iter().step_by(2)) - .unwrap(), - original_entries - ); + 
assert_eq!(db_ledger.get_slot_entries(0, 0, None).unwrap(), vec![]); - let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); + let meta_key = MetaCf::key(slot); let meta = db_ledger.meta_cf.get(&meta_key).unwrap().unwrap(); - assert_eq!(meta.consumed, num_entries); assert_eq!(meta.received, num_entries); - assert_eq!(meta.consumed_slot, num_entries - 1); - assert_eq!(meta.received_slot, num_entries - 1); + assert_eq!(meta.consumed, 0); + assert_eq!(meta.consumed_ticks, 0); + + db_ledger + .write_shared_blobs(shared_blobs.iter().step_by(2)) + .unwrap(); + + assert_eq!( + db_ledger.get_slot_entries(0, 0, None).unwrap(), + original_entries, + ); + + let meta_key = MetaCf::key(slot); + let meta = db_ledger.meta_cf.get(&meta_key).unwrap().unwrap(); + assert_eq!(meta.received, num_entries); + assert_eq!(meta.consumed, num_entries); + assert_eq!(meta.consumed_ticks, num_entries); } + DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); } @@ -1415,85 +1764,34 @@ mod tests { let index = (i / 2) as u64; let mut w_b = b.write().unwrap(); w_b.set_index(index); - w_b.set_slot(index); } - assert_eq!( - db_ledger - .write_shared_blobs( - shared_blobs - .iter() - .skip(num_duplicates) - .step_by(num_duplicates * 2) - ) - .unwrap(), - vec![] - ); + db_ledger + .write_shared_blobs( + shared_blobs + .iter() + .skip(num_duplicates) + .step_by(num_duplicates * 2), + ) + .unwrap(); + + assert_eq!(db_ledger.get_slot_entries(0, 0, None).unwrap(), vec![]); + + db_ledger + .write_shared_blobs(shared_blobs.iter().step_by(num_duplicates * 2)) + .unwrap(); let expected: Vec<_> = original_entries .into_iter() .step_by(num_duplicates) .collect(); - assert_eq!( - db_ledger - .write_shared_blobs(shared_blobs.iter().step_by(num_duplicates * 2)) - .unwrap(), - expected, - ); + assert_eq!(db_ledger.get_slot_entries(0, 0, None).unwrap(), expected,); let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); let meta = db_ledger.meta_cf.get(&meta_key).unwrap().unwrap(); assert_eq!(meta.consumed, num_entries); assert_eq!(meta.received, num_entries); - assert_eq!(meta.consumed_slot, num_entries - 1); - assert_eq!(meta.received_slot, num_entries - 1); - } - DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); - } - - #[test] - pub fn test_write_consecutive_blobs() { - let db_ledger_path = get_tmp_ledger_path("test_write_consecutive_blobs"); - { - let db_ledger = DbLedger::open(&db_ledger_path).unwrap(); - - // Write entries - let num_entries = 20 as u64; - let original_entries = make_tiny_test_entries(num_entries as usize); - let shared_blobs = original_entries.to_shared_blobs(); - for (i, b) in shared_blobs.iter().enumerate() { - let mut w_b = b.write().unwrap(); - w_b.set_index(i as u64); - w_b.set_slot(i as u64); - } - - db_ledger - .write_consecutive_blobs(&shared_blobs) - .expect("Expect successful blob writes"); - - let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); - let meta = db_ledger.meta_cf.get(&meta_key).unwrap().unwrap(); - assert_eq!(meta.consumed, num_entries); - assert_eq!(meta.received, num_entries); - assert_eq!(meta.consumed_slot, num_entries - 1); - assert_eq!(meta.received_slot, num_entries - 1); - - for (i, b) in shared_blobs.iter().enumerate() { - let mut w_b = b.write().unwrap(); - w_b.set_index(num_entries + i as u64); - w_b.set_slot(num_entries + i as u64); - } - - db_ledger - .write_consecutive_blobs(&shared_blobs) - .expect("Expect successful blob writes"); - - let meta = db_ledger.meta_cf.get(&meta_key).unwrap().unwrap(); - assert_eq!(meta.consumed, 
2 * num_entries); - assert_eq!(meta.received, 2 * num_entries); - assert_eq!(meta.consumed_slot, 2 * num_entries - 1); - assert_eq!(meta.received_slot, 2 * num_entries - 1); } DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); } @@ -1549,4 +1847,405 @@ mod tests { DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); } + #[test] + pub fn test_new_blobs_signal() { + // Initialize ledger + let ledger_path = get_tmp_ledger_path("test_new_blobs_signal"); + let ticks_per_slot = 10; + let config = DbLedgerConfig::new(ticks_per_slot); + let (ledger, _, recvr) = DbLedger::open_with_config_signal(&ledger_path, config).unwrap(); + let ledger = Arc::new(ledger); + + // Create ticks for slot 0 + let entries = create_ticks(ticks_per_slot, Hash::default()); + let mut blobs = entries.to_blobs(); + + for (i, b) in blobs.iter_mut().enumerate() { + b.set_index(i as u64); + } + + // Insert second blob, but we're missing the first blob, so no consecutive + // blobs starting from slot 0, index 0 should exist. + ledger.insert_data_blobs(&blobs[1..2]).unwrap(); + let timer = Duration::new(1, 0); + assert!(recvr.recv_timeout(timer).is_err()); + // Insert first blob, now we've made a consecutive block + ledger.insert_data_blobs(&blobs[0..1]).unwrap(); + // Wait to get notified of update, should only be one update + assert!(recvr.recv_timeout(timer).is_ok()); + assert!(recvr.try_recv().is_err()); + // Insert the rest of the ticks + ledger + .insert_data_blobs(&blobs[1..ticks_per_slot as usize]) + .unwrap(); + // Wait to get notified of update, should only be one update + assert!(recvr.recv_timeout(timer).is_ok()); + assert!(recvr.try_recv().is_err()); + + // Create some other slots, and send batches of ticks for each slot such that each slot + // is missing the tick at blob index == slot index - 1. 
Thus, no consecutive blocks + // will be formed + let num_slots = ticks_per_slot; + let mut all_blobs = vec![]; + for slot_index in 1..num_slots + 1 { + let entries = create_ticks(num_slots, Hash::default()); + let mut blobs = entries.to_blobs(); + for (i, ref mut b) in blobs.iter_mut().enumerate() { + if (i as u64) < slot_index - 1 { + b.set_index(i as u64); + } else { + b.set_index(i as u64 + 1); + } + b.set_slot(slot_index as u64); + } + + all_blobs.extend(blobs); + } + + // Should be no updates, since no new chains from block 0 were formed + ledger.insert_data_blobs(all_blobs.iter()).unwrap(); + assert!(recvr.recv_timeout(timer).is_err()); + + // Insert a blob for each slot that doesn't make a consecutive block, we + // should get no updates + let all_blobs: Vec<_> = (1..num_slots + 1) + .map(|slot_index| { + let entries = create_ticks(1, Hash::default());; + let mut blob = entries[0].to_blob(); + blob.set_index(2 * num_slots as u64); + blob.set_slot(slot_index as u64); + blob + }) + .collect(); + + ledger.insert_data_blobs(all_blobs.iter()).unwrap(); + assert!(recvr.recv_timeout(timer).is_err()); + + // For slots 1..num_slots/2, fill in the holes in one batch insertion, + // so we should only get one signal + let all_blobs: Vec<_> = (1..num_slots / 2) + .map(|slot_index| { + let entries = make_tiny_test_entries(1); + let mut blob = entries[0].to_blob(); + blob.set_index(slot_index as u64 - 1); + blob.set_slot(slot_index as u64); + blob + }) + .collect(); + + ledger.insert_data_blobs(all_blobs.iter()).unwrap(); + assert!(recvr.recv_timeout(timer).is_ok()); + assert!(recvr.try_recv().is_err()); + + // Fill in the holes for each of the remaining slots, we should get a single update + // for each + for slot_index in num_slots / 2..num_slots { + let entries = make_tiny_test_entries(1); + let mut blob = entries[0].to_blob(); + blob.set_index(slot_index as u64 - 1); + blob.set_slot(slot_index as u64); + ledger.insert_data_blobs(vec![blob]).unwrap(); + assert!(recvr.recv_timeout(timer).is_ok()); + assert!(recvr.try_recv().is_err()); + } + + // Destroying database without closing it first is undefined behavior + drop(ledger); + DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); + } + + #[test] + pub fn test_handle_chaining_basic() { + let db_ledger_path = get_tmp_ledger_path("test_handle_chaining_basic"); + { + let ticks_per_slot = 2; + let config = DbLedgerConfig::new(ticks_per_slot); + let db_ledger = DbLedger::open_config(&db_ledger_path, config).unwrap(); + + let entries = create_ticks(6, Hash::default()); + let mut blobs = entries.to_blobs(); + for (i, ref mut b) in blobs.iter_mut().enumerate() { + b.set_index(i as u64 % ticks_per_slot); + b.set_slot(i as u64 / ticks_per_slot); + } + + // 1) Write to the first slot + db_ledger + .write_blobs(&blobs[ticks_per_slot as usize..2 * ticks_per_slot as usize]) + .unwrap(); + let s1 = db_ledger.meta_cf.get_slot_meta(1).unwrap().unwrap(); + assert!(s1.next_slots.is_empty()); + // Slot 1 is not trunk because slot 0 hasn't been inserted yet + assert!(!s1.is_trunk); + assert_eq!(s1.num_blocks, 1); + assert_eq!(s1.consumed_ticks, ticks_per_slot); + + // 2) Write to the second slot + db_ledger + .write_blobs(&blobs[2 * ticks_per_slot as usize..3 * ticks_per_slot as usize]) + .unwrap(); + let s2 = db_ledger.meta_cf.get_slot_meta(2).unwrap().unwrap(); + assert!(s2.next_slots.is_empty()); + // Slot 2 is not trunk because slot 0 hasn't been inserted yet + assert!(!s2.is_trunk); + assert_eq!(s2.num_blocks, 1); + 
assert_eq!(s2.consumed_ticks, ticks_per_slot); + + // Check the first slot again, it should chain to the second slot, + // but still isn't part of the trunk + let s1 = db_ledger.meta_cf.get_slot_meta(1).unwrap().unwrap(); + assert_eq!(s1.next_slots, vec![2]); + assert!(!s1.is_trunk); + assert_eq!(s1.consumed_ticks, ticks_per_slot); + + // 3) Write to the zeroth slot, check that every slot + // is now part of the trunk + db_ledger + .write_blobs(&blobs[0..ticks_per_slot as usize]) + .unwrap(); + for i in 0..3 { + let s = db_ledger.meta_cf.get_slot_meta(i).unwrap().unwrap(); + // The last slot will not chain to any other slots + if i != 2 { + assert_eq!(s.next_slots, vec![i + 1]); + } + assert_eq!(s.num_blocks, 1); + if i == 0 { + assert_eq!(s.consumed_ticks, ticks_per_slot - 1); + } else { + assert_eq!(s.consumed_ticks, ticks_per_slot); + } + assert!(s.is_trunk); + } + } + DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); + } + + #[test] + pub fn test_handle_chaining_missing_slots() { + let db_ledger_path = get_tmp_ledger_path("test_handle_chaining_missing_slots"); + { + let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); + let num_slots = 30; + let ticks_per_slot = 2; + db_ledger.ticks_per_slot = ticks_per_slot as u64; + + let ticks = create_ticks((num_slots / 2) * ticks_per_slot, Hash::default()); + let mut blobs = ticks.to_blobs(); + + // Leave a gap for every other slot + for (i, ref mut b) in blobs.iter_mut().enumerate() { + b.set_index(i as u64 % ticks_per_slot); + b.set_slot(((i / 2) * 2 + 1) as u64); + } + + db_ledger.write_blobs(&blobs[..]).unwrap(); + + // Check metadata + for i in 0..num_slots { + // If it's a slot we just inserted, then next_slots should be empty + // because no slots chain to it yet because we left a gap. 
However, if it's + // a slot we haven't inserted, aka one of the gaps, then one of the slots + // we just inserted will chain to that gap + let s = db_ledger.meta_cf.get_slot_meta(i as u64).unwrap().unwrap(); + if i % 2 == 0 { + assert_eq!(s.next_slots, vec![i as u64 + 1]); + assert_eq!(s.consumed_ticks, 0); + } else { + assert!(s.next_slots.is_empty()); + assert_eq!(s.consumed_ticks, ticks_per_slot as u64); + } + + if i == 0 { + assert!(s.is_trunk); + } else { + assert!(!s.is_trunk); + } + } + + // Fill in the gaps + for (i, ref mut b) in blobs.iter_mut().enumerate() { + b.set_index(i as u64 % ticks_per_slot); + b.set_slot(((i / 2) * 2) as u64); + } + + db_ledger.write_blobs(&blobs[..]).unwrap(); + + for i in 0..num_slots { + // Check that all the slots chain correctly once the missing slots + // have been filled + let s = db_ledger.meta_cf.get_slot_meta(i as u64).unwrap().unwrap(); + if i != num_slots - 1 { + assert_eq!(s.next_slots, vec![i as u64 + 1]); + } else { + assert!(s.next_slots.is_empty()); + } + if i == 0 { + assert_eq!(s.consumed_ticks, ticks_per_slot - 1); + } else { + assert_eq!(s.consumed_ticks, ticks_per_slot); + } + assert!(s.is_trunk); + } + } + + DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); + } + + #[test] + pub fn test_forward_chaining_is_trunk() { + let db_ledger_path = get_tmp_ledger_path("test_forward_chaining_is_trunk"); + { + let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); + let num_slots = 15; + let ticks_per_slot = 2; + db_ledger.ticks_per_slot = ticks_per_slot as u64; + + let entries = create_ticks(num_slots * ticks_per_slot, Hash::default()); + let mut blobs = entries.to_blobs(); + for (i, ref mut b) in blobs.iter_mut().enumerate() { + b.set_index(i as u64 % ticks_per_slot); + b.set_slot(i as u64 / ticks_per_slot); + } + + // Write the blobs such that every 3rd block has a gap in the beginning + for (slot_index, slot_ticks) in blobs.chunks(ticks_per_slot as usize).enumerate() { + if slot_index % 3 == 0 { + db_ledger + .write_blobs(&slot_ticks[1..ticks_per_slot as usize]) + .unwrap(); + } else { + db_ledger + .write_blobs(&slot_ticks[..ticks_per_slot as usize]) + .unwrap(); + } + } + + // Check metadata + for i in 0..num_slots { + let s = db_ledger.meta_cf.get_slot_meta(i as u64).unwrap().unwrap(); + // The last slot will not chain to any other slots + if i as u64 != num_slots - 1 { + assert_eq!(s.next_slots, vec![i as u64 + 1]); + } else { + assert!(s.next_slots.is_empty()); + } + assert_eq!(s.num_blocks, 1); + if i % 3 == 0 { + assert_eq!(s.consumed_ticks, 0); + } else { + assert_eq!(s.consumed_ticks, ticks_per_slot as u64); + } + + // Other than slot 0, no slots should be part of the trunk + if i != 0 { + assert!(!s.is_trunk); + } else { + assert!(s.is_trunk); + } + } + + // Iteratively finish every 3rd slot, and check that all slots up to and including + // slot_index + 3 become part of the trunk + for (slot_index, slot_ticks) in blobs.chunks(ticks_per_slot as usize).enumerate() { + if slot_index % 3 == 0 { + db_ledger.write_blobs(&slot_ticks[0..1]).unwrap(); + + for i in 0..num_slots { + let s = db_ledger.meta_cf.get_slot_meta(i as u64).unwrap().unwrap(); + if i != num_slots - 1 { + assert_eq!(s.next_slots, vec![i as u64 + 1]); + } else { + assert!(s.next_slots.is_empty()); + } + if i <= slot_index as u64 + 3 { + assert!(s.is_trunk); + } else { + assert!(!s.is_trunk); + } + } + } + } + } + DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); + } + + #[test] + pub fn 
test_get_slots_since() { + let db_ledger_path = get_tmp_ledger_path("test_get_slots_since"); + + { + let db_ledger = DbLedger::open(&db_ledger_path).unwrap(); + + // Slot doesn't exist + assert!(db_ledger.get_slots_since(&vec![0]).unwrap().is_empty()); + + let mut meta0 = SlotMeta::new(0, 1); + db_ledger.meta_cf.put_slot_meta(0, &meta0).unwrap(); + + // Slot exists, chains to nothing + assert!(db_ledger.get_slots_since(&vec![0]).unwrap().is_empty()); + meta0.next_slots = vec![1, 2]; + db_ledger.meta_cf.put_slot_meta(0, &meta0).unwrap(); + + // Slot exists, chains to some other slots + assert_eq!(db_ledger.get_slots_since(&vec![0]).unwrap(), vec![1, 2]); + assert_eq!(db_ledger.get_slots_since(&vec![0, 1]).unwrap(), vec![1, 2]); + + let mut meta3 = SlotMeta::new(3, 1); + meta3.next_slots = vec![10, 5]; + db_ledger.meta_cf.put_slot_meta(3, &meta3).unwrap(); + assert_eq!( + db_ledger.get_slots_since(&vec![0, 1, 3]).unwrap(), + vec![1, 2, 10, 5] + ); + } + + DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); + } + + fn test_insert_data_blobs_slots(name: &str, should_bulk_write: bool) { + let db_ledger_path = get_tmp_ledger_path(name); + { + let db_ledger = DbLedger::open(&db_ledger_path).unwrap(); + + // Write entries + let num_entries = 20 as u64; + let original_entries = make_tiny_test_entries(num_entries as usize); + let shared_blobs = original_entries.clone().to_shared_blobs(); + for (i, b) in shared_blobs.iter().enumerate() { + let mut w_b = b.write().unwrap(); + w_b.set_index(i as u64); + w_b.set_slot(i as u64); + } + + if should_bulk_write { + db_ledger.write_shared_blobs(shared_blobs.iter()).unwrap(); + } else { + for i in 0..num_entries { + let i = i as usize; + db_ledger + .write_shared_blobs(&shared_blobs[i..i + 1]) + .unwrap(); + } + } + + for i in 0..num_entries - 1 { + assert_eq!( + db_ledger.get_slot_entries(i, i, None).unwrap()[0], + original_entries[i as usize] + ); + + let meta_key = MetaCf::key(i); + let meta = db_ledger.meta_cf.get(&meta_key).unwrap().unwrap(); + assert_eq!(meta.received, i + 1); + if i != 0 { + assert!(meta.consumed == 0); + } else { + assert!(meta.consumed == 1); + } + } + } + DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); + } } diff --git a/src/db_window.rs b/src/db_window.rs index 4a7095f8f8..75663aeb42 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -21,8 +21,49 @@ use std::sync::{Arc, RwLock}; pub const MAX_REPAIR_LENGTH: usize = 128; +pub fn generate_repairs(db_ledger: &DbLedger, max_repairs: usize) -> Result> { + // Slot height and blob indexes for blobs we want to repair + let mut repairs: Vec<(u64, u64)> = vec![]; + let mut slots = vec![0]; + while repairs.len() < max_repairs && !slots.is_empty() { + let slot_height = slots.pop().unwrap(); + let slot = db_ledger.meta(slot_height)?; + if slot.is_none() { + continue; + } + let slot = slot.unwrap(); + slots.extend(slot.next_slots.clone()); + + if slot.contains_all_ticks(db_ledger) { + continue; + } else { + let num_unreceived_ticks = { + if slot.consumed == slot.received { + slot.num_expected_ticks(db_ledger) - slot.consumed_ticks + } else { + 0 + } + }; + + let upper = slot.received + num_unreceived_ticks; + + let reqs = db_ledger.find_missing_data_indexes( + 0, + slot.consumed, + upper, + max_repairs - repairs.len(), + ); + + repairs.extend(reqs.into_iter().map(|i| (slot_height, i))) + } + } + + Ok(repairs) +} + pub fn repair( db_ledger: &DbLedger, + slot_index: u64, cluster_info: &Arc>, id: &Pubkey, times: usize, @@ 
-31,7 +72,8 @@ pub fn repair( leader_scheduler_option: &Arc>, ) -> Result)>> { let rcluster_info = cluster_info.read().unwrap(); - let meta = db_ledger.meta()?; + let is_next_leader = false; + let meta = db_ledger.meta(slot_index)?; if meta.is_none() { return Ok(vec![]); } @@ -44,7 +86,7 @@ pub fn repair( assert!(received > consumed); // Check if we are the next next slot leader - let is_next_leader = { + { let leader_scheduler = leader_scheduler_option.read().unwrap(); let next_slot = leader_scheduler.tick_height_to_slot(tick_height) + 1; match leader_scheduler.get_leader_for_slot(next_slot) { @@ -88,7 +130,7 @@ pub fn repair( let reqs: Vec<_> = idxs .into_iter() - .filter_map(|pix| rcluster_info.window_index_request(pix).ok()) + .filter_map(|pix| rcluster_info.window_index_request(slot_index, pix).ok()) .collect(); drop(rcluster_info); @@ -210,7 +252,9 @@ pub fn process_blob( // If write_shared_blobs() of these recovered blobs fails fails, don't return // because consumed_entries might be nonempty from earlier, and tick height needs to // be updated. Hopefully we can recover these blobs next time successfully. - if let Err(e) = try_erasure(db_ledger, &mut consumed_entries) { + + // TODO: Support per-slot erasure. Issue: https://github.com/solana-labs/solana/issues/2441 + if let Err(e) = try_erasure(db_ledger, &mut consumed_entries, 0) { trace!( "erasure::recover failed to write recovered coding blobs. Err: {:?}", e @@ -227,7 +271,7 @@ pub fn process_blob( // then stop if max_ix != 0 && !consumed_entries.is_empty() { let meta = db_ledger - .meta()? + .meta(0)? .expect("Expect metadata to exist if consumed entries is nonzero"); let consumed = meta.consumed; @@ -267,15 +311,19 @@ pub fn calculate_max_repair_entry_height( } #[cfg(feature = "erasure")] -fn try_erasure(db_ledger: &Arc, consume_queue: &mut Vec) -> Result<()> { - let meta = db_ledger.meta()?; +fn try_erasure( + db_ledger: &Arc, + consume_queue: &mut Vec, + slot_index: u64, +) -> Result<()> { + let meta = db_ledger.meta(slot_index)?; if let Some(meta) = meta { - let (data, coding) = erasure::recover(db_ledger, meta.consumed_slot, meta.consumed)?; + let (data, coding) = erasure::recover(db_ledger, slot_index, meta.consumed)?; for c in coding { let c = c.read().unwrap(); db_ledger.put_coding_blob_bytes( - meta.consumed_slot, + 0, c.index(), &c.data[..BLOB_HEADER_SIZE + c.size()], )?; @@ -435,6 +483,79 @@ mod test { assert!(blob_receiver.try_recv().is_err()); } + #[test] + pub fn test_generate_repairs() { + let db_ledger_path = get_tmp_ledger_path("test_generate_repairs"); + let num_ticks_per_slot = 10; + let db_ledger_config = DbLedgerConfig::new(num_ticks_per_slot); + let db_ledger = DbLedger::open_config(&db_ledger_path, db_ledger_config).unwrap(); + + let num_entries_per_slot = 10; + let num_slots = 2; + let mut blobs = make_tiny_test_entries(num_slots * num_entries_per_slot).to_blobs(); + + // Insert every nth entry for each slot + let nth = 3; + for (i, b) in blobs.iter_mut().enumerate() { + b.set_index(((i % num_entries_per_slot) * nth) as u64); + b.set_slot((i / num_entries_per_slot) as u64); + } + + db_ledger.write_blobs(&blobs).unwrap(); + + let missing_indexes_per_slot: Vec = (0..num_entries_per_slot - 1) + .flat_map(|x| ((nth * x + 1) as u64..(nth * x + nth) as u64)) + .collect(); + + let expected: Vec<(u64, u64)> = (0..num_slots) + .flat_map(|slot_height| { + missing_indexes_per_slot + .iter() + .map(move |blob_index| (slot_height as u64, *blob_index)) + }) + .collect(); + + // Across all slots, find all missing indexes 
in the range [0, num_entries_per_slot * nth] + assert_eq!( + generate_repairs(&db_ledger, std::usize::MAX).unwrap(), + expected + ); + + assert_eq!( + generate_repairs(&db_ledger, expected.len() - 2).unwrap()[..], + expected[0..expected.len() - 2] + ); + + // Now fill in all the holes for each slot such that for each slot, consumed == received. + // Because none of the slots contain ticks, we should see that the repair requests + // ask for ticks, starting from the last received index for that slot + for (slot_height, blob_index) in expected { + let mut b = make_tiny_test_entries(1).to_blobs().pop().unwrap(); + b.set_index(blob_index); + b.set_slot(slot_height); + db_ledger.write_blobs(&vec![b]).unwrap(); + } + + let last_index_per_slot = ((num_entries_per_slot - 1) * nth) as u64; + let missing_indexes_per_slot: Vec = + (last_index_per_slot + 1..last_index_per_slot + 1 + num_ticks_per_slot).collect(); + let expected: Vec<(u64, u64)> = (0..num_slots) + .flat_map(|slot_height| { + missing_indexes_per_slot + .iter() + .map(move |blob_index| (slot_height as u64, *blob_index)) + }) + .collect(); + assert_eq!( + generate_repairs(&db_ledger, std::usize::MAX).unwrap(), + expected + ); + assert_eq!( + generate_repairs(&db_ledger, expected.len() - 2).unwrap()[..], + expected[0..expected.len() - 2] + ); + } + #[test] pub fn test_find_missing_data_indexes_sanity() { let slot = DEFAULT_SLOT_HEIGHT; @@ -564,6 +685,74 @@ mod test { DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); } + #[test] + pub fn test_find_missing_data_indexes_slots() { + let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes_slots"); + let db_ledger = DbLedger::open(&db_ledger_path).unwrap(); + + let num_entries_per_slot = 10; + let num_slots = 2; + let mut blobs = make_tiny_test_entries(num_slots * num_entries_per_slot).to_blobs(); + + // Insert every nth entry for each slot + let nth = 3; + for (i, b) in blobs.iter_mut().enumerate() { + b.set_index(((i % num_entries_per_slot) * nth) as u64); + b.set_slot((i / num_entries_per_slot) as u64); + } + + db_ledger.write_blobs(&blobs).unwrap(); + + let mut expected: Vec = (0..num_entries_per_slot) + .flat_map(|x| ((nth * x + 1) as u64..(nth * x + nth) as u64)) + .collect(); + + // For each slot, find all missing indexes in the range [0, num_entries_per_slot * nth] + for slot_height in 0..num_slots { + assert_eq!( + db_ledger.find_missing_data_indexes( + slot_height as u64, + 0, + (num_entries_per_slot * nth) as u64, + num_entries_per_slot * nth as usize + ), + expected, + ); + } + + // Test with a limit on the number of returned entries + for slot_height in 0..num_slots { + assert_eq!( + db_ledger.find_missing_data_indexes( + slot_height as u64, + 0, + (num_entries_per_slot * nth) as u64, + num_entries_per_slot * (nth - 1) + )[..], + expected[..num_entries_per_slot * (nth - 1)], + ); + } + + // Try to find entries in the range [num_entries_per_slot * nth..num_entries_per_slot * (nth + 1) + // that don't exist in the ledger. 
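As an illustrative aside: the assertions just above and below exercise `find_missing_data_indexes`, which scans one slot for blob indexes in a half-open range that have not been inserted yet, stopping once a caller-supplied cap on results is reached. The following is a minimal standalone sketch of that search, assuming the already-present indexes are available as a sorted slice (the `missing_indexes` function and its `present` parameter are hypothetical names introduced here for illustration; this is not the DbLedger implementation, which walks keys in RocksDB instead).

    // Illustrative sketch only: collect up to `max_missing` indexes in [start, end)
    // that do not appear in the sorted slice `present`.
    fn missing_indexes(present: &[u64], start: u64, end: u64, max_missing: usize) -> Vec<u64> {
        let mut missing = Vec::new();
        let mut expected = start;
        // Walk the present indexes inside the window and record every gap before each one.
        for &idx in present.iter().filter(|&&i| i >= start && i < end) {
            while expected < idx && missing.len() < max_missing {
                missing.push(expected);
                expected += 1;
            }
            expected = expected.max(idx + 1);
        }
        // Anything between the last present index and `end` is also missing.
        while expected < end && missing.len() < max_missing {
            missing.push(expected);
            expected += 1;
        }
        missing
    }

    // For the layout used in this test (every `nth` index present), e.g.
    // missing_indexes(&[0, 3, 6], 0, 9, usize::MAX) returns [1, 2, 4, 5, 7, 8],
    // which is the same shape as the `expected` vector built above.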
+ let extra_entries = + (num_entries_per_slot * nth) as u64..(num_entries_per_slot * (nth + 1)) as u64; + expected.extend(extra_entries); + + // For each slot, find all missing indexes in the range [0, num_entries_per_slot * nth] + for slot_height in 0..num_slots { + assert_eq!( + db_ledger.find_missing_data_indexes( + slot_height as u64, + 0, + (num_entries_per_slot * (nth + 1)) as u64, + num_entries_per_slot * (nth + 1), + ), + expected, + ); + } + } + #[test] pub fn test_no_missing_blob_indexes() { let slot = DEFAULT_SLOT_HEIGHT; @@ -577,13 +766,13 @@ mod test { index_blobs( &shared_blobs, &Keypair::new().pubkey(), - 0, + &mut 0, &vec![slot; num_entries], ); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); - db_ledger.write_blobs(&blobs).unwrap(); + db_ledger.write_blobs(blobs).unwrap(); let empty: Vec = vec![]; for i in 0..num_entries as u64 { @@ -625,7 +814,8 @@ mod test { let db_ledger = Arc::new(generate_db_ledger_from_window(&ledger_path, &window, false)); let mut consume_queue = vec![]; - try_erasure(&db_ledger, &mut consume_queue).expect("Expected successful erasure attempt"); + try_erasure(&db_ledger, &mut consume_queue, DEFAULT_SLOT_HEIGHT) + .expect("Expected successful erasure attempt"); window[erased_index].data = erased_data; { @@ -668,7 +858,7 @@ mod test { index_blobs( &shared_blobs, &Keypair::new().pubkey(), - 0, + &mut 0, &vec![DEFAULT_SLOT_HEIGHT; num_entries], ); diff --git a/src/erasure.rs b/src/erasure.rs index 62e8acb300..24db9f6694 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -893,7 +893,7 @@ pub mod test { index_blobs( &blobs, &Keypair::new().pubkey(), - offset as u64, + &mut (offset as u64), &vec![slot; blobs.len()], ); @@ -911,7 +911,7 @@ pub mod test { index_blobs( &blobs, &Keypair::new().pubkey(), - offset as u64, + &mut (offset as u64), &vec![DEFAULT_SLOT_HEIGHT; blobs.len()], ); blobs diff --git a/src/fullnode.rs b/src/fullnode.rs index da67e541a0..640221acf7 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -3,7 +3,7 @@ use crate::bank::Bank; use crate::cluster_info::{ClusterInfo, Node, NodeInfo}; use crate::counter::Counter; -use crate::db_ledger::DbLedger; +use crate::db_ledger::{DbLedger, DbLedgerConfig}; use crate::genesis_block::GenesisBlock; use crate::gossip_service::GossipService; use crate::leader_scheduler::LeaderSchedulerConfig; @@ -64,6 +64,7 @@ pub struct FullnodeConfig { pub entry_stream: Option, pub storage_rotate_count: u64, pub leader_scheduler_config: LeaderSchedulerConfig, + pub ledger_config: DbLedgerConfig, } impl Default for FullnodeConfig { fn default() -> Self { @@ -77,10 +78,18 @@ impl Default for FullnodeConfig { entry_stream: None, storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE, leader_scheduler_config: Default::default(), + ledger_config: Default::default(), } } } +impl FullnodeConfig { + pub fn set_leader_scheduler_config(&mut self, config: LeaderSchedulerConfig) { + self.ledger_config.ticks_per_slot = config.ticks_per_slot; + self.leader_scheduler_config = config; + } +} + pub struct Fullnode { id: Pubkey, exit: Arc, @@ -107,6 +116,8 @@ impl Fullnode { entrypoint_info_option: Option<&NodeInfo>, config: &FullnodeConfig, ) -> Self { + info!("creating bank..."); + let id = keypair.pubkey(); assert_eq!(id, node.info.id); @@ -117,7 +128,11 @@ impl Fullnode { db_ledger, ledger_signal_sender, ledger_signal_receiver, - ) = new_bank_from_ledger(ledger_path, &config.leader_scheduler_config); + ) = new_bank_from_ledger( + 
ledger_path, + config.ledger_config, + &config.leader_scheduler_config, + ); info!("node info: {:?}", node.info); info!("node entrypoint_info: {:?}", entrypoint_info_option); @@ -184,7 +199,7 @@ impl Fullnode { } // Get the scheduled leader - let (scheduled_leader, max_tpu_tick_height) = { + let (scheduled_leader, slot_height, max_tpu_tick_height) = { let tick_height = bank.tick_height(); let leader_scheduler = bank.leader_scheduler.read().unwrap(); @@ -193,6 +208,7 @@ impl Fullnode { leader_scheduler .get_leader_for_slot(slot) .expect("Leader not known after processing bank"), + slot, tick_height + leader_scheduler.num_ticks_left_in_slot(tick_height), ) }; @@ -235,9 +251,12 @@ impl Fullnode { let (to_leader_sender, to_leader_receiver) = channel(); let (to_validator_sender, to_validator_receiver) = channel(); + let blob_index = Self::get_consumed_for_slot(&db_ledger, slot_height); + let (tvu, blob_sender) = Tvu::new( voting_keypair_option, &bank, + blob_index, entry_height, last_entry_id, &cluster_info, @@ -263,7 +282,7 @@ impl Fullnode { .try_clone() .expect("Failed to clone broadcast socket"), cluster_info.clone(), - entry_height, + blob_index, config.sigverify_disabled, max_tpu_tick_height, &last_entry_id, @@ -318,8 +337,8 @@ impl Fullnode { if scheduled_leader == self.id { debug!("node is still the leader"); - let (last_entry_id, entry_height) = self.node_services.tvu.get_state(); - self.validator_to_leader(tick_height, entry_height, last_entry_id); + let last_entry_id = self.node_services.tvu.get_state(); + self.validator_to_leader(tick_height, last_entry_id); FullnodeReturnType::LeaderToLeaderRotation } else { debug!("new leader is {}", scheduled_leader); @@ -334,12 +353,11 @@ impl Fullnode { } } - fn validator_to_leader(&mut self, tick_height: u64, entry_height: u64, last_entry_id: Hash) { + pub fn validator_to_leader(&mut self, tick_height: u64, last_entry_id: Hash) { trace!( - "validator_to_leader({:?}): tick_height={} entry_height={} last_entry_id={}", + "validator_to_leader({:?}): tick_height={} last_entry_id={}", self.id, tick_height, - entry_height, last_entry_id, ); @@ -382,7 +400,7 @@ impl Fullnode { self.cluster_info.clone(), self.sigverify_disabled, max_tick_height, - entry_height, + 0, &last_entry_id, self.id, &to_validator_sender, @@ -409,8 +427,8 @@ impl Fullnode { } else { let should_be_leader = self.to_leader_receiver.recv_timeout(timeout); match should_be_leader { - Ok(TvuReturnType::LeaderRotation(tick_height, entry_height, last_entry_id)) => { - self.validator_to_leader(tick_height, entry_height, last_entry_id); + Ok(TvuReturnType::LeaderRotation(tick_height, last_entry_id)) => { + self.validator_to_leader(tick_height, last_entry_id); return Some(( FullnodeReturnType::ValidatorToLeaderRotation, tick_height + 1, @@ -471,14 +489,24 @@ impl Fullnode { self.exit(); self.join() } + + fn get_consumed_for_slot(db_ledger: &DbLedger, slot_index: u64) -> u64 { + let meta = db_ledger.meta(slot_index).expect("Database error"); + if let Some(meta) = meta { + meta.consumed + } else { + 0 + } + } } pub fn new_bank_from_ledger( ledger_path: &str, + ledger_config: DbLedgerConfig, leader_scheduler_config: &LeaderSchedulerConfig, ) -> (Bank, u64, Hash, DbLedger, SyncSender, Receiver) { let (db_ledger, ledger_signal_sender, ledger_signal_receiver) = - DbLedger::open_with_signal(ledger_path) + DbLedger::open_with_config_signal(ledger_path, ledger_config) .expect("Expected to successfully open database ledger"); let genesis_block = GenesisBlock::load(ledger_path).expect("Expected to 
successfully open genesis block"); @@ -525,7 +553,7 @@ impl Service for Fullnode { #[cfg(test)] mod tests { use super::*; - use crate::db_ledger::{create_tmp_sample_ledger, tmp_copy_ledger, DEFAULT_SLOT_HEIGHT}; + use crate::db_ledger::{create_tmp_sample_ledger, tmp_copy_ledger}; use crate::entry::make_consecutive_blobs; use crate::leader_scheduler::make_active_set_entries; use crate::streamer::responder; @@ -626,7 +654,7 @@ mod tests { let voting_keypair = VotingKeypair::new_local(&bootstrap_leader_keypair); // Start the bootstrap leader let mut fullnode_config = FullnodeConfig::default(); - fullnode_config.leader_scheduler_config = leader_scheduler_config; + fullnode_config.set_leader_scheduler_config(leader_scheduler_config); let bootstrap_leader = Fullnode::new( bootstrap_leader_node, &bootstrap_leader_keypair, @@ -655,11 +683,11 @@ mod tests { let mut fullnode_config = FullnodeConfig::default(); let ticks_per_slot = 16; let slots_per_epoch = 2; - fullnode_config.leader_scheduler_config = LeaderSchedulerConfig::new( + fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new( ticks_per_slot, slots_per_epoch, ticks_per_slot * slots_per_epoch, - ); + )); // Create the leader and validator nodes let bootstrap_leader_keypair = Arc::new(Keypair::new()); @@ -673,6 +701,7 @@ mod tests { // tick_height = 0 from the leader scheduler's active window ticks_per_slot * 4, "test_wrong_role_transition", + ticks_per_slot, ); let bootstrap_leader_info = bootstrap_leader_node.info.clone(); @@ -726,6 +755,8 @@ mod tests { fn test_validator_to_leader_transition() { solana_logger::setup(); // Make leader and validator node + let ticks_per_slot = 10; + let slots_per_epoch = 4; let leader_keypair = Arc::new(Keypair::new()); let validator_keypair = Arc::new(Keypair::new()); let (leader_node, validator_node, validator_ledger_path, ledger_initial_len, last_id) = @@ -735,6 +766,7 @@ mod tests { 0, 0, "test_validator_to_leader_transition", + ticks_per_slot, ); let leader_id = leader_keypair.pubkey(); @@ -744,17 +776,15 @@ mod tests { info!("validator: {:?}", validator_info.id); // Set the leader scheduler for the validator - let ticks_per_slot = 10; - let slots_per_epoch = 4; - let mut fullnode_config = FullnodeConfig::default(); - fullnode_config.leader_scheduler_config = LeaderSchedulerConfig::new( + fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new( ticks_per_slot, slots_per_epoch, ticks_per_slot * slots_per_epoch, - ); + )); let voting_keypair = VotingKeypair::new_local(&validator_keypair); + // Start the validator let validator = Fullnode::new( validator_node, @@ -805,8 +835,11 @@ mod tests { // Close the validator so that rocksdb has locks available validator_exit(); - let (bank, entry_height, _, _, _, _) = - new_bank_from_ledger(&validator_ledger_path, &LeaderSchedulerConfig::default()); + let (bank, entry_height, _, _, _, _) = new_bank_from_ledger( + &validator_ledger_path, + DbLedgerConfig::default(), + &LeaderSchedulerConfig::default(), + ); assert!(bank.tick_height() >= bank.leader_scheduler.read().unwrap().ticks_per_epoch); @@ -825,27 +858,32 @@ mod tests { solana_logger::setup(); // Make leader node + let ticks_per_slot = 5; + let slots_per_epoch = 2; let leader_keypair = Arc::new(Keypair::new()); let validator_keypair = Arc::new(Keypair::new()); info!("leader: {:?}", leader_keypair.pubkey()); info!("validator: {:?}", validator_keypair.pubkey()); - let (leader_node, _, leader_ledger_path, _, _) = - setup_leader_validator(&leader_keypair, &validator_keypair, 1, 0, 
"test_tvu_behind"); + let (leader_node, _, leader_ledger_path, _, _) = setup_leader_validator( + &leader_keypair, + &validator_keypair, + 1, + 0, + "test_tvu_behind", + ticks_per_slot, + ); let leader_node_info = leader_node.info.clone(); // Set the leader scheduler for the validator - let ticks_per_slot = 5; - let slots_per_epoch = 2; - let mut fullnode_config = FullnodeConfig::default(); - fullnode_config.leader_scheduler_config = LeaderSchedulerConfig::new( + fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new( ticks_per_slot, slots_per_epoch, ticks_per_slot * slots_per_epoch, - ); + )); let voting_keypair = VotingKeypair::new_local(&leader_keypair); info!("Start the bootstrap leader"); @@ -914,11 +952,13 @@ mod tests { num_genesis_ticks: u64, num_ending_ticks: u64, test_name: &str, + ticks_per_block: u64, ) -> (Node, Node, String, u64, Hash) { // Make a leader identity let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); // Create validator identity + assert!(num_genesis_ticks <= ticks_per_block); let (mint_keypair, ledger_path, genesis_entry_height, last_id) = create_tmp_sample_ledger( test_name, 10_000, @@ -946,14 +986,33 @@ mod tests { num_ending_ticks, ); + let non_tick_active_entries_len = active_set_entries.len() - num_ending_ticks as usize; + let remaining_ticks_in_zeroth_slot = ticks_per_block - num_genesis_ticks; + let entries_for_zeroth_slot = + non_tick_active_entries_len + remaining_ticks_in_zeroth_slot as usize; + let entry_chunks: Vec<_> = active_set_entries[entries_for_zeroth_slot..] + .chunks(ticks_per_block as usize) + .collect(); + let db_ledger = DbLedger::open(&ledger_path).unwrap(); - db_ledger - .write_entries( - DEFAULT_SLOT_HEIGHT, - genesis_entry_height, - &active_set_entries, - ) - .unwrap(); + + // Iterate writing slots through 0..entry_chunks.len() + for i in 0..entry_chunks.len() + 1 { + let (start_height, entries) = { + if i == 0 { + ( + genesis_entry_height, + &active_set_entries[..entries_for_zeroth_slot], + ) + } else { + (0, entry_chunks[i - 1]) + } + }; + + db_ledger + .write_entries(i as u64, start_height, entries) + .unwrap(); + } let entry_height = genesis_entry_height + active_set_entries.len() as u64; ( diff --git a/src/lib.rs b/src/lib.rs index 55bf1b0df3..8ce0f6e266 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,6 +50,7 @@ pub mod poh; pub mod poh_recorder; pub mod poh_service; pub mod recvmmsg; +pub mod repair_service; pub mod replay_stage; pub mod replicator; pub mod result; diff --git a/src/packet.rs b/src/packet.rs index 8894f7589b..e502b09ea3 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -442,16 +442,15 @@ impl Blob { } } -pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut index: u64, slots: &[u64]) { +pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, blob_index: &mut u64, slots: &[u64]) { // enumerate all the blobs, those are the indices for (blob, slot) in blobs.iter().zip(slots) { let mut blob = blob.write().unwrap(); - blob.set_index(index); + blob.set_index(*blob_index); blob.set_slot(*slot); blob.set_id(id); - - index += 1; + *blob_index += 1; } } diff --git a/src/repair_service.rs b/src/repair_service.rs new file mode 100644 index 0000000000..5e3ae3e32e --- /dev/null +++ b/src/repair_service.rs @@ -0,0 +1,372 @@ +//! The `repair_service` module implements the tools necessary to generate a thread which +//! 
regularly finds missing blobs in the ledger and sends repair requests for those blobs + +use crate::cluster_info::ClusterInfo; +use crate::db_ledger::{DbLedger, SlotMeta}; +use crate::result::Result; +use crate::service::Service; +use solana_metrics::{influxdb, submit}; +use std::net::UdpSocket; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::thread::sleep; +use std::thread::{self, Builder, JoinHandle}; +use std::time::Duration; + +pub const MAX_REPAIR_LENGTH: usize = 16; +pub const REPAIR_MS: u64 = 100; +pub const MAX_REPAIR_TRIES: u64 = 128; + +#[derive(Default)] +struct RepairInfo { + max_slot: u64, + repair_tries: u64, +} + +impl RepairInfo { + fn new() -> Self { + RepairInfo { + max_slot: 0, + repair_tries: 0, + } + } +} + +pub struct RepairService { + t_repair: JoinHandle<()>, +} + +impl RepairService { + fn run( + db_ledger: &Arc, + exit: &Arc, + repair_socket: &Arc, + cluster_info: &Arc>, + ) { + let mut repair_info = RepairInfo::new(); + let id = cluster_info.read().unwrap().id(); + loop { + if exit.load(Ordering::Relaxed) { + break; + } + + let repairs = Self::generate_repairs(db_ledger, MAX_REPAIR_LENGTH, &mut repair_info); + + if let Ok(repairs) = repairs { + let reqs: Vec<_> = repairs + .into_iter() + .filter_map(|(slot_height, blob_index)| { + cluster_info + .read() + .unwrap() + .window_index_request(slot_height, blob_index) + .map(|result| (result, slot_height, blob_index)) + .ok() + }) + .collect(); + + for ((to, req), slot_height, blob_index) in reqs { + if let Ok(local_addr) = repair_socket.local_addr() { + submit( + influxdb::Point::new("repair_service") + .add_field( + "repair_slot", + influxdb::Value::Integer(slot_height as i64), + ) + .to_owned() + .add_field( + "repair_blob", + influxdb::Value::Integer(blob_index as i64), + ) + .to_owned() + .add_field("to", influxdb::Value::String(to.to_string())) + .to_owned() + .add_field("from", influxdb::Value::String(local_addr.to_string())) + .to_owned() + .add_field("id", influxdb::Value::String(id.to_string())) + .to_owned(), + ); + } + + repair_socket.send_to(&req, to).unwrap_or_else(|e| { + info!("{} repair req send_to({}) error {:?}", id, to, e); + 0 + }); + } + } + sleep(Duration::from_millis(REPAIR_MS)); + } + } + + pub fn new( + db_ledger: Arc, + exit: Arc, + repair_socket: Arc, + cluster_info: Arc>, + ) -> Self { + let t_repair = Builder::new() + .name("solana-repair-service".to_string()) + .spawn(move || Self::run(&db_ledger, &exit, &repair_socket, &cluster_info)) + .unwrap(); + + RepairService { t_repair } + } + + fn process_slot( + db_ledger: &DbLedger, + slot_height: u64, + slot: &SlotMeta, + max_repairs: usize, + ) -> Result> { + if slot.contains_all_ticks(db_ledger) { + Ok(vec![]) + } else { + let num_unreceived_ticks = { + if slot.consumed == slot.received { + let num_expected_ticks = slot.num_expected_ticks(db_ledger); + if num_expected_ticks == 0 { + // This signals that we have received nothing for this slot, try to get at least the + // first entry + 1 + } + // This signals that we will never use other slots (leader rotation is + // off) + else if num_expected_ticks == std::u64::MAX + || num_expected_ticks <= slot.consumed_ticks + { + 0 + } else { + num_expected_ticks - slot.consumed_ticks + } + } else { + 0 + } + }; + + let upper = slot.received + num_unreceived_ticks; + + let reqs = + db_ledger.find_missing_data_indexes(slot_height, slot.consumed, upper, max_repairs); + + Ok(reqs.into_iter().map(|i| (slot_height, i)).collect()) + } + } + + fn generate_repairs( + 
db_ledger: &DbLedger, + max_repairs: usize, + repair_info: &mut RepairInfo, + ) -> Result> { + // Slot height and blob indexes for blobs we want to repair + let mut repairs: Vec<(u64, u64)> = vec![]; + let mut current_slot_height = Some(0); + while repairs.len() < max_repairs && current_slot_height.is_some() { + if current_slot_height.unwrap() > repair_info.max_slot { + repair_info.repair_tries = 0; + repair_info.max_slot = current_slot_height.unwrap(); + } + + let slot = db_ledger.meta(current_slot_height.unwrap())?; + if slot.is_none() { + current_slot_height = db_ledger.get_next_slot(current_slot_height.unwrap())?; + continue; + } + let slot = slot.unwrap(); + let new_repairs = Self::process_slot( + db_ledger, + current_slot_height.unwrap(), + &slot, + max_repairs - repairs.len(), + )?; + repairs.extend(new_repairs); + current_slot_height = db_ledger.get_next_slot(current_slot_height.unwrap())?; + } + + // Only increment repair_tries if the ledger contains every blob for every slot + if repairs.is_empty() { + repair_info.repair_tries += 1; + } + + // Optimistically try the next slot if we haven't gotten any repairs + // for a while + if repair_info.repair_tries >= MAX_REPAIR_TRIES { + repairs.push((repair_info.max_slot + 1, 0)) + } + + Ok(repairs) + } +} + +impl Service for RepairService { + type JoinReturnType = (); + + fn join(self) -> thread::Result<()> { + self.t_repair.join() + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::db_ledger::{get_tmp_ledger_path, DbLedger, DbLedgerConfig}; + use crate::entry::create_ticks; + use crate::entry::{make_tiny_test_entries, EntrySlice}; + use solana_sdk::hash::Hash; + + #[test] + pub fn test_repair_missed_future_slot() { + let db_ledger_path = get_tmp_ledger_path("test_repair_missed_future_slot"); + { + let num_ticks_per_slot = 1; + let db_ledger_config = DbLedgerConfig::new(num_ticks_per_slot); + let db_ledger = DbLedger::open_config(&db_ledger_path, db_ledger_config).unwrap(); + + let mut blobs = create_ticks(1, Hash::default()).to_blobs(); + blobs[0].set_index(0); + blobs[0].set_slot(0); + + db_ledger.write_blobs(&blobs).unwrap(); + + let mut repair_info = RepairInfo::new(); + // We have all the blobs for all the slots in the ledger, wait for optimistic + // future repair after MAX_REPAIR_TRIES + for i in 0..MAX_REPAIR_TRIES { + // Check that repair tries to patch the empty slot + assert_eq!(repair_info.repair_tries, i); + assert_eq!(repair_info.max_slot, 0); + let expected = if i == MAX_REPAIR_TRIES - 1 { + vec![(1, 0)] + } else { + vec![] + }; + assert_eq!( + RepairService::generate_repairs(&db_ledger, 2, &mut repair_info).unwrap(), + expected + ); + } + + // Insert a bigger blob + let mut blobs = create_ticks(1, Hash::default()).to_blobs(); + blobs[0].set_index(0); + blobs[0].set_slot(1); + + db_ledger.write_blobs(&blobs).unwrap(); + assert_eq!( + RepairService::generate_repairs(&db_ledger, 2, &mut repair_info).unwrap(), + vec![] + ); + assert_eq!(repair_info.repair_tries, 1); + assert_eq!(repair_info.max_slot, 1); + } + + DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); + } + + #[test] + pub fn test_repair_empty_slot() { + let db_ledger_path = get_tmp_ledger_path("test_repair_empty_slot"); + { + let num_ticks_per_slot = 10; + let db_ledger_config = DbLedgerConfig::new(num_ticks_per_slot); + let db_ledger = DbLedger::open_config(&db_ledger_path, db_ledger_config).unwrap(); + + let mut blobs = make_tiny_test_entries(1).to_blobs(); + blobs[0].set_index(1); + blobs[0].set_slot(2); + + let 
mut repair_info = RepairInfo::new(); + + // Write this blob to slot 2, should chain to slot 1, which we haven't received + // any blobs for + db_ledger.write_blobs(&blobs).unwrap(); + // Check that repair tries to patch the empty slot + assert_eq!( + RepairService::generate_repairs(&db_ledger, 2, &mut repair_info).unwrap(), + vec![(1, 0), (2, 0)] + ); + } + DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); + } + + #[test] + pub fn test_generate_repairs() { + let db_ledger_path = get_tmp_ledger_path("test_generate_repairs"); + { + let num_ticks_per_slot = 10; + let db_ledger_config = DbLedgerConfig::new(num_ticks_per_slot); + let db_ledger = DbLedger::open_config(&db_ledger_path, db_ledger_config).unwrap(); + + let num_entries_per_slot = 10; + let num_slots = 2; + let mut blobs = make_tiny_test_entries(num_slots * num_entries_per_slot).to_blobs(); + + let mut repair_info = RepairInfo::new(); + + // Insert every nth entry for each slot + let nth = 3; + for (i, b) in blobs.iter_mut().enumerate() { + b.set_index(((i % num_entries_per_slot) * nth) as u64); + b.set_slot((i / num_entries_per_slot) as u64); + } + + db_ledger.write_blobs(&blobs).unwrap(); + + let missing_indexes_per_slot: Vec = (0..num_entries_per_slot - 1) + .flat_map(|x| ((nth * x + 1) as u64..(nth * x + nth) as u64)) + .collect(); + + let expected: Vec<(u64, u64)> = (0..num_slots) + .flat_map(|slot_height| { + missing_indexes_per_slot + .iter() + .map(move |blob_index| (slot_height as u64, *blob_index)) + }) + .collect(); + + // Across all slots, find all missing indexes in the range [0, num_entries_per_slot * nth] + assert_eq!( + RepairService::generate_repairs(&db_ledger, std::usize::MAX, &mut repair_info) + .unwrap(), + expected + ); + + assert_eq!( + RepairService::generate_repairs(&db_ledger, expected.len() - 2, &mut repair_info) + .unwrap()[..], + expected[0..expected.len() - 2] + ); + + // Now fill in all the holes for each slot such that for each slot, consumed == received. 
+ // Because none of the slots contain ticks, we should see that the repair requests + // ask for ticks, starting from the last received index for that slot + for (slot_height, blob_index) in expected { + let mut b = make_tiny_test_entries(1).to_blobs().pop().unwrap(); + b.set_index(blob_index); + b.set_slot(slot_height); + db_ledger.write_blobs(&vec![b]).unwrap(); + } + + let last_index_per_slot = ((num_entries_per_slot - 1) * nth) as u64; + let missing_indexes_per_slot: Vec = + (last_index_per_slot + 1..last_index_per_slot + 1 + num_ticks_per_slot).collect(); + let expected: Vec<(u64, u64)> = (0..num_slots) + .flat_map(|slot_height| { + missing_indexes_per_slot + .iter() + .map(move |blob_index| (slot_height as u64, *blob_index)) + }) + .collect(); + assert_eq!( + RepairService::generate_repairs(&db_ledger, std::usize::MAX, &mut repair_info) + .unwrap(), + expected + ); + assert_eq!( + RepairService::generate_repairs(&db_ledger, expected.len() - 2, &mut repair_info) + .unwrap()[..], + expected[0..expected.len() - 2] + ); + } + DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); + } +} diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 435b13733a..1a39eec523 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -62,7 +62,7 @@ impl ReplayStage { cluster_info: &Arc>, voting_keypair: Option<&Arc>, ledger_entry_sender: &EntrySender, - entry_height: &Arc>, + current_blob_index: &mut u64, last_entry_id: &Arc>, entry_stream: Option<&mut EntryStream>, ) -> Result<()> { @@ -162,8 +162,7 @@ impl ReplayStage { ledger_entry_sender.send(entries)?; } - *entry_height.write().unwrap() += entries_len; - + *current_blob_index += entries_len; res?; inc_new_counter_info!( "replicate_stage-duration", @@ -180,7 +179,7 @@ impl ReplayStage { bank: Arc, cluster_info: Arc>, exit: Arc, - entry_height: Arc>, + mut current_blob_index: u64, last_entry_id: Arc>, to_leader_sender: TvuRotationSender, entry_stream: Option<&String>, @@ -196,14 +195,14 @@ impl ReplayStage { .spawn(move || { let _exit = Finalizer::new(exit_.clone()); let mut last_leader_id = Self::get_leader_for_next_tick(&bank); - + let mut prev_slot = None; let (mut current_slot, mut max_tick_height_for_slot) = { let tick_height = bank.tick_height(); let leader_scheduler = bank.leader_scheduler.read().unwrap(); let current_slot = leader_scheduler.tick_height_to_slot(tick_height + 1); let first_tick_in_current_slot = current_slot * leader_scheduler.ticks_per_slot; ( - current_slot, + Some(current_slot), first_tick_in_current_slot + leader_scheduler.num_ticks_left_in_slot(first_tick_in_current_slot), ) @@ -217,15 +216,35 @@ impl ReplayStage { break; } - let current_entry_height = *entry_height.read().unwrap(); + if current_slot.is_none() { + let new_slot = Self::get_next_slot( + &db_ledger, + prev_slot.expect("prev_slot must exist"), + ); + if new_slot.is_some() { + // Reset the state + current_slot = new_slot; + current_blob_index = 0; + let leader_scheduler = bank.leader_scheduler.read().unwrap(); + let first_tick_in_current_slot = + current_slot.unwrap() * leader_scheduler.ticks_per_slot; + max_tick_height_for_slot = first_tick_in_current_slot + + leader_scheduler + .num_ticks_left_in_slot(first_tick_in_current_slot); + } + } let entries = { - if let Ok(entries) = db_ledger.get_slot_entries( - current_slot, - current_entry_height, - Some(MAX_ENTRY_RECV_PER_ITER as u64), - ) { - entries + if let Some(slot) = current_slot { + if let Ok(entries) = db_ledger.get_slot_entries( + slot, + current_blob_index, + 
Some(MAX_ENTRY_RECV_PER_ITER as u64), + ) { + entries + } else { + vec![] + } } else { vec![] } @@ -240,7 +259,7 @@ impl ReplayStage { &cluster_info, voting_keypair.as_ref(), &ledger_entry_sender, - &entry_height, + &mut current_blob_index, &last_entry_id, entry_stream.as_mut(), ) { @@ -262,16 +281,16 @@ impl ReplayStage { to_leader_sender .send(TvuReturnType::LeaderRotation( current_tick_height, - *entry_height.read().unwrap(), *last_entry_id.read().unwrap(), )) .unwrap(); } - current_slot += 1; - max_tick_height_for_slot += - bank.leader_scheduler.read().unwrap().ticks_per_slot; + // Check for any slots that chain to this one + prev_slot = current_slot; + current_slot = None; last_leader_id = leader_id; + continue; } } @@ -313,6 +332,12 @@ impl ReplayStage { .get_leader_for_slot(slot) .expect("Scheduled leader should be calculated by this point") } + + fn get_next_slot(db_ledger: &DbLedger, slot_index: u64) -> Option { + // Find the next slot that chains to the old slot + let next_slots = db_ledger.get_slots_since(&[slot_index]).expect("Db error"); + next_slots.first().cloned() + } } impl Service for ReplayStage { @@ -328,8 +353,9 @@ mod test { use super::*; use crate::bank::Bank; use crate::cluster_info::{ClusterInfo, Node}; - use crate::db_ledger::create_tmp_sample_ledger; - use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT}; + use crate::db_ledger::{ + create_tmp_sample_ledger, DbLedger, DbLedgerConfig, DEFAULT_SLOT_HEIGHT, + }; use crate::entry::create_ticks; use crate::entry::Entry; use crate::fullnode::new_bank_from_ledger; @@ -389,6 +415,7 @@ mod test { 0, ); last_id = active_set_entries.last().unwrap().id; + { let db_ledger = DbLedger::open(&my_ledger_path).unwrap(); db_ledger @@ -402,12 +429,13 @@ mod test { { // Set up the bank + let db_ledger_config = DbLedgerConfig::new(ticks_per_slot); let (bank, _entry_height, last_entry_id, db_ledger, l_sender, l_receiver) = - new_bank_from_ledger(&my_ledger_path, &leader_scheduler_config); + new_bank_from_ledger(&my_ledger_path, db_ledger_config, &leader_scheduler_config); // Set up the replay stage let (rotation_sender, rotation_receiver) = channel(); - let meta = db_ledger.meta().unwrap().unwrap(); + let meta = db_ledger.meta(0).unwrap().unwrap(); let exit = Arc::new(AtomicBool::new(false)); let bank = Arc::new(bank); let db_ledger = Arc::new(db_ledger); @@ -418,7 +446,7 @@ mod test { bank.clone(), Arc::new(RwLock::new(cluster_info_me)), exit.clone(), - Arc::new(RwLock::new(meta.consumed)), + meta.consumed, Arc::new(RwLock::new(last_entry_id)), rotation_sender, None, @@ -434,7 +462,6 @@ mod test { entries_to_send.push(entry); } - let expected_entry_height = (active_set_entries.len() + total_entries_to_send) as u64; let expected_last_id = entries_to_send.last().unwrap().id; // Write the entries to the ledger, replay_stage should get notified of changes @@ -446,7 +473,6 @@ mod test { assert_eq!( Some(TvuReturnType::LeaderRotation( 2 * ticks_per_slot - 1, - expected_entry_height, expected_last_id, )), { @@ -507,7 +533,11 @@ mod test { let (to_leader_sender, _) = channel(); { let (bank, entry_height, last_entry_id, db_ledger, l_sender, l_receiver) = - new_bank_from_ledger(&my_ledger_path, &LeaderSchedulerConfig::default()); + new_bank_from_ledger( + &my_ledger_path, + DbLedgerConfig::default(), + &LeaderSchedulerConfig::default(), + ); let bank = Arc::new(bank); let db_ledger = Arc::new(db_ledger); @@ -518,7 +548,7 @@ mod test { bank.clone(), cluster_info_me.clone(), exit.clone(), - Arc::new(RwLock::new(entry_height)), + entry_height, 
Arc::new(RwLock::new(last_entry_id)), to_leader_sender, None, @@ -581,8 +611,6 @@ mod test { make_active_set_entries(&my_keypair, &mint_keypair, 100, 1, &last_id, &last_id, 0); let mut last_id = active_set_entries.last().unwrap().id; let initial_tick_height = genesis_entry_height; - let active_set_entries_len = active_set_entries.len() as u64; - let initial_non_tick_height = genesis_entry_height - initial_tick_height; { let db_ledger = DbLedger::open(&my_ledger_path).unwrap(); @@ -608,11 +636,12 @@ mod test { let (rotation_tx, rotation_rx) = channel(); let exit = Arc::new(AtomicBool::new(false)); { + let db_ledger_config = DbLedgerConfig::new(ticks_per_slot); let (bank, _entry_height, last_entry_id, db_ledger, l_sender, l_receiver) = - new_bank_from_ledger(&my_ledger_path, &leader_scheduler_config); + new_bank_from_ledger(&my_ledger_path, db_ledger_config, &leader_scheduler_config); let meta = db_ledger - .meta() + .meta(0) .unwrap() .expect("First slot metadata must exist"); @@ -626,7 +655,7 @@ mod test { bank.clone(), cluster_info_me.clone(), exit.clone(), - Arc::new(RwLock::new(meta.consumed)), + meta.consumed, Arc::new(RwLock::new(last_entry_id)), rotation_tx, None, @@ -642,10 +671,6 @@ mod test { let total_entries_to_send = (active_window_tick_length - initial_tick_height) as usize; let num_hashes = 1; - // Add on the only entries that weren't ticks to the bootstrap height to get the - // total expected entry length - let expected_entry_height = - active_window_tick_length + initial_non_tick_height + active_set_entries_len; let leader_rotation_index = (active_window_tick_length - initial_tick_height - 1) as usize; let mut expected_last_id = Hash::default(); @@ -674,7 +699,6 @@ mod test { assert_eq!( Some(TvuReturnType::LeaderRotation( active_window_tick_length, - expected_entry_height, expected_last_id, )), { @@ -704,8 +728,7 @@ mod test { let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); let (ledger_entry_sender, _ledger_entry_receiver) = channel(); let last_entry_id = Hash::default(); - - let entry_height = 0; + let mut current_blob_index = 0; let mut last_id = Hash::default(); let mut entries = Vec::new(); for _ in 0..5 { @@ -722,7 +745,7 @@ mod test { &cluster_info_me, Some(&voting_keypair), &ledger_entry_sender, - &Arc::new(RwLock::new(entry_height)), + &mut current_blob_index, &Arc::new(RwLock::new(last_entry_id)), None, ); @@ -744,7 +767,7 @@ mod test { &cluster_info_me, Some(&voting_keypair), &ledger_entry_sender, - &Arc::new(RwLock::new(entry_height)), + &mut current_blob_index, &Arc::new(RwLock::new(last_entry_id)), None, ); @@ -774,7 +797,7 @@ mod test { let (ledger_entry_sender, _ledger_entry_receiver) = channel(); let last_entry_id = Hash::default(); - let entry_height = 0; + let mut entry_height = 0; let mut last_id = Hash::default(); let mut entries = Vec::new(); let mut expected_entries = Vec::new(); @@ -794,7 +817,7 @@ mod test { &cluster_info_me, Some(&voting_keypair), &ledger_entry_sender, - &Arc::new(RwLock::new(entry_height)), + &mut entry_height, &Arc::new(RwLock::new(last_entry_id)), Some(&mut entry_stream), ) diff --git a/src/replicator.rs b/src/replicator.rs index 0b01ee5d87..cd45c0d234 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -12,7 +12,7 @@ use crate::service::Service; use crate::storage_stage::{get_segment_from_entry, ENTRIES_PER_SEGMENT}; use crate::streamer::BlobReceiver; use crate::thin_client::{retry_get_balance, ThinClient}; -use crate::window_service::window_service; +use 
crate::window_service::WindowService; use rand::thread_rng; use rand::Rng; use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT}; @@ -33,13 +33,12 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::sleep; -use std::thread::JoinHandle; use std::time::{Duration, Instant}; pub struct Replicator { gossip_service: GossipService, fetch_stage: BlobFetchStage, - t_window: JoinHandle<()>, + window_service: WindowService, pub retransmit_receiver: BlobReceiver, exit: Arc, entry_height: u64, @@ -173,11 +172,10 @@ impl Replicator { // todo: pull blobs off the retransmit_receiver and recycle them? let (retransmit_sender, retransmit_receiver) = channel(); - let t_window = window_service( + let window_service = WindowService::new( db_ledger.clone(), cluster_info.clone(), 0, - entry_height, max_entry_height, blob_fetch_receiver, retransmit_sender, @@ -274,7 +272,7 @@ impl Replicator { Ok(Self { gossip_service, fetch_stage, - t_window, + window_service, retransmit_receiver, exit, entry_height, @@ -289,7 +287,7 @@ impl Replicator { pub fn join(self) { self.gossip_service.join().unwrap(); self.fetch_stage.join().unwrap(); - self.t_window.join().unwrap(); + self.window_service.join().unwrap(); // Drain the queue here to prevent self.retransmit_receiver from being dropped // before the window_service thread is joined diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 9a97bba792..0ea73722fb 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -8,7 +8,7 @@ use crate::leader_scheduler::LeaderScheduler; use crate::result::{Error, Result}; use crate::service::Service; use crate::streamer::BlobReceiver; -use crate::window_service::window_service; +use crate::window_service::WindowService; use log::Level; use solana_metrics::{influxdb, submit}; use std::net::UdpSocket; @@ -119,16 +119,16 @@ fn retransmitter( pub struct RetransmitStage { thread_hdls: Vec>, + window_service: WindowService, } impl RetransmitStage { - #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)] + #[allow(clippy::new_ret_no_self)] pub fn new( bank: &Arc, db_ledger: Arc, cluster_info: &Arc>, tick_height: u64, - entry_height: u64, retransmit_socket: Arc, repair_socket: Arc, fetch_stage_receiver: BlobReceiver, @@ -144,11 +144,10 @@ impl RetransmitStage { retransmit_receiver, ); let done = Arc::new(AtomicBool::new(false)); - let t_window = window_service( + let window_service = WindowService::new( db_ledger, cluster_info.clone(), tick_height, - entry_height, 0, fetch_stage_receiver, retransmit_sender, @@ -158,8 +157,11 @@ impl RetransmitStage { exit, ); - let thread_hdls = vec![t_retransmit, t_window]; - Self { thread_hdls } + let thread_hdls = vec![t_retransmit]; + Self { + thread_hdls, + window_service, + } } } @@ -170,6 +172,7 @@ impl Service for RetransmitStage { for thread_hdl in self.thread_hdls { thread_hdl.join()?; } + self.window_service.join()?; Ok(()) } } diff --git a/src/rpc_mock.rs b/src/rpc_mock.rs index 3a8e0486f0..ceeb0a4aef 100644 --- a/src/rpc_mock.rs +++ b/src/rpc_mock.rs @@ -1,7 +1,7 @@ // Implementation of RpcRequestHandler trait for testing Rpc requests without i/o use crate::rpc_request::{RpcRequest, RpcRequestHandler}; -use serde_json::{self, Number, Value}; +use serde_json::{Number, Value}; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; diff --git a/src/tpu.rs b/src/tpu.rs index 43ab32320a..f501f7b2b4 100644 --- a/src/tpu.rs +++ 
b/src/tpu.rs @@ -82,7 +82,7 @@ impl Tpu { transactions_sockets: Vec, broadcast_socket: UdpSocket, cluster_info: Arc>, - entry_height: u64, + blob_index: u64, sigverify_disabled: bool, max_tick_height: u64, last_entry_id: &Hash, @@ -119,7 +119,7 @@ impl Tpu { bank.clone(), broadcast_socket, cluster_info, - entry_height, + blob_index, bank.leader_scheduler.clone(), entry_receiver, max_tick_height, @@ -174,7 +174,7 @@ impl Tpu { cluster_info: Arc>, sigverify_disabled: bool, max_tick_height: u64, - entry_height: u64, + blob_index: u64, last_entry_id: &Hash, leader_id: Pubkey, to_validator_sender: &TpuRotationSender, @@ -215,7 +215,7 @@ impl Tpu { bank.clone(), broadcast_socket, cluster_info, - entry_height, + blob_index, bank.leader_scheduler.clone(), entry_receiver, max_tick_height, diff --git a/src/tvu.rs b/src/tvu.rs index 9614a7cc10..0912d14f53 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -32,7 +32,7 @@ use std::thread; #[derive(Debug, PartialEq, Eq, Clone)] pub enum TvuReturnType { - LeaderRotation(u64, u64, Hash), + LeaderRotation(u64, Hash), } pub type TvuRotationSender = Sender; @@ -45,7 +45,6 @@ pub struct Tvu { storage_stage: StorageStage, exit: Arc, last_entry_id: Arc>, - entry_height: Arc>, } pub struct Sockets { @@ -60,6 +59,7 @@ impl Tvu { /// # Arguments /// * `bank` - The bank state. /// * `entry_height` - Initial ledger height + /// * `blob_index` - Index of last processed blob /// * `last_entry_id` - Hash of the last entry /// * `cluster_info` - The cluster_info state. /// * `sockets` - My fetch, repair, and restransmit sockets @@ -68,6 +68,7 @@ impl Tvu { pub fn new( voting_keypair: Option>, bank: &Arc, + blob_index: u64, entry_height: u64, last_entry_id: Hash, cluster_info: &Arc>, @@ -110,7 +111,6 @@ impl Tvu { db_ledger.clone(), &cluster_info, bank.tick_height(), - entry_height, Arc::new(retransmit_socket), repair_socket, blob_fetch_receiver, @@ -118,7 +118,6 @@ impl Tvu { exit.clone(), ); - let l_entry_height = Arc::new(RwLock::new(entry_height)); let l_last_entry_id = Arc::new(RwLock::new(last_entry_id)); let (replay_stage, ledger_entry_receiver) = ReplayStage::new( @@ -128,7 +127,7 @@ impl Tvu { bank.clone(), cluster_info.clone(), exit.clone(), - l_entry_height.clone(), + blob_index, l_last_entry_id.clone(), to_leader_sender, entry_stream, @@ -155,17 +154,13 @@ impl Tvu { storage_stage, exit, last_entry_id: l_last_entry_id, - entry_height: l_entry_height, }, blob_fetch_sender, ) } - pub fn get_state(&self) -> (Hash, u64) { - ( - *self.last_entry_id.read().unwrap(), - *self.entry_height.read().unwrap(), - ) + pub fn get_state(&self) -> Hash { + *self.last_entry_id.read().unwrap() } pub fn is_exited(&self) -> bool { @@ -260,6 +255,7 @@ pub mod tests { Some(Arc::new(voting_keypair)), &bank, 0, + 0, cur_hash, &cref1, { @@ -346,6 +342,7 @@ pub mod tests { Some(Arc::new(voting_keypair)), &bank, 0, + 0, cur_hash, &cref1, { diff --git a/src/window.rs b/src/window.rs index 2ee2c75686..671e6249ad 100644 --- a/src/window.rs +++ b/src/window.rs @@ -1,14 +1,8 @@ //! The `window` module defines data structure for storing the tail of the ledger. //! 
-use crate::cluster_info::ClusterInfo; -use crate::counter::Counter; -use crate::leader_scheduler::LeaderScheduler; use crate::packet::SharedBlob; -use log::Level; use solana_sdk::pubkey::Pubkey; use std::cmp; -use std::net::SocketAddr; -use std::sync::atomic::AtomicUsize; use std::sync::{Arc, RwLock}; #[derive(Default, Clone)] @@ -46,18 +40,6 @@ pub trait WindowUtil { fn window_size(&self) -> u64; - fn repair( - &mut self, - cluster_info: &Arc>, - id: &Pubkey, - times: usize, - consumed: u64, - received: u64, - tick_height: u64, - max_entry_height: u64, - leader_scheduler_option: &Arc>, - ) -> Vec<(SocketAddr, Vec)>; - fn print(&self, id: &Pubkey, consumed: u64) -> String; fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool; @@ -116,86 +98,6 @@ impl WindowUtil for Window { self.len() as u64 } - fn repair( - &mut self, - cluster_info: &Arc>, - id: &Pubkey, - times: usize, - consumed: u64, - received: u64, - tick_height: u64, - max_entry_height: u64, - leader_scheduler_option: &Arc>, - ) -> Vec<(SocketAddr, Vec)> { - let rcluster_info = cluster_info.read().unwrap(); - // Check if we are the next next slot leader - let is_next_leader = { - let leader_scheduler = leader_scheduler_option.read().unwrap(); - let next_slot = leader_scheduler.tick_height_to_slot(tick_height) + 1; - match leader_scheduler.get_leader_for_slot(next_slot) { - Some(leader_id) if leader_id == *id => true, - // In the case that we are not in the current scope of the leader schedule - // window then either: - // - // 1) The replay stage hasn't caught up to the "consumed" entries we sent, - // in which case it will eventually catch up - // - // 2) We are on the border between ticks_per_epochs, so the - // schedule won't be known until the entry on that cusp is received - // by the replay stage (which comes after this stage). Hence, the next - // leader at the beginning of that next epoch will not know they are the - // leader until they receive that last "cusp" entry. The leader also won't ask for repairs - // for that entry because "is_next_leader" won't be set here. In this case, - // everybody will be blocking waiting for that "cusp" entry instead of repairing, - // until the leader hits "times" >= the max times in calculate_max_repair(). 
- // The impact of this, along with the similar problem from broadcast for the transitioning - // leader, can be observed in the multinode test, test_full_leader_validator_network(), - None => false, - _ => false, - } - }; - - let num_peers = rcluster_info.repair_peers().len() as u64; - let max_repair = if max_entry_height == 0 { - calculate_max_repair( - num_peers, - consumed, - received, - times, - is_next_leader, - self.window_size(), - ) - } else { - max_entry_height + 1 - }; - - let idxs = self.clear_slots(consumed, max_repair); - let reqs: Vec<_> = idxs - .into_iter() - .filter_map(|pix| rcluster_info.window_index_request(pix).ok()) - .collect(); - - drop(rcluster_info); - - inc_new_counter_info!("streamer-repair_window-repair", reqs.len()); - - if log_enabled!(Level::Trace) { - trace!( - "{}: repair_window counter times: {} consumed: {} received: {} max_repair: {} missing: {}", - id, - times, - consumed, - received, - max_repair, - reqs.len() - ); - for (to, _) in &reqs { - trace!("{}: repair_window request to {}", id, to); - } - } - reqs - } - fn print(&self, id: &Pubkey, consumed: u64) -> String { let pointer: Vec<_> = self .iter() diff --git a/src/window_service.rs b/src/window_service.rs index 99ada7e8c7..1c5f9a5695 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -4,12 +4,12 @@ use crate::cluster_info::ClusterInfo; use crate::counter::Counter; use crate::db_ledger::DbLedger; use crate::db_window::*; - use crate::leader_scheduler::LeaderScheduler; +use crate::repair_service::RepairService; use crate::result::{Error, Result}; +use crate::service::Service; use crate::streamer::{BlobReceiver, BlobSender}; use log::Level; -use rand::{thread_rng, Rng}; use solana_metrics::{influxdb, submit}; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; @@ -17,7 +17,7 @@ use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, RwLock}; -use std::thread::{Builder, JoinHandle}; +use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; pub const MAX_REPAIR_BACKOFF: usize = 128; @@ -27,27 +27,6 @@ pub enum WindowServiceReturnType { LeaderRotation(u64), } -fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool { - //exponential backoff - if *last != consumed { - //start with a 50% chance of asking for repairs - *times = 1; - } - *last = consumed; - *times += 1; - - // Experiment with capping repair request duration. 
- // Once nodes are too far behind they can spend many - // seconds without asking for repair - if *times > MAX_REPAIR_BACKOFF { - // 50% chance that a request will fire between 64 - 128 tries - *times = MAX_REPAIR_BACKOFF / 2; - } - - //if we get lucky, make the request, which should exponentially get less likely - thread_rng().gen_range(0, *times as u64) == 0 -} - #[allow(clippy::too_many_arguments)] fn recv_window( db_ledger: &Arc, @@ -108,99 +87,98 @@ fn recv_window( Ok(()) } -#[allow(clippy::too_many_arguments)] -pub fn window_service( - db_ledger: Arc, - cluster_info: Arc>, - tick_height: u64, - entry_height: u64, - max_entry_height: u64, - r: BlobReceiver, - retransmit: BlobSender, - repair_socket: Arc, - leader_scheduler: Arc>, - done: Arc, - exit: Arc, -) -> JoinHandle<()> { - Builder::new() - .name("solana-window".to_string()) - .spawn(move || { - let mut tick_height_ = tick_height; - let mut last = entry_height; - let mut times = 0; - let id = cluster_info.read().unwrap().id(); - trace!("{}: RECV_WINDOW started", id); - loop { - if exit.load(Ordering::Relaxed) { - break; - } - if let Err(e) = recv_window( - &db_ledger, - &id, - &leader_scheduler, - &mut tick_height_, - max_entry_height, - &r, - &retransmit, - &done, - ) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => { - inc_new_counter_info!("streamer-window-error", 1, 1); - error!("window error: {:?}", e); - } +// Implement a destructor for the window_service thread to signal it exited +// even on panics +struct Finalizer { + exit_sender: Arc, +} + +impl Finalizer { + fn new(exit_sender: Arc) -> Self { + Finalizer { exit_sender } + } +} +// Implement a destructor for Finalizer. +impl Drop for Finalizer { + fn drop(&mut self) { + self.exit_sender.clone().store(true, Ordering::Relaxed); + } +} + +pub struct WindowService { + t_window: JoinHandle<()>, + repair_service: RepairService, +} + +impl WindowService { + #[allow(clippy::too_many_arguments)] + pub fn new( + db_ledger: Arc, + cluster_info: Arc>, + tick_height: u64, + max_entry_height: u64, + r: BlobReceiver, + retransmit: BlobSender, + repair_socket: Arc, + leader_scheduler: Arc>, + done: Arc, + exit: Arc, + ) -> WindowService { + let exit_ = exit.clone(); + let repair_service = RepairService::new( + db_ledger.clone(), + exit.clone(), + repair_socket, + cluster_info.clone(), + ); + let t_window = Builder::new() + .name("solana-window".to_string()) + .spawn(move || { + let _exit = Finalizer::new(exit_); + let mut tick_height_ = tick_height; + let id = cluster_info.read().unwrap().id(); + trace!("{}: RECV_WINDOW started", id); + loop { + if exit.load(Ordering::Relaxed) { + break; } - } - - let meta = db_ledger.meta(); - - if let Ok(Some(meta)) = meta { - let received = meta.received; - let consumed = meta.consumed; - - // Consumed should never be bigger than received - assert!(consumed <= received); - if received == consumed { - trace!( - "{} we have everything received: {} consumed: {}", - id, - received, - consumed - ); - continue; - } - - //exponential backoff - if !repair_backoff(&mut last, &mut times, consumed) { - trace!("{} !repair_backoff() times = {}", id, times); - continue; - } - trace!("{} let's repair! 
times = {}", id, times); - - let reqs = repair( + if let Err(e) = recv_window( &db_ledger, - &cluster_info, &id, - times, - tick_height_, - max_entry_height, &leader_scheduler, - ); - - if let Ok(reqs) = reqs { - for (to, req) in reqs { - repair_socket.send_to(&req, to).unwrap_or_else(|e| { - info!("{} repair req send_to({}) error {:?}", id, to, e); - 0 - }); + &mut tick_height_, + max_entry_height, + &r, + &retransmit, + &done, + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => { + inc_new_counter_info!("streamer-window-error", 1, 1); + error!("window error: {:?}", e); + } } } } - } - }) - .unwrap() + }) + .unwrap(); + + WindowService { + t_window, + repair_service, + } + } +} + +impl Service for WindowService { + type JoinReturnType = (); + + fn join(self) -> thread::Result<()> { + self.t_window.join()?; + self.repair_service.join() + } } #[cfg(test)] @@ -210,9 +188,9 @@ mod test { use crate::db_ledger::DbLedger; use crate::entry::make_consecutive_blobs; use crate::leader_scheduler::LeaderScheduler; - + use crate::service::Service; use crate::streamer::{blob_receiver, responder}; - use crate::window_service::{repair_backoff, window_service}; + use crate::window_service::WindowService; use solana_sdk::hash::Hash; use std::fs::remove_dir_all; use std::net::UdpSocket; @@ -246,12 +224,11 @@ mod test { ); let mut leader_schedule = LeaderScheduler::default(); leader_schedule.set_leader_schedule(vec![me_id]); - let t_window = window_service( + let t_window = WindowService::new( db_ledger, subs, 0, 0, - 0, r_reader, s_retransmit, Arc::new(leader_node.sockets.repair), @@ -328,12 +305,11 @@ mod test { ); let mut leader_schedule = LeaderScheduler::default(); leader_schedule.set_leader_schedule(vec![me_id]); - let t_window = window_service( + let t_window = WindowService::new( db_ledger, subs.clone(), 0, 0, - 0, r_reader, s_retransmit, Arc::new(leader_node.sockets.repair), @@ -382,33 +358,4 @@ mod test { DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction"); let _ignored = remove_dir_all(&db_ledger_path); } - - #[test] - pub fn test_repair_backoff() { - let num_tests = 100; - let res: usize = (0..num_tests) - .map(|_| { - let mut last = 0; - let mut times = 0; - let total: usize = (0..127) - .map(|x| { - let rv = repair_backoff(&mut last, &mut times, 1) as usize; - assert_eq!(times, x + 2); - rv - }) - .sum(); - assert_eq!(times, 128); - assert_eq!(last, 1); - repair_backoff(&mut last, &mut times, 1); - assert_eq!(times, 64); - repair_backoff(&mut last, &mut times, 2); - assert_eq!(times, 2); - assert_eq!(last, 2); - total - }) - .sum(); - let avg = res / num_tests; - assert!(avg >= 3); - assert!(avg <= 5); - } } diff --git a/tests/multinode.rs b/tests/multinode.rs index c03355e67d..cbdcf21650 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -3,7 +3,7 @@ use solana::blob_fetch_stage::BlobFetchStage; use solana::client::mk_client; use solana::cluster_info::{ClusterInfo, Node, NodeInfo}; use solana::db_ledger::{create_tmp_sample_ledger, tmp_copy_ledger}; -use solana::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT}; +use solana::db_ledger::{DbLedger, DbLedgerConfig, DEFAULT_SLOT_HEIGHT}; use solana::entry::{reconstruct_entries_from_blobs, Entry}; use solana::fullnode::{new_bank_from_ledger, Fullnode, FullnodeConfig, FullnodeReturnType}; use solana::gossip_service::GossipService; @@ -728,10 +728,11 @@ fn test_multi_node_dynamic_network() { let mut ledger_paths = 
Vec::new(); ledger_paths.push(genesis_ledger_path.clone()); + let leader_ledger_path = tmp_copy_ledger(&genesis_ledger_path, "multi_node_dynamic_network"); + let alice_arc = Arc::new(RwLock::new(alice)); let leader_data = leader.info.clone(); - let leader_ledger_path = tmp_copy_ledger(&genesis_ledger_path, "multi_node_dynamic_network"); ledger_paths.push(leader_ledger_path.clone()); let voting_keypair = VotingKeypair::new_local(&leader_keypair); let server = Fullnode::new( @@ -926,14 +927,14 @@ fn test_leader_to_validator_transition() { let mut fullnode_config = FullnodeConfig::default(); let ticks_per_slot = 5; - fullnode_config.leader_scheduler_config = LeaderSchedulerConfig::new( + fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new( ticks_per_slot, 1, // Setup window length to exclude the genesis bootstrap leader vote at tick height 0, so // that when the leader schedule is recomputed for epoch 1 only the validator vote at tick // height 1 will be considered. ticks_per_slot, - ); + )); // Initialize the leader ledger. Make a mint and a genesis entry // in the leader ledger @@ -1007,7 +1008,12 @@ fn test_leader_to_validator_transition() { leader_exit(); info!("Check the ledger to make sure it's the right height..."); - let bank = new_bank_from_ledger(&leader_ledger_path, &LeaderSchedulerConfig::default()).0; + let bank = new_bank_from_ledger( + &leader_ledger_path, + DbLedgerConfig::default(), + &LeaderSchedulerConfig::default(), + ) + .0; assert_eq!( bank.tick_height(), @@ -1072,11 +1078,11 @@ fn test_leader_validator_basic() { // Create the leader scheduler config let mut fullnode_config = FullnodeConfig::default(); let ticks_per_slot = 5; - fullnode_config.leader_scheduler_config = LeaderSchedulerConfig::new( + fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new( ticks_per_slot, 1, // 1 slot per epoch ticks_per_slot, - ); + )); // Start the validator node let voting_keypair = VotingKeypair::new_local(&validator_keypair); @@ -1171,8 +1177,11 @@ fn test_dropped_handoff_recovery() { let ticks_per_slot = 5; let ticks_per_epoch = slots_per_epoch * ticks_per_slot; let mut fullnode_config = FullnodeConfig::default(); - fullnode_config.leader_scheduler_config = - LeaderSchedulerConfig::new(ticks_per_slot, slots_per_epoch, ticks_per_epoch); + fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new( + ticks_per_slot, + slots_per_epoch, + ticks_per_epoch, + )); // Make a common mint and a genesis entry for both leader + validator's ledgers let num_ending_ticks = 1; @@ -1385,12 +1394,16 @@ fn test_full_leader_validator_network() { let ticks_per_slot = 5; let ticks_per_epoch = slots_per_epoch * ticks_per_slot; let mut fullnode_config = FullnodeConfig::default(); - fullnode_config.leader_scheduler_config = - LeaderSchedulerConfig::new(ticks_per_slot, slots_per_epoch, ticks_per_epoch); + fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new( + ticks_per_slot, + slots_per_epoch, + ticks_per_epoch * 3, + )); let mut nodes = vec![]; info!("Start up the validators"); + // Start up the validators for kp in node_keypairs.into_iter() { let validator_ledger_path = tmp_copy_ledger( &bootstrap_leader_ledger_path, @@ -1579,8 +1592,11 @@ fn test_broadcast_last_tick() { let bootstrap_leader_keypair = Arc::new(bootstrap_leader_keypair); let voting_keypair = VotingKeypair::new_local(&bootstrap_leader_keypair); let mut fullnode_config = FullnodeConfig::default(); - fullnode_config.leader_scheduler_config = - 
-        LeaderSchedulerConfig::new(ticks_per_slot, slots_per_epoch, ticks_per_epoch);
+    fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new(
+        ticks_per_slot,
+        slots_per_epoch,
+        ticks_per_epoch,
+    ));
     let bootstrap_leader = Fullnode::new(
         bootstrap_leader_node,
         &bootstrap_leader_keypair,
diff --git a/tests/replicator.rs b/tests/replicator.rs
index e88103632d..c101ea7659 100644
--- a/tests/replicator.rs
+++ b/tests/replicator.rs
@@ -9,7 +9,9 @@ use bincode::deserialize;
 use solana::client::mk_client;
 use solana::cluster_info::{ClusterInfo, Node, NodeInfo};
 use solana::db_ledger::DbLedger;
-use solana::db_ledger::{create_tmp_sample_ledger, get_tmp_ledger_path, tmp_copy_ledger};
+use solana::db_ledger::{
+    create_tmp_sample_ledger, get_tmp_ledger_path, tmp_copy_ledger, DEFAULT_SLOT_HEIGHT,
+};
 use solana::entry::Entry;
 use solana::fullnode::{Fullnode, FullnodeConfig};
 use solana::replicator::Replicator;
@@ -148,7 +150,7 @@ fn test_replicator_startup_basic() {
         let cluster_info = ClusterInfo::new(tn.info.clone());
         let repair_index = replicator.entry_height();
         let req = cluster_info
-            .window_index_request_bytes(repair_index)
+            .window_index_request_bytes(DEFAULT_SLOT_HEIGHT, repair_index)
             .unwrap();

         let exit = Arc::new(AtomicBool::new(false));
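
Note on the drop-guard idiom introduced with the new WindowService: the Finalizer holds a clone of the shared exit flag and raises it in Drop, so the flag is set even if the window thread panics, letting the paired RepairService shut down instead of spinning forever. A minimal standalone sketch of the idiom follows; the main harness and variable names are illustrative only and are not part of the crate:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Drop guard: raises the shared exit flag when the owning thread unwinds,
// whether it returns normally or panics.
struct Finalizer {
    exit_sender: Arc<AtomicBool>,
}

impl Drop for Finalizer {
    fn drop(&mut self) {
        self.exit_sender.store(true, Ordering::Relaxed);
    }
}

fn main() {
    let exit = Arc::new(AtomicBool::new(false));
    let thread_exit = exit.clone();

    let handle = thread::spawn(move || {
        let _guard = Finalizer { exit_sender: thread_exit };
        // Simulate the window loop dying unexpectedly.
        panic!("window thread died");
    });

    // The panic stays inside the worker thread; join reports it as an Err.
    assert!(handle.join().is_err());
    // The guard's Drop still ran, so cooperating threads observe the signal.
    assert!(exit.load(Ordering::Relaxed));
}

Because Drop runs during unwinding, the thread body needs no explicit error path to propagate the shutdown signal.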
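
A related note on shutdown ordering: WindowService::join waits on its own window thread first and only then joins the RepairService it owns, so in the non-panicking case a caller that joins the composite returns only after both loops have stopped. The sketch below mirrors that composition with toy types; the Service trait here is a simplified stand-in for the crate's service::Service, and ToyRepair/ToyWindow are hypothetical names, not the crate's code:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;

// Simplified stand-in for the crate's Service trait.
trait Service {
    type JoinReturnType;
    fn join(self) -> thread::Result<Self::JoinReturnType>;
}

// Stand-in for the owned repair loop: runs until the shared exit flag is set.
struct ToyRepair {
    t_repair: JoinHandle<()>,
}

impl Service for ToyRepair {
    type JoinReturnType = ();
    fn join(self) -> thread::Result<()> {
        self.t_repair.join()
    }
}

// Stand-in for the composite window service: joins its own thread first,
// then the repair service it owns.
struct ToyWindow {
    t_window: JoinHandle<()>,
    repair: ToyRepair,
}

impl Service for ToyWindow {
    type JoinReturnType = ();
    fn join(self) -> thread::Result<()> {
        self.t_window.join()?;
        self.repair.join()
    }
}

fn main() {
    let exit = Arc::new(AtomicBool::new(false));

    let repair_exit = exit.clone();
    let repair = ToyRepair {
        t_repair: Builder::new()
            .name("toy-repair".to_string())
            .spawn(move || {
                while !repair_exit.load(Ordering::Relaxed) {
                    thread::sleep(Duration::from_millis(5));
                }
            })
            .unwrap(),
    };

    let window_exit = exit.clone();
    let window = ToyWindow {
        t_window: Builder::new()
            .name("toy-window".to_string())
            .spawn(move || {
                // In the real service the Finalizer raises this flag, even on panic;
                // here the window thread raises it directly when its work is done.
                window_exit.store(true, Ordering::Relaxed);
            })
            .unwrap(),
        repair,
    };

    // Returns only after both threads have exited.
    window.join().expect("both threads exit cleanly");
}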