Add EntryMeta wrapper

This commit is contained in:
Tyera Eulberg 2019-02-25 15:33:42 -07:00 committed by Tyera Eulberg
parent 3897b66270
commit 137233b4a1
6 changed files with 149 additions and 115 deletions

View File

@ -3,16 +3,15 @@
//! real-time access to entries.
use crate::entry::Entry;
use crate::leader_scheduler::LeaderScheduler;
use crate::result::Result;
use chrono::{SecondsFormat, Utc};
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use std::cell::RefCell;
use std::io::prelude::*;
use std::net::Shutdown;
use std::os::unix::net::UnixStream;
use std::path::Path;
use std::sync::{Arc, RwLock};
pub trait EntryWriter: std::fmt::Debug {
fn write(&self, payload: String) -> Result<()>;
@ -64,14 +63,14 @@ pub trait BlockstreamEvents {
&self,
slot: u64,
tick_height: u64,
leader_id: &str,
leader_id: Pubkey,
entries: &Entry,
) -> Result<()>;
fn emit_block_event(
&self,
slot: u64,
tick_height: u64,
leader_id: &str,
leader_id: Pubkey,
last_id: Hash,
) -> Result<()>;
}
@ -79,7 +78,6 @@ pub trait BlockstreamEvents {
#[derive(Debug)]
pub struct Blockstream<T: EntryWriter> {
pub output: T,
pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
pub queued_block: Option<BlockData>,
}
@ -91,12 +89,12 @@ where
&self,
slot: u64,
tick_height: u64,
leader_id: &str,
leader_id: Pubkey,
entry: &Entry,
) -> Result<()> {
let json_entry = serde_json::to_string(&entry)?;
let payload = format!(
r#"{{"dt":"{}","t":"entry","s":{},"h":{},"l":{:?},"entry":{}}}"#,
r#"{{"dt":"{}","t":"entry","s":{},"h":{},"l":"{:?}","entry":{}}}"#,
Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
slot,
tick_height,
@ -111,11 +109,11 @@ where
&self,
slot: u64,
tick_height: u64,
leader_id: &str,
leader_id: Pubkey,
last_id: Hash,
) -> Result<()> {
let payload = format!(
r#"{{"dt":"{}","t":"block","s":{},"h":{},"l":{:?},"id":"{:?}"}}"#,
r#"{{"dt":"{}","t":"block","s":{},"h":{},"l":"{:?}","id":"{:?}"}}"#,
Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
slot,
tick_height,
@ -130,10 +128,9 @@ where
pub type SocketBlockstream = Blockstream<EntrySocket>;
impl SocketBlockstream {
pub fn new(socket: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
pub fn new(socket: String) -> Self {
Blockstream {
output: EntrySocket { socket },
leader_scheduler,
queued_block: None,
}
}
@ -142,10 +139,9 @@ impl SocketBlockstream {
pub type MockBlockstream = Blockstream<EntryVec>;
impl MockBlockstream {
pub fn new(_: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
pub fn new(_: String) -> Self {
Blockstream {
output: EntryVec::new(),
leader_scheduler,
queued_block: None,
}
}
@ -160,6 +156,7 @@ pub struct BlockData {
pub slot: u64,
pub tick_height: u64,
pub id: Hash,
pub leader_id: Pubkey,
}
#[cfg(test)]
@ -168,25 +165,14 @@ mod test {
use crate::entry::Entry;
use chrono::{DateTime, FixedOffset};
use serde_json::Value;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::collections::HashSet;
#[test]
fn test_blockstream() -> () {
// Set up bank and leader_scheduler
let (mut genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000);
genesis_block.ticks_per_slot = 5;
genesis_block.slots_per_epoch = 2;
let bank = Bank::new(&genesis_block);
let leader_scheduler = LeaderScheduler::new_with_bank(&bank);
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
// Set up blockstream
let blockstream = MockBlockstream::new("test_stream".to_string(), leader_scheduler.clone());
let ticks_per_slot = bank.ticks_per_slot();
let blockstream = MockBlockstream::new("test_stream".to_string());
let ticks_per_slot = 5;
let mut last_id = Hash::default();
let mut entries = Vec::new();
@ -194,36 +180,20 @@ mod test {
let tick_height_initial = 0;
let tick_height_final = tick_height_initial + ticks_per_slot + 2;
let mut previous_slot = leader_scheduler
.read()
.unwrap()
.tick_height_to_slot(tick_height_initial);
let leader_id = leader_scheduler
.read()
.unwrap()
.get_leader_for_slot(previous_slot)
.map(|leader| leader.to_string())
.unwrap_or_else(|| "None".to_string());
let mut curr_slot = 0;
let leader_id = Keypair::new().pubkey();
for tick_height in tick_height_initial..=tick_height_final {
leader_scheduler
.write()
.unwrap()
.update_tick_height(tick_height, &bank);
let curr_slot = leader_scheduler
.read()
.unwrap()
.tick_height_to_slot(tick_height);
if curr_slot != previous_slot {
if tick_height == 5 {
blockstream
.emit_block_event(previous_slot, tick_height - 1, &leader_id, last_id)
.emit_block_event(curr_slot, tick_height - 1, leader_id, last_id)
.unwrap();
curr_slot += 1;
}
let entry = Entry::new(&mut last_id, 1, vec![]); // just ticks
last_id = entry.id;
previous_slot = curr_slot;
blockstream
.emit_entry_event(curr_slot, tick_height, &leader_id, &entry)
.emit_entry_event(curr_slot, tick_height, leader_id, &entry)
.unwrap();
expected_entries.push(entry.clone());
entries.push(entry);

View File

@ -8,12 +8,11 @@ use crate::blockstream::MockBlockstream as Blockstream;
use crate::blockstream::SocketBlockstream as Blockstream;
use crate::blockstream::{BlockData, BlockstreamEvents};
use crate::entry::{EntryReceiver, EntrySender};
use crate::leader_scheduler::LeaderScheduler;
use crate::result::{Error, Result};
use crate::service::Service;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
@ -26,12 +25,10 @@ impl BlockstreamService {
pub fn new(
ledger_entry_receiver: EntryReceiver,
blockstream_socket: String,
mut tick_height: u64,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
exit: Arc<AtomicBool>,
) -> (Self, EntryReceiver) {
let (blockstream_sender, blockstream_receiver) = channel();
let mut blockstream = Blockstream::new(blockstream_socket, leader_scheduler);
let mut blockstream = Blockstream::new(blockstream_socket);
let t_blockstream = Builder::new()
.name("solana-blockstream".to_string())
.spawn(move || loop {
@ -41,7 +38,6 @@ impl BlockstreamService {
if let Err(e) = Self::process_entries(
&ledger_entry_receiver,
&blockstream_sender,
&mut tick_height,
&mut blockstream,
) {
match e {
@ -57,50 +53,46 @@ impl BlockstreamService {
fn process_entries(
ledger_entry_receiver: &EntryReceiver,
blockstream_sender: &EntrySender,
tick_height: &mut u64,
blockstream: &mut Blockstream,
) -> Result<()> {
let timeout = Duration::new(1, 0);
let entries = ledger_entry_receiver.recv_timeout(timeout)?;
let leader_scheduler = blockstream.leader_scheduler.read().unwrap();
let entries_with_meta = ledger_entry_receiver.recv_timeout(timeout)?;
for entry in &entries {
if entry.is_tick() {
*tick_height += 1
}
let slot = leader_scheduler.tick_height_to_slot(*tick_height);
let leader_id = leader_scheduler
.get_leader_for_slot(slot)
.map(|leader| leader.to_string())
.unwrap_or_else(|| "None".to_string());
if entry.is_tick() && blockstream.queued_block.is_some() {
for entry_meta in &entries_with_meta {
if entry_meta.entry.is_tick() && blockstream.queued_block.is_some() {
let queued_block = blockstream.queued_block.as_ref();
let block_slot = queued_block.unwrap().slot;
let block_tick_height = queued_block.unwrap().tick_height;
let block_id = queued_block.unwrap().id;
let block_leader = queued_block.unwrap().leader_id;
blockstream
.emit_block_event(block_slot, block_tick_height, &leader_id, block_id)
.emit_block_event(block_slot, block_tick_height, block_leader, block_id)
.unwrap_or_else(|e| {
debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
});
blockstream.queued_block = None;
}
blockstream
.emit_entry_event(slot, *tick_height, &leader_id, &entry)
.emit_entry_event(
entry_meta.slot,
entry_meta.tick_height,
entry_meta.slot_leader,
&entry_meta.entry,
)
.unwrap_or_else(|e| {
debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
});
if 0 == leader_scheduler.num_ticks_left_in_slot(*tick_height) {
if 0 == entry_meta.num_ticks_left_in_slot {
blockstream.queued_block = Some(BlockData {
slot,
tick_height: *tick_height,
id: entry.id,
slot: entry_meta.slot,
tick_height: entry_meta.tick_height,
id: entry_meta.entry.id,
leader_id: entry_meta.slot_leader,
});
}
}
blockstream_sender.send(entries)?;
blockstream_sender.send(entries_with_meta)?;
Ok(())
}
}
@ -116,31 +108,20 @@ impl Service for BlockstreamService {
#[cfg(test)]
mod test {
use super::*;
use crate::entry::Entry;
use crate::entry::{Entry, EntryMeta};
use chrono::{DateTime, FixedOffset};
use serde_json::Value;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
#[test]
fn test_blockstream_stage_process_entries() {
// Set up the bank and leader_scheduler
fn test_blockstream_service_process_entries() {
let ticks_per_slot = 5;
let starting_tick_height = 1;
let (mut genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000);
genesis_block.ticks_per_slot = ticks_per_slot;
genesis_block.slots_per_epoch = 2;
let bank = Bank::new(&genesis_block);
let leader_scheduler = LeaderScheduler::new_with_bank(&bank);
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
let leader_id = Keypair::new().pubkey();
// Set up blockstream
let mut blockstream = Blockstream::new("test_stream".to_string(), leader_scheduler.clone());
let mut blockstream = Blockstream::new("test_stream".to_string());
// Set up dummy channels to host an BlockstreamService
let (ledger_entry_sender, ledger_entry_receiver) = channel();
@ -153,25 +134,46 @@ mod test {
for x in 0..6 {
let entry = Entry::new(&mut last_id, 1, vec![]); //just ticks
last_id = entry.id;
expected_entries.push(entry.clone());
expected_tick_heights.push(starting_tick_height + x);
entries.push(entry);
let slot_height = x / ticks_per_slot;
let parent_slot = if slot_height > 0 {
Some(slot_height - 1)
} else {
None
};
let entry_meta = EntryMeta {
tick_height: x,
slot: slot_height,
slot_leader: leader_id,
num_ticks_left_in_slot: ticks_per_slot - ((x + 1) % ticks_per_slot),
parent_slot,
entry,
};
expected_entries.push(entry_meta.clone());
expected_tick_heights.push(x);
entries.push(entry_meta);
}
let keypair = Keypair::new();
let tx = SystemTransaction::new_account(&keypair, keypair.pubkey(), 1, Hash::default(), 0);
let entry = Entry::new(&mut last_id, 1, vec![tx]);
expected_entries.insert(ticks_per_slot as usize, entry.clone());
let entry_meta = EntryMeta {
tick_height: ticks_per_slot - 1,
slot: 0,
slot_leader: leader_id,
num_ticks_left_in_slot: 0,
parent_slot: None,
entry,
};
expected_entries.insert(ticks_per_slot as usize, entry_meta.clone());
expected_tick_heights.insert(
ticks_per_slot as usize,
starting_tick_height + ticks_per_slot - 1, // Populated entries should share the tick height of the previous tick.
ticks_per_slot - 1, // Populated entries should share the tick height of the previous tick.
);
entries.insert(ticks_per_slot as usize, entry);
entries.insert(ticks_per_slot as usize, entry_meta);
ledger_entry_sender.send(entries).unwrap();
BlockstreamService::process_entries(
&ledger_entry_receiver,
&blockstream_sender,
&mut (starting_tick_height - 1),
&mut blockstream,
)
.unwrap();
@ -201,7 +203,7 @@ mod test {
// `serde_json::from_str` does not work for populated Entries.
// Remove this `if` when fixed.
let entry: Entry = serde_json::from_value(entry_obj).unwrap();
assert_eq!(entry, expected_entries[i]);
assert_eq!(entry, expected_entries[i].entry);
}
}
for json in block_events {

View File

@ -21,8 +21,31 @@ use std::mem::size_of;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, RwLock};
pub type EntrySender = Sender<Vec<Entry>>;
pub type EntryReceiver = Receiver<Vec<Entry>>;
pub type EntrySender = Sender<Vec<EntryMeta>>;
pub type EntryReceiver = Receiver<Vec<EntryMeta>>;
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct EntryMeta {
pub tick_height: u64,
pub slot: u64,
pub slot_leader: Pubkey,
pub num_ticks_left_in_slot: u64,
pub parent_slot: Option<u64>,
pub entry: Entry,
}
impl EntryMeta {
pub fn default_with_entry(entry: &Entry) -> Self {
EntryMeta {
tick_height: 0,
slot: 0,
slot_leader: Pubkey::default(),
num_ticks_left_in_slot: 0,
parent_slot: None,
entry: entry.clone(),
}
}
}
/// Each Entry contains three pieces of data. The `num_hashes` field is the number
/// of hashes performed since the previous entry. The `id` field is the result

View File

@ -4,7 +4,7 @@ use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree;
use crate::blocktree_processor::{self, BankForksInfo};
use crate::cluster_info::ClusterInfo;
use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice};
use crate::entry::{Entry, EntryMeta, EntryReceiver, EntrySender, EntrySlice};
use crate::leader_scheduler::LeaderScheduler;
use crate::packet::BlobError;
use crate::result::{Error, Result};
@ -65,6 +65,8 @@ impl ReplayStage {
last_entry_id: &Arc<RwLock<Hash>>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
subscriptions: &Arc<RpcSubscriptions>,
slot: u64,
parent_slot: Option<u64>,
) -> Result<()> {
// Coalesce all the available entries into a single vote
submit(
@ -95,9 +97,12 @@ impl ReplayStage {
)
};
let mut entry_tick_height = num_ticks;
let mut entries_with_meta = Vec::new();
for (i, entry) in entries.iter().enumerate() {
inc_new_counter_info!("replicate-stage_bank-tick", bank.tick_height() as usize);
if entry.is_tick() {
entry_tick_height += 1;
if num_ticks_to_next_vote == 0 {
num_ticks_to_next_vote = bank.ticks_per_slot();
}
@ -107,6 +112,14 @@ impl ReplayStage {
"replicate-stage_tick-to-vote",
num_ticks_to_next_vote as usize
);
entries_with_meta.push(EntryMeta {
tick_height: entry_tick_height,
slot,
slot_leader: bank.slot_leader(),
num_ticks_left_in_slot: num_ticks_to_next_vote,
parent_slot,
entry: entry.clone(),
});
// If it's the last entry in the vector, i will be vec len - 1.
// If we don't process the entry now, the for loop will exit and the entry
// will be dropped.
@ -140,6 +153,7 @@ impl ReplayStage {
// If leader rotation happened, only write the entries up to leader rotation.
entries.truncate(num_entries_to_write);
entries_with_meta.truncate(num_entries_to_write);
*last_entry_id.write().unwrap() = entries
.last()
.expect("Entries cannot be empty at this point")
@ -155,7 +169,7 @@ impl ReplayStage {
// an error occurred processing one of the entries (causing the rest of the entries to
// not be processed).
if entries_len != 0 {
ledger_entry_sender.send(entries)?;
ledger_entry_sender.send(entries_with_meta)?;
}
*current_blob_index += entries_len;
@ -313,6 +327,7 @@ impl ReplayStage {
vec![]
}
};
let parent_slot = blocktree.meta(slot).unwrap().map(|meta| meta.parent_slot);
if !entries.is_empty() {
if let Err(e) = Self::process_entries(
@ -325,6 +340,8 @@ impl ReplayStage {
&last_entry_id,
&leader_scheduler_,
&subscriptions_,
slot,
parent_slot,
) {
error!("{} process_entries failed: {:?}", my_id, e);
}
@ -593,7 +610,11 @@ mod test {
while let Ok(entries) = ledger_writer_recv.try_recv() {
received_ticks.extend(entries);
}
assert_eq!(&received_ticks[..], &entries_to_send[..]);
let received_ticks_entries: Vec<Entry> = received_ticks
.iter()
.map(|entry_meta| entry_meta.entry.clone())
.collect();
assert_eq!(&received_ticks_entries[..], &entries_to_send[..]);
// Replay stage should continue running even after rotation has happened (tvu never goes down)
assert_eq!(exit.load(Ordering::Relaxed), false);
@ -672,7 +693,7 @@ mod test {
.recv()
.expect("Expected to receive an entry on the ledger writer receiver");
assert_eq!(next_tick, received_tick);
assert_eq!(next_tick[0], received_tick[0].entry);
replay_stage
.close()
@ -791,7 +812,7 @@ mod test {
let received_entry = ledger_writer_recv
.recv()
.expect("Expected to recieve an entry on the ledger writer receiver");
assert_eq!(received_entry[0], entry);
assert_eq!(received_entry[0].entry, entry);
if i == leader_rotation_index {
expected_last_id = entry.id;
@ -851,6 +872,8 @@ mod test {
&Arc::new(RwLock::new(last_entry_id)),
&leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
0,
None,
);
match res {
@ -877,6 +900,8 @@ mod test {
&Arc::new(RwLock::new(last_entry_id)),
&leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
0,
None,
);
match res {

View File

@ -7,7 +7,7 @@ use crate::blocktree::Blocktree;
use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys;
use crate::client::mk_client_with_timeout;
use crate::cluster_info::ClusterInfo;
use crate::entry::EntryReceiver;
use crate::entry::{Entry, EntryReceiver};
use crate::result::{Error, Result};
use crate::service::Service;
use bincode::deserialize;
@ -362,7 +362,11 @@ impl StorageStage {
tx_sender: &TransactionSender,
) -> Result<()> {
let timeout = Duration::new(1, 0);
let entries = entry_receiver.recv_timeout(timeout)?;
let entries: Vec<Entry> = entry_receiver
.recv_timeout(timeout)?
.iter()
.map(|entry_meta| entry_meta.entry.clone())
.collect();
for entry in entries {
// Go through the transactions, find votes, and use them to update
// the storage_keys with their signatures.
@ -446,7 +450,7 @@ impl Service for StorageStage {
mod tests {
use crate::blocktree::{create_tmp_sample_blocktree, Blocktree};
use crate::cluster_info::{ClusterInfo, NodeInfo};
use crate::entry::{make_tiny_test_entries, Entry};
use crate::entry::{make_tiny_test_entries, Entry, EntryMeta};
use crate::service::Service;
use crate::storage_stage::StorageState;
use crate::storage_stage::NUM_IDENTITIES;
@ -528,7 +532,11 @@ mod tests {
STORAGE_ROTATE_TEST_COUNT,
&cluster_info,
);
storage_entry_sender.send(entries.clone()).unwrap();
let entries_meta: Vec<EntryMeta> = entries
.iter()
.map(|entry| EntryMeta::default_with_entry(entry))
.collect();
storage_entry_sender.send(entries_meta.clone()).unwrap();
let keypair = Keypair::new();
let hash = Hash::default();
@ -537,7 +545,7 @@ mod tests {
assert_eq!(result, Hash::default());
for _ in 0..9 {
storage_entry_sender.send(entries.clone()).unwrap();
storage_entry_sender.send(entries_meta.clone()).unwrap();
}
for _ in 0..5 {
result = storage_state.get_mining_result(&signature);
@ -593,7 +601,11 @@ mod tests {
STORAGE_ROTATE_TEST_COUNT,
&cluster_info,
);
storage_entry_sender.send(entries.clone()).unwrap();
let entries_meta: Vec<EntryMeta> = entries
.iter()
.map(|entry| EntryMeta::default_with_entry(entry))
.collect();
storage_entry_sender.send(entries_meta.clone()).unwrap();
let mut reference_keys;
{
@ -605,7 +617,11 @@ mod tests {
let keypair = Keypair::new();
let vote_tx = VoteTransaction::new_vote(&keypair, 123456, Hash::default(), 1);
vote_txs.push(vote_tx);
let vote_entries = vec![Entry::new(&Hash::default(), 1, vote_txs)];
let vote_entries = vec![EntryMeta::default_with_entry(&Entry::new(
&Hash::default(),
1,
vote_txs,
))];
storage_entry_sender.send(vote_entries).unwrap();
for _ in 0..5 {

View File

@ -138,8 +138,6 @@ impl Tvu {
let (blockstream_service, blockstream_receiver) = BlockstreamService::new(
previous_receiver,
blockstream.unwrap().to_string(),
bank_forks.read().unwrap().working_bank().tick_height(), // TODO: BlockstreamService needs to deal with BankForks somehow still
leader_scheduler,
exit.clone(),
);
previous_receiver = blockstream_receiver;