Add block event logic to EntryStreamStage
This commit is contained in:
parent
e58f08b60f
commit
c1447b2695
|
@ -57,23 +57,23 @@ impl Output for SocketOutput {
|
|||
}
|
||||
|
||||
pub trait EntryStreamHandler {
|
||||
fn emit_entry_events(&self, entries: &[Entry]) -> Result<()>;
|
||||
fn emit_entry_event(&self, entries: &Entry) -> Result<()>;
|
||||
fn emit_block_event(&self, tick_height: u64, last_id: Hash) -> Result<()>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct EntryStream<T: Output> {
|
||||
pub output: T,
|
||||
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
|
||||
pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
|
||||
pub queued_block: Option<EntryStreamBlock>,
|
||||
}
|
||||
|
||||
impl<T> EntryStreamHandler for EntryStream<T>
|
||||
where
|
||||
T: Output,
|
||||
{
|
||||
fn emit_entry_events(&self, entries: &[Entry]) -> Result<()> {
|
||||
fn emit_entry_event(&self, entry: &Entry) -> Result<()> {
|
||||
let leader_scheduler = self.leader_scheduler.read().unwrap();
|
||||
for entry in entries {
|
||||
let slot = leader_scheduler.tick_height_to_slot(entry.tick_height);
|
||||
let leader_id = leader_scheduler
|
||||
.get_leader_for_slot(slot)
|
||||
|
@ -89,7 +89,6 @@ where
|
|||
json_entry,
|
||||
);
|
||||
self.output.write(payload)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -120,6 +119,7 @@ impl SocketEntryStream {
|
|||
EntryStream {
|
||||
output: SocketOutput { socket },
|
||||
leader_scheduler,
|
||||
queued_block: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -131,6 +131,7 @@ impl MockEntryStream {
|
|||
EntryStream {
|
||||
output: VecOutput::new(),
|
||||
leader_scheduler,
|
||||
queued_block: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,6 +140,12 @@ impl MockEntryStream {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct EntryStreamBlock {
|
||||
pub tick_height: u64,
|
||||
pub id: Hash,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
@ -192,12 +199,11 @@ mod test {
|
|||
let entry = Entry::new(&mut last_id, tick_height, 1, vec![]); //just ticks
|
||||
last_id = entry.id;
|
||||
previous_slot = curr_slot;
|
||||
entry_stream.emit_entry_event(&entry).unwrap();
|
||||
expected_entries.push(entry.clone());
|
||||
entries.push(entry);
|
||||
}
|
||||
|
||||
entry_stream.emit_entry_events(&entries).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
entry_stream.entries().len() as u64,
|
||||
// one entry per tick (0..=N+2) is +3, plus one block
|
||||
|
|
|
@ -3,11 +3,11 @@
|
|||
//! real-time access to entries.
|
||||
|
||||
use crate::entry::{EntryReceiver, EntrySender};
|
||||
use crate::entry_stream::EntryStreamHandler;
|
||||
#[cfg(test)]
|
||||
use crate::entry_stream::MockEntryStream as EntryStream;
|
||||
#[cfg(not(test))]
|
||||
use crate::entry_stream::SocketEntryStream as EntryStream;
|
||||
use crate::entry_stream::{EntryStreamBlock, EntryStreamHandler};
|
||||
use crate::leader_scheduler::LeaderScheduler;
|
||||
use crate::result::{Error, Result};
|
||||
use crate::service::Service;
|
||||
|
@ -59,11 +59,35 @@ impl EntryStreamStage {
|
|||
) -> Result<()> {
|
||||
let timeout = Duration::new(1, 0);
|
||||
let entries = ledger_entry_receiver.recv_timeout(timeout)?;
|
||||
|
||||
for entry in &entries {
|
||||
if entry.is_tick() && entry_stream.queued_block.is_some() {
|
||||
let queued_block = entry_stream.queued_block.as_ref();
|
||||
let block_tick_height = queued_block.unwrap().tick_height;
|
||||
let block_id = queued_block.unwrap().id;
|
||||
entry_stream
|
||||
.emit_entry_events(&entries)
|
||||
.emit_block_event(block_tick_height, block_id)
|
||||
.unwrap_or_else(|e| {
|
||||
error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
|
||||
});
|
||||
entry_stream.queued_block = None;
|
||||
}
|
||||
entry_stream.emit_entry_event(&entry).unwrap_or_else(|e| {
|
||||
error!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
|
||||
});
|
||||
if 0 == entry_stream
|
||||
.leader_scheduler
|
||||
.read()
|
||||
.unwrap()
|
||||
.num_ticks_left_in_slot(entry.tick_height)
|
||||
{
|
||||
entry_stream.queued_block = Some(EntryStreamBlock {
|
||||
tick_height: entry.tick_height,
|
||||
id: entry.id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
entry_stream_sender.send(entries)?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -87,11 +111,14 @@ mod test {
|
|||
use chrono::{DateTime, FixedOffset};
|
||||
use serde_json::Value;
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use solana_sdk::system_transaction::SystemTransaction;
|
||||
|
||||
#[test]
|
||||
fn test_entry_stream_stage_process_entries() {
|
||||
// Set up bank and leader_scheduler
|
||||
let leader_scheduler_config = LeaderSchedulerConfig::new(5, 2, 10);
|
||||
let ticks_per_slot = 5;
|
||||
let leader_scheduler_config = LeaderSchedulerConfig::new(ticks_per_slot, 2, 10);
|
||||
let (genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000);
|
||||
let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config);
|
||||
// Set up entry stream
|
||||
|
@ -105,12 +132,18 @@ mod test {
|
|||
let mut last_id = Hash::default();
|
||||
let mut entries = Vec::new();
|
||||
let mut expected_entries = Vec::new();
|
||||
for x in 0..5 {
|
||||
for x in 0..6 {
|
||||
let entry = Entry::new(&mut last_id, x, 1, vec![]); //just ticks
|
||||
last_id = entry.id;
|
||||
expected_entries.push(entry.clone());
|
||||
entries.push(entry);
|
||||
}
|
||||
let keypair = Keypair::new();
|
||||
let tx = SystemTransaction::new_account(&keypair, keypair.pubkey(), 1, Hash::default(), 0);
|
||||
let entry = Entry::new(&mut last_id, ticks_per_slot - 1, 1, vec![tx]);
|
||||
expected_entries.insert(ticks_per_slot as usize, entry.clone());
|
||||
entries.insert(ticks_per_slot as usize, entry);
|
||||
|
||||
ledger_entry_sender.send(entries).unwrap();
|
||||
EntryStreamStage::process_entries(
|
||||
&ledger_entry_receiver,
|
||||
|
@ -118,19 +151,38 @@ mod test {
|
|||
&mut entry_stream,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(entry_stream.entries().len(), 5);
|
||||
assert_eq!(entry_stream.entries().len(), 8);
|
||||
|
||||
for (i, item) in entry_stream.entries().iter().enumerate() {
|
||||
let (entry_events, block_events): (Vec<Value>, Vec<Value>) = entry_stream
|
||||
.entries()
|
||||
.iter()
|
||||
.map(|item| {
|
||||
let json: Value = serde_json::from_str(&item).unwrap();
|
||||
let dt_str = json["dt"].as_str().unwrap();
|
||||
|
||||
// Ensure `ts` field parses as valid DateTime
|
||||
let _dt: DateTime<FixedOffset> = DateTime::parse_from_rfc3339(dt_str).unwrap();
|
||||
|
||||
json
|
||||
})
|
||||
.partition(|json| {
|
||||
let item_type = json["t"].as_str().unwrap();
|
||||
item_type == "entry"
|
||||
});
|
||||
for (i, json) in entry_events.iter().enumerate() {
|
||||
let entry_obj = json["entry"].clone();
|
||||
let tx = entry_obj["transactions"].as_array().unwrap();
|
||||
if tx.len() == 0 {
|
||||
// TODO: There is a bug in Transaction deserialize methods such that
|
||||
// `serde_json::from_str` does not work for populated Entries.
|
||||
// Remove this `if` when fixed.
|
||||
let entry: Entry = serde_json::from_value(entry_obj).unwrap();
|
||||
assert_eq!(entry, expected_entries[i]);
|
||||
}
|
||||
}
|
||||
for json in block_events {
|
||||
let height = json["h"].as_u64().unwrap();
|
||||
assert_eq!(ticks_per_slot - 1, height);
|
||||
}
|
||||
|
||||
// Ensure entries pass through stage unadulterated
|
||||
let recv_entries = entry_stream_receiver.recv().unwrap();
|
||||
assert_eq!(expected_entries, recv_entries);
|
||||
|
|
Loading…
Reference in New Issue