2019-01-29 00:21:27 -08:00
|
|
|
//! The `entry_stream` module provides a method for streaming entries out via a
|
|
|
|
//! local unix socket, to provide client services such as a block explorer with
|
|
|
|
//! real-time access to entries.
|
|
|
|
|
|
|
|
use crate::entry::Entry;
|
2019-02-11 10:36:59 -08:00
|
|
|
use crate::leader_scheduler::LeaderScheduler;
|
2019-01-29 00:21:27 -08:00
|
|
|
use crate::result::Result;
|
2019-02-11 10:36:59 -08:00
|
|
|
use chrono::{SecondsFormat, Utc};
|
2019-01-29 00:21:27 -08:00
|
|
|
use std::io::prelude::*;
|
|
|
|
use std::net::Shutdown;
|
|
|
|
use std::os::unix::net::UnixStream;
|
|
|
|
use std::path::Path;
|
2019-02-11 10:36:59 -08:00
|
|
|
use std::sync::{Arc, RwLock};
|
2019-01-29 00:21:27 -08:00
|
|
|
|
|
|
|
pub trait EntryStreamHandler {
|
|
|
|
fn stream_entries(&mut self, entries: &[Entry]) -> Result<()>;
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct EntryStream {
|
|
|
|
pub socket: String,
|
2019-02-11 10:36:59 -08:00
|
|
|
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
|
2019-01-29 00:21:27 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
impl EntryStream {
|
2019-02-11 10:36:59 -08:00
|
|
|
pub fn new(socket: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
|
|
|
|
EntryStream {
|
|
|
|
socket,
|
|
|
|
leader_scheduler,
|
|
|
|
}
|
2019-01-29 00:21:27 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl EntryStreamHandler for EntryStream {
|
|
|
|
fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> {
|
|
|
|
let mut socket = UnixStream::connect(Path::new(&self.socket))?;
|
|
|
|
for entry in entries {
|
2019-01-30 10:12:31 -08:00
|
|
|
let json = serde_json::to_string(&entry)?;
|
2019-02-11 10:36:59 -08:00
|
|
|
let (slot, slot_leader) = {
|
|
|
|
let leader_scheduler = self.leader_scheduler.read().unwrap();
|
|
|
|
let slot = leader_scheduler.tick_height_to_slot(entry.tick_height);
|
|
|
|
(slot, leader_scheduler.get_leader_for_slot(slot))
|
|
|
|
};
|
|
|
|
let payload = format!(
|
|
|
|
r#"{{"dt":"{}","t":"entry","s":{},"leader_id":"{:?}","entry":{}}}"#,
|
|
|
|
Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
|
|
|
|
slot,
|
|
|
|
slot_leader,
|
|
|
|
json
|
|
|
|
);
|
2019-01-30 10:12:31 -08:00
|
|
|
socket.write_all(payload.as_bytes())?;
|
2019-01-29 00:21:27 -08:00
|
|
|
}
|
|
|
|
socket.shutdown(Shutdown::Write)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct MockEntryStream {
|
|
|
|
pub socket: Vec<String>,
|
2019-02-11 10:36:59 -08:00
|
|
|
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
|
2019-01-29 00:21:27 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
impl MockEntryStream {
|
|
|
|
#[allow(clippy::needless_pass_by_value)]
|
2019-02-11 10:36:59 -08:00
|
|
|
pub fn new(_socket: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
|
|
|
|
MockEntryStream {
|
|
|
|
socket: Vec::new(),
|
|
|
|
leader_scheduler,
|
|
|
|
}
|
2019-01-29 00:21:27 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl EntryStreamHandler for MockEntryStream {
|
|
|
|
fn stream_entries(&mut self, entries: &[Entry]) -> Result<()> {
|
|
|
|
for entry in entries {
|
2019-01-30 10:12:31 -08:00
|
|
|
let json = serde_json::to_string(&entry)?;
|
2019-02-11 10:36:59 -08:00
|
|
|
let (slot, slot_leader) = {
|
|
|
|
let leader_scheduler = self.leader_scheduler.read().unwrap();
|
|
|
|
let slot = leader_scheduler.tick_height_to_slot(entry.tick_height);
|
|
|
|
(slot, leader_scheduler.get_leader_for_slot(slot))
|
|
|
|
};
|
|
|
|
let payload = format!(
|
|
|
|
r#"{{"dt":"{}","t":"entry","s":{},"leader_id":"{:?}","entry":{}}}"#,
|
|
|
|
Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true),
|
|
|
|
slot,
|
|
|
|
slot_leader,
|
|
|
|
json
|
|
|
|
);
|
2019-01-30 10:12:31 -08:00
|
|
|
self.socket.push(payload);
|
2019-01-29 00:21:27 -08:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|