From 137233b4a1682ef178ff5e7e1a25d76fe8637f6f Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 25 Feb 2019 15:33:42 -0700 Subject: [PATCH] Add EntryMeta wrapper --- src/blockstream.rs | 68 +++++++------------------ src/blockstream_service.rs | 102 +++++++++++++++++++------------------ src/entry.rs | 27 +++++++++- src/replay_stage.rs | 35 +++++++++++-- src/storage_stage.rs | 30 ++++++++--- src/tvu.rs | 2 - 6 files changed, 149 insertions(+), 115 deletions(-) diff --git a/src/blockstream.rs b/src/blockstream.rs index 197bda383a..3d61cb2429 100644 --- a/src/blockstream.rs +++ b/src/blockstream.rs @@ -3,16 +3,15 @@ //! real-time access to entries. use crate::entry::Entry; -use crate::leader_scheduler::LeaderScheduler; use crate::result::Result; use chrono::{SecondsFormat, Utc}; use solana_sdk::hash::Hash; +use solana_sdk::pubkey::Pubkey; use std::cell::RefCell; use std::io::prelude::*; use std::net::Shutdown; use std::os::unix::net::UnixStream; use std::path::Path; -use std::sync::{Arc, RwLock}; pub trait EntryWriter: std::fmt::Debug { fn write(&self, payload: String) -> Result<()>; @@ -64,14 +63,14 @@ pub trait BlockstreamEvents { &self, slot: u64, tick_height: u64, - leader_id: &str, + leader_id: Pubkey, entries: &Entry, ) -> Result<()>; fn emit_block_event( &self, slot: u64, tick_height: u64, - leader_id: &str, + leader_id: Pubkey, last_id: Hash, ) -> Result<()>; } @@ -79,7 +78,6 @@ pub trait BlockstreamEvents { #[derive(Debug)] pub struct Blockstream { pub output: T, - pub leader_scheduler: Arc>, pub queued_block: Option, } @@ -91,12 +89,12 @@ where &self, slot: u64, tick_height: u64, - leader_id: &str, + leader_id: Pubkey, entry: &Entry, ) -> Result<()> { let json_entry = serde_json::to_string(&entry)?; let payload = format!( - r#"{{"dt":"{}","t":"entry","s":{},"h":{},"l":{:?},"entry":{}}}"#, + r#"{{"dt":"{}","t":"entry","s":{},"h":{},"l":"{:?}","entry":{}}}"#, Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true), slot, tick_height, @@ -111,11 +109,11 @@ where &self, slot: u64, tick_height: u64, - leader_id: &str, + leader_id: Pubkey, last_id: Hash, ) -> Result<()> { let payload = format!( - r#"{{"dt":"{}","t":"block","s":{},"h":{},"l":{:?},"id":"{:?}"}}"#, + r#"{{"dt":"{}","t":"block","s":{},"h":{},"l":"{:?}","id":"{:?}"}}"#, Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true), slot, tick_height, @@ -130,10 +128,9 @@ where pub type SocketBlockstream = Blockstream; impl SocketBlockstream { - pub fn new(socket: String, leader_scheduler: Arc>) -> Self { + pub fn new(socket: String) -> Self { Blockstream { output: EntrySocket { socket }, - leader_scheduler, queued_block: None, } } @@ -142,10 +139,9 @@ impl SocketBlockstream { pub type MockBlockstream = Blockstream; impl MockBlockstream { - pub fn new(_: String, leader_scheduler: Arc>) -> Self { + pub fn new(_: String) -> Self { Blockstream { output: EntryVec::new(), - leader_scheduler, queued_block: None, } } @@ -160,6 +156,7 @@ pub struct BlockData { pub slot: u64, pub tick_height: u64, pub id: Hash, + pub leader_id: Pubkey, } #[cfg(test)] @@ -168,25 +165,14 @@ mod test { use crate::entry::Entry; use chrono::{DateTime, FixedOffset}; use serde_json::Value; - use solana_runtime::bank::Bank; - use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; + use solana_sdk::signature::{Keypair, KeypairUtil}; use std::collections::HashSet; #[test] fn test_blockstream() -> () { - // Set up bank and leader_scheduler - let (mut genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000); - genesis_block.ticks_per_slot = 5; - genesis_block.slots_per_epoch = 2; - - let bank = Bank::new(&genesis_block); - let leader_scheduler = LeaderScheduler::new_with_bank(&bank); - let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); - - // Set up blockstream - let blockstream = MockBlockstream::new("test_stream".to_string(), leader_scheduler.clone()); - let ticks_per_slot = bank.ticks_per_slot(); + let blockstream = MockBlockstream::new("test_stream".to_string()); + let ticks_per_slot = 5; let mut last_id = Hash::default(); let mut entries = Vec::new(); @@ -194,36 +180,20 @@ mod test { let tick_height_initial = 0; let tick_height_final = tick_height_initial + ticks_per_slot + 2; - let mut previous_slot = leader_scheduler - .read() - .unwrap() - .tick_height_to_slot(tick_height_initial); - let leader_id = leader_scheduler - .read() - .unwrap() - .get_leader_for_slot(previous_slot) - .map(|leader| leader.to_string()) - .unwrap_or_else(|| "None".to_string()); + let mut curr_slot = 0; + let leader_id = Keypair::new().pubkey(); for tick_height in tick_height_initial..=tick_height_final { - leader_scheduler - .write() - .unwrap() - .update_tick_height(tick_height, &bank); - let curr_slot = leader_scheduler - .read() - .unwrap() - .tick_height_to_slot(tick_height); - if curr_slot != previous_slot { + if tick_height == 5 { blockstream - .emit_block_event(previous_slot, tick_height - 1, &leader_id, last_id) + .emit_block_event(curr_slot, tick_height - 1, leader_id, last_id) .unwrap(); + curr_slot += 1; } let entry = Entry::new(&mut last_id, 1, vec![]); // just ticks last_id = entry.id; - previous_slot = curr_slot; blockstream - .emit_entry_event(curr_slot, tick_height, &leader_id, &entry) + .emit_entry_event(curr_slot, tick_height, leader_id, &entry) .unwrap(); expected_entries.push(entry.clone()); entries.push(entry); diff --git a/src/blockstream_service.rs b/src/blockstream_service.rs index 582d6c4656..700fef0ef1 100644 --- a/src/blockstream_service.rs +++ b/src/blockstream_service.rs @@ -8,12 +8,11 @@ use crate::blockstream::MockBlockstream as Blockstream; use crate::blockstream::SocketBlockstream as Blockstream; use crate::blockstream::{BlockData, BlockstreamEvents}; use crate::entry::{EntryReceiver, EntrySender}; -use crate::leader_scheduler::LeaderScheduler; use crate::result::{Error, Result}; use crate::service::Service; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, RecvTimeoutError}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; @@ -26,12 +25,10 @@ impl BlockstreamService { pub fn new( ledger_entry_receiver: EntryReceiver, blockstream_socket: String, - mut tick_height: u64, - leader_scheduler: Arc>, exit: Arc, ) -> (Self, EntryReceiver) { let (blockstream_sender, blockstream_receiver) = channel(); - let mut blockstream = Blockstream::new(blockstream_socket, leader_scheduler); + let mut blockstream = Blockstream::new(blockstream_socket); let t_blockstream = Builder::new() .name("solana-blockstream".to_string()) .spawn(move || loop { @@ -41,7 +38,6 @@ impl BlockstreamService { if let Err(e) = Self::process_entries( &ledger_entry_receiver, &blockstream_sender, - &mut tick_height, &mut blockstream, ) { match e { @@ -57,50 +53,46 @@ impl BlockstreamService { fn process_entries( ledger_entry_receiver: &EntryReceiver, blockstream_sender: &EntrySender, - tick_height: &mut u64, blockstream: &mut Blockstream, ) -> Result<()> { let timeout = Duration::new(1, 0); - let entries = ledger_entry_receiver.recv_timeout(timeout)?; - let leader_scheduler = blockstream.leader_scheduler.read().unwrap(); + let entries_with_meta = ledger_entry_receiver.recv_timeout(timeout)?; - for entry in &entries { - if entry.is_tick() { - *tick_height += 1 - } - let slot = leader_scheduler.tick_height_to_slot(*tick_height); - let leader_id = leader_scheduler - .get_leader_for_slot(slot) - .map(|leader| leader.to_string()) - .unwrap_or_else(|| "None".to_string()); - - if entry.is_tick() && blockstream.queued_block.is_some() { + for entry_meta in &entries_with_meta { + if entry_meta.entry.is_tick() && blockstream.queued_block.is_some() { let queued_block = blockstream.queued_block.as_ref(); let block_slot = queued_block.unwrap().slot; let block_tick_height = queued_block.unwrap().tick_height; let block_id = queued_block.unwrap().id; + let block_leader = queued_block.unwrap().leader_id; blockstream - .emit_block_event(block_slot, block_tick_height, &leader_id, block_id) + .emit_block_event(block_slot, block_tick_height, block_leader, block_id) .unwrap_or_else(|e| { debug!("Blockstream error: {:?}, {:?}", e, blockstream.output); }); blockstream.queued_block = None; } blockstream - .emit_entry_event(slot, *tick_height, &leader_id, &entry) + .emit_entry_event( + entry_meta.slot, + entry_meta.tick_height, + entry_meta.slot_leader, + &entry_meta.entry, + ) .unwrap_or_else(|e| { debug!("Blockstream error: {:?}, {:?}", e, blockstream.output); }); - if 0 == leader_scheduler.num_ticks_left_in_slot(*tick_height) { + if 0 == entry_meta.num_ticks_left_in_slot { blockstream.queued_block = Some(BlockData { - slot, - tick_height: *tick_height, - id: entry.id, + slot: entry_meta.slot, + tick_height: entry_meta.tick_height, + id: entry_meta.entry.id, + leader_id: entry_meta.slot_leader, }); } } - blockstream_sender.send(entries)?; + blockstream_sender.send(entries_with_meta)?; Ok(()) } } @@ -116,31 +108,20 @@ impl Service for BlockstreamService { #[cfg(test)] mod test { use super::*; - use crate::entry::Entry; + use crate::entry::{Entry, EntryMeta}; use chrono::{DateTime, FixedOffset}; use serde_json::Value; - use solana_runtime::bank::Bank; - use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; #[test] - fn test_blockstream_stage_process_entries() { - // Set up the bank and leader_scheduler + fn test_blockstream_service_process_entries() { let ticks_per_slot = 5; - let starting_tick_height = 1; - - let (mut genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000); - genesis_block.ticks_per_slot = ticks_per_slot; - genesis_block.slots_per_epoch = 2; - - let bank = Bank::new(&genesis_block); - let leader_scheduler = LeaderScheduler::new_with_bank(&bank); - let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); + let leader_id = Keypair::new().pubkey(); // Set up blockstream - let mut blockstream = Blockstream::new("test_stream".to_string(), leader_scheduler.clone()); + let mut blockstream = Blockstream::new("test_stream".to_string()); // Set up dummy channels to host an BlockstreamService let (ledger_entry_sender, ledger_entry_receiver) = channel(); @@ -153,25 +134,46 @@ mod test { for x in 0..6 { let entry = Entry::new(&mut last_id, 1, vec![]); //just ticks last_id = entry.id; - expected_entries.push(entry.clone()); - expected_tick_heights.push(starting_tick_height + x); - entries.push(entry); + let slot_height = x / ticks_per_slot; + let parent_slot = if slot_height > 0 { + Some(slot_height - 1) + } else { + None + }; + let entry_meta = EntryMeta { + tick_height: x, + slot: slot_height, + slot_leader: leader_id, + num_ticks_left_in_slot: ticks_per_slot - ((x + 1) % ticks_per_slot), + parent_slot, + entry, + }; + expected_entries.push(entry_meta.clone()); + expected_tick_heights.push(x); + entries.push(entry_meta); } let keypair = Keypair::new(); let tx = SystemTransaction::new_account(&keypair, keypair.pubkey(), 1, Hash::default(), 0); let entry = Entry::new(&mut last_id, 1, vec![tx]); - expected_entries.insert(ticks_per_slot as usize, entry.clone()); + let entry_meta = EntryMeta { + tick_height: ticks_per_slot - 1, + slot: 0, + slot_leader: leader_id, + num_ticks_left_in_slot: 0, + parent_slot: None, + entry, + }; + expected_entries.insert(ticks_per_slot as usize, entry_meta.clone()); expected_tick_heights.insert( ticks_per_slot as usize, - starting_tick_height + ticks_per_slot - 1, // Populated entries should share the tick height of the previous tick. + ticks_per_slot - 1, // Populated entries should share the tick height of the previous tick. ); - entries.insert(ticks_per_slot as usize, entry); + entries.insert(ticks_per_slot as usize, entry_meta); ledger_entry_sender.send(entries).unwrap(); BlockstreamService::process_entries( &ledger_entry_receiver, &blockstream_sender, - &mut (starting_tick_height - 1), &mut blockstream, ) .unwrap(); @@ -201,7 +203,7 @@ mod test { // `serde_json::from_str` does not work for populated Entries. // Remove this `if` when fixed. let entry: Entry = serde_json::from_value(entry_obj).unwrap(); - assert_eq!(entry, expected_entries[i]); + assert_eq!(entry, expected_entries[i].entry); } } for json in block_events { diff --git a/src/entry.rs b/src/entry.rs index 50fcf4b4ea..4d75bcb9f2 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -21,8 +21,31 @@ use std::mem::size_of; use std::sync::mpsc::{Receiver, Sender}; use std::sync::{Arc, RwLock}; -pub type EntrySender = Sender>; -pub type EntryReceiver = Receiver>; +pub type EntrySender = Sender>; +pub type EntryReceiver = Receiver>; + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct EntryMeta { + pub tick_height: u64, + pub slot: u64, + pub slot_leader: Pubkey, + pub num_ticks_left_in_slot: u64, + pub parent_slot: Option, + pub entry: Entry, +} + +impl EntryMeta { + pub fn default_with_entry(entry: &Entry) -> Self { + EntryMeta { + tick_height: 0, + slot: 0, + slot_leader: Pubkey::default(), + num_ticks_left_in_slot: 0, + parent_slot: None, + entry: entry.clone(), + } + } +} /// Each Entry contains three pieces of data. The `num_hashes` field is the number /// of hashes performed since the previous entry. The `id` field is the result diff --git a/src/replay_stage.rs b/src/replay_stage.rs index e7f74cbd7e..95117d5be4 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -4,7 +4,7 @@ use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; use crate::blocktree_processor::{self, BankForksInfo}; use crate::cluster_info::ClusterInfo; -use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; +use crate::entry::{Entry, EntryMeta, EntryReceiver, EntrySender, EntrySlice}; use crate::leader_scheduler::LeaderScheduler; use crate::packet::BlobError; use crate::result::{Error, Result}; @@ -65,6 +65,8 @@ impl ReplayStage { last_entry_id: &Arc>, leader_scheduler: &Arc>, subscriptions: &Arc, + slot: u64, + parent_slot: Option, ) -> Result<()> { // Coalesce all the available entries into a single vote submit( @@ -95,9 +97,12 @@ impl ReplayStage { ) }; + let mut entry_tick_height = num_ticks; + let mut entries_with_meta = Vec::new(); for (i, entry) in entries.iter().enumerate() { inc_new_counter_info!("replicate-stage_bank-tick", bank.tick_height() as usize); if entry.is_tick() { + entry_tick_height += 1; if num_ticks_to_next_vote == 0 { num_ticks_to_next_vote = bank.ticks_per_slot(); } @@ -107,6 +112,14 @@ impl ReplayStage { "replicate-stage_tick-to-vote", num_ticks_to_next_vote as usize ); + entries_with_meta.push(EntryMeta { + tick_height: entry_tick_height, + slot, + slot_leader: bank.slot_leader(), + num_ticks_left_in_slot: num_ticks_to_next_vote, + parent_slot, + entry: entry.clone(), + }); // If it's the last entry in the vector, i will be vec len - 1. // If we don't process the entry now, the for loop will exit and the entry // will be dropped. @@ -140,6 +153,7 @@ impl ReplayStage { // If leader rotation happened, only write the entries up to leader rotation. entries.truncate(num_entries_to_write); + entries_with_meta.truncate(num_entries_to_write); *last_entry_id.write().unwrap() = entries .last() .expect("Entries cannot be empty at this point") @@ -155,7 +169,7 @@ impl ReplayStage { // an error occurred processing one of the entries (causing the rest of the entries to // not be processed). if entries_len != 0 { - ledger_entry_sender.send(entries)?; + ledger_entry_sender.send(entries_with_meta)?; } *current_blob_index += entries_len; @@ -313,6 +327,7 @@ impl ReplayStage { vec![] } }; + let parent_slot = blocktree.meta(slot).unwrap().map(|meta| meta.parent_slot); if !entries.is_empty() { if let Err(e) = Self::process_entries( @@ -325,6 +340,8 @@ impl ReplayStage { &last_entry_id, &leader_scheduler_, &subscriptions_, + slot, + parent_slot, ) { error!("{} process_entries failed: {:?}", my_id, e); } @@ -593,7 +610,11 @@ mod test { while let Ok(entries) = ledger_writer_recv.try_recv() { received_ticks.extend(entries); } - assert_eq!(&received_ticks[..], &entries_to_send[..]); + let received_ticks_entries: Vec = received_ticks + .iter() + .map(|entry_meta| entry_meta.entry.clone()) + .collect(); + assert_eq!(&received_ticks_entries[..], &entries_to_send[..]); // Replay stage should continue running even after rotation has happened (tvu never goes down) assert_eq!(exit.load(Ordering::Relaxed), false); @@ -672,7 +693,7 @@ mod test { .recv() .expect("Expected to receive an entry on the ledger writer receiver"); - assert_eq!(next_tick, received_tick); + assert_eq!(next_tick[0], received_tick[0].entry); replay_stage .close() @@ -791,7 +812,7 @@ mod test { let received_entry = ledger_writer_recv .recv() .expect("Expected to recieve an entry on the ledger writer receiver"); - assert_eq!(received_entry[0], entry); + assert_eq!(received_entry[0].entry, entry); if i == leader_rotation_index { expected_last_id = entry.id; @@ -851,6 +872,8 @@ mod test { &Arc::new(RwLock::new(last_entry_id)), &leader_scheduler, &Arc::new(RpcSubscriptions::default()), + 0, + None, ); match res { @@ -877,6 +900,8 @@ mod test { &Arc::new(RwLock::new(last_entry_id)), &leader_scheduler, &Arc::new(RpcSubscriptions::default()), + 0, + None, ); match res { diff --git a/src/storage_stage.rs b/src/storage_stage.rs index 25e9ac450f..06f5451c48 100644 --- a/src/storage_stage.rs +++ b/src/storage_stage.rs @@ -7,7 +7,7 @@ use crate::blocktree::Blocktree; use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys; use crate::client::mk_client_with_timeout; use crate::cluster_info::ClusterInfo; -use crate::entry::EntryReceiver; +use crate::entry::{Entry, EntryReceiver}; use crate::result::{Error, Result}; use crate::service::Service; use bincode::deserialize; @@ -362,7 +362,11 @@ impl StorageStage { tx_sender: &TransactionSender, ) -> Result<()> { let timeout = Duration::new(1, 0); - let entries = entry_receiver.recv_timeout(timeout)?; + let entries: Vec = entry_receiver + .recv_timeout(timeout)? + .iter() + .map(|entry_meta| entry_meta.entry.clone()) + .collect(); for entry in entries { // Go through the transactions, find votes, and use them to update // the storage_keys with their signatures. @@ -446,7 +450,7 @@ impl Service for StorageStage { mod tests { use crate::blocktree::{create_tmp_sample_blocktree, Blocktree}; use crate::cluster_info::{ClusterInfo, NodeInfo}; - use crate::entry::{make_tiny_test_entries, Entry}; + use crate::entry::{make_tiny_test_entries, Entry, EntryMeta}; use crate::service::Service; use crate::storage_stage::StorageState; use crate::storage_stage::NUM_IDENTITIES; @@ -528,7 +532,11 @@ mod tests { STORAGE_ROTATE_TEST_COUNT, &cluster_info, ); - storage_entry_sender.send(entries.clone()).unwrap(); + let entries_meta: Vec = entries + .iter() + .map(|entry| EntryMeta::default_with_entry(entry)) + .collect(); + storage_entry_sender.send(entries_meta.clone()).unwrap(); let keypair = Keypair::new(); let hash = Hash::default(); @@ -537,7 +545,7 @@ mod tests { assert_eq!(result, Hash::default()); for _ in 0..9 { - storage_entry_sender.send(entries.clone()).unwrap(); + storage_entry_sender.send(entries_meta.clone()).unwrap(); } for _ in 0..5 { result = storage_state.get_mining_result(&signature); @@ -593,7 +601,11 @@ mod tests { STORAGE_ROTATE_TEST_COUNT, &cluster_info, ); - storage_entry_sender.send(entries.clone()).unwrap(); + let entries_meta: Vec = entries + .iter() + .map(|entry| EntryMeta::default_with_entry(entry)) + .collect(); + storage_entry_sender.send(entries_meta.clone()).unwrap(); let mut reference_keys; { @@ -605,7 +617,11 @@ mod tests { let keypair = Keypair::new(); let vote_tx = VoteTransaction::new_vote(&keypair, 123456, Hash::default(), 1); vote_txs.push(vote_tx); - let vote_entries = vec![Entry::new(&Hash::default(), 1, vote_txs)]; + let vote_entries = vec![EntryMeta::default_with_entry(&Entry::new( + &Hash::default(), + 1, + vote_txs, + ))]; storage_entry_sender.send(vote_entries).unwrap(); for _ in 0..5 { diff --git a/src/tvu.rs b/src/tvu.rs index 0552885b54..900e94195a 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -138,8 +138,6 @@ impl Tvu { let (blockstream_service, blockstream_receiver) = BlockstreamService::new( previous_receiver, blockstream.unwrap().to_string(), - bank_forks.read().unwrap().working_bank().tick_height(), // TODO: BlockstreamService needs to deal with BankForks somehow still - leader_scheduler, exit.clone(), ); previous_receiver = blockstream_receiver;