Rename EntryStream to Blockstream

This commit is contained in:
Tyera Eulberg 2019-02-21 16:16:09 -07:00 committed by Tyera Eulberg
parent 4a0c759795
commit d8f6865338
11 changed files with 100 additions and 102 deletions

View File

@ -12,7 +12,7 @@ To run an api node, include the argument `no-signer` and (optional)
`entry-stream` socket location:
```bash
$ ./multinode-demo/fullnode-x.sh --no-signer --entry-stream <SOCKET>
$ ./multinode-demo/fullnode-x.sh --no-signer --blockstream <SOCKET>
```
The stream will output a series of JSON objects:

View File

@ -129,11 +129,11 @@ fn main() {
let matches = App::new("solana-fullnode")
.version(crate_version!())
.arg(
Arg::with_name("entry_stream")
.long("entry-stream")
Arg::with_name("blockstream")
.long("blockstream")
.takes_value(true)
.value_name("UNIX DOMAIN SOCKET")
.help("Open entry stream at this unix domain socket location")
.help("Open blockstream at this unix domain socket location")
)
.arg(
Arg::with_name("identity")
@ -238,7 +238,7 @@ fn main() {
)
};
let init_complete_file = matches.value_of("init_complete_file");
fullnode_config.entry_stream = matches.value_of("entry_stream").map(|s| s.to_string());
fullnode_config.blockstream = matches.value_of("blockstream").map(|s| s.to_string());
let keypair = Arc::new(keypair);
let mut node = Node::new_with_external_ip(keypair.pubkey(), &gossip);

View File

@ -29,7 +29,7 @@ else
program="$solana_fullnode"
fi
maybe_entry_stream=
maybe_blockstream=
maybe_init_complete_file=
maybe_no_leader_rotation=
@ -37,8 +37,8 @@ while [[ -n $1 ]]; do
if [[ $1 = --init-complete-file ]]; then
maybe_init_complete_file="--init-complete-file $2"
shift 2
elif [[ $1 = --entry-stream ]]; then
maybe_entry_stream="$1 $2"
elif [[ $1 = --blockstream ]]; then
maybe_blockstream="$1 $2"
shift 2
elif [[ $1 = --no-leader-rotation ]]; then
maybe_no_leader_rotation="--no-leader-rotation"
@ -61,9 +61,9 @@ tune_system
trap 'kill "$pid" && wait "$pid"' INT TERM
$solana_ledger_tool --ledger "$SOLANA_CONFIG_DIR"/bootstrap-leader-ledger verify
# shellcheck disable=SC2086 # Don't want to double quote maybe_entry_stream or maybe_init_complete_file
# shellcheck disable=SC2086 # Don't want to double quote maybe_blockstream or maybe_init_complete_file
$program \
$maybe_entry_stream \
$maybe_blockstream \
$maybe_init_complete_file \
$maybe_no_leader_rotation \
--identity "$SOLANA_CONFIG_DIR"/bootstrap-leader.json \

View File

@ -21,14 +21,14 @@ usage() {
echo
fi
cat <<EOF
usage: $0 [-x] [--entry-stream PATH] [--init-complete-file FILE] [--no-leader-rotation] [--no-signer] [--rpc-port port] [rsync network path to bootstrap leader configuration] [network entry point]
usage: $0 [-x] [--blockstream PATH] [--init-complete-file FILE] [--no-leader-rotation] [--no-signer] [--rpc-port port] [rsync network path to bootstrap leader configuration] [network entry point]
Start a full node on the specified network
-x - start a new, dynamically-configured full node
-X [label] - start or restart a dynamically-configured full node with
the specified label
--entry-stream PATH - open entry stream at this unix domain socket location
--blockstream PATH - open blockstream at this unix domain socket location
--init-complete-file FILE - create this file, if it doesn't already exist, once node initialization is complete
--no-leader-rotation - disable leader rotation
--no-signer - start node without vote signer
@ -42,7 +42,7 @@ if [[ $1 = -h ]]; then
usage
fi
maybe_entry_stream=
maybe_blockstream=
maybe_init_complete_file=
maybe_no_leader_rotation=
maybe_no_signer=
@ -58,8 +58,8 @@ while [[ ${1:0:1} = - ]]; do
self_setup=1
self_setup_label=$$
shift
elif [[ $1 = --entry-stream ]]; then
maybe_entry_stream="$1 $2"
elif [[ $1 = --blockstream ]]; then
maybe_blockstream="$1 $2"
shift 2
elif [[ $1 = --init-complete-file ]]; then
maybe_init_complete_file="--init-complete-file $2"
@ -246,9 +246,9 @@ if [[ ! -d "$ledger_config_dir" ]]; then
fi
trap 'kill "$pid" && wait "$pid"' INT TERM
# shellcheck disable=SC2086 # Don't want to double quote maybe_entry_stream or maybe_init_complete_file or maybe_no_signer or maybe_rpc_port
# shellcheck disable=SC2086 # Don't want to double quote maybe_blockstream or maybe_init_complete_file or maybe_no_signer or maybe_rpc_port
$program \
$maybe_entry_stream \
$maybe_blockstream \
$maybe_init_complete_file \
$maybe_no_leader_rotation \
$maybe_no_signer \

View File

@ -183,7 +183,7 @@ local|tar)
fi
if [[ $nodeType = apinode ]]; then
args+=(
--entry-stream /tmp/solana-entry-stream.sock
--blockstream /tmp/solana-blockstream.sock
--no-signer
)
fi

10
run.sh
View File

@ -23,10 +23,10 @@ $ok || {
exit 1
}
entryStreamSocket=/tmp/solana-entry-stream.sock # Default to location used by the block explorer
blockstreamSocket=/tmp/solana-blockstream.sock # Default to location used by the block explorer
while [[ -n $1 ]]; do
if [[ $1 = --entry-stream ]]; then
entryStreamSocket=$2
if [[ $1 = --blockstream ]]; then
blockstreamSocket=$2
shift 2
else
echo "Unknown argument: $1"
@ -58,8 +58,8 @@ args=(
--ledger "$dataDir"/ledger/
--rpc-port 8899
)
if [[ -n $entryStreamSocket ]]; then
args+=(--entry-stream "$entryStreamSocket")
if [[ -n $blockstreamSocket ]]; then
args+=(--blockstream "$blockstreamSocket")
fi
solana-fullnode "${args[@]}" &
fullnode=$!

View File

@ -1,4 +1,4 @@
//! The `entry_stream` module provides a method for streaming entries out via a
//! The `blockstream` module provides a method for streaming entries out via a
//! local unix socket, to provide client services such as a block explorer with
//! real-time access to entries.
@ -59,7 +59,7 @@ impl EntryWriter for EntrySocket {
}
}
pub trait EntryStreamHandler {
pub trait BlockstreamEvents {
fn emit_entry_event(
&self,
slot: u64,
@ -77,13 +77,13 @@ pub trait EntryStreamHandler {
}
#[derive(Debug)]
pub struct EntryStream<T: EntryWriter> {
pub struct Blockstream<T: EntryWriter> {
pub output: T,
pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
pub queued_block: Option<EntryStreamBlock>,
pub queued_block: Option<BlockData>,
}
impl<T> EntryStreamHandler for EntryStream<T>
impl<T> BlockstreamEvents for Blockstream<T>
where
T: EntryWriter,
{
@ -127,11 +127,11 @@ where
}
}
pub type SocketEntryStream = EntryStream<EntrySocket>;
pub type SocketBlockstream = Blockstream<EntrySocket>;
impl SocketEntryStream {
impl SocketBlockstream {
pub fn new(socket: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
EntryStream {
Blockstream {
output: EntrySocket { socket },
leader_scheduler,
queued_block: None,
@ -139,11 +139,11 @@ impl SocketEntryStream {
}
}
pub type MockEntryStream = EntryStream<EntryVec>;
pub type MockBlockstream = Blockstream<EntryVec>;
impl MockEntryStream {
impl MockBlockstream {
pub fn new(_: String, leader_scheduler: Arc<RwLock<LeaderScheduler>>) -> Self {
EntryStream {
Blockstream {
output: EntryVec::new(),
leader_scheduler,
queued_block: None,
@ -156,7 +156,7 @@ impl MockEntryStream {
}
#[derive(Debug)]
pub struct EntryStreamBlock {
pub struct BlockData {
pub slot: u64,
pub tick_height: u64,
pub id: Hash,
@ -175,7 +175,7 @@ mod test {
use std::collections::HashSet;
#[test]
fn test_entry_stream() -> () {
fn test_blockstream() -> () {
// Set up bank and leader_scheduler
let leader_scheduler_config = LeaderSchedulerConfig::new(5, 2, 10);
let (genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000);
@ -183,9 +183,8 @@ mod test {
let leader_scheduler = LeaderScheduler::new_with_bank(&leader_scheduler_config, &bank);
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
// Set up entry stream
let entry_stream =
MockEntryStream::new("test_stream".to_string(), leader_scheduler.clone());
// Set up blockstream
let blockstream = MockBlockstream::new("test_stream".to_string(), leader_scheduler.clone());
let ticks_per_slot = leader_scheduler.read().unwrap().ticks_per_slot;
let mut last_id = Hash::default();
@ -215,14 +214,14 @@ mod test {
.unwrap()
.tick_height_to_slot(tick_height);
if curr_slot != previous_slot {
entry_stream
blockstream
.emit_block_event(previous_slot, tick_height - 1, &leader_id, last_id)
.unwrap();
}
let entry = Entry::new(&mut last_id, 1, vec![]); // just ticks
last_id = entry.id;
previous_slot = curr_slot;
entry_stream
blockstream
.emit_entry_event(curr_slot, tick_height, &leader_id, &entry)
.unwrap();
expected_entries.push(entry.clone());
@ -230,7 +229,7 @@ mod test {
}
assert_eq!(
entry_stream.entries().len() as u64,
blockstream.entries().len() as u64,
// one entry per tick (0..=N+2) is +3, plus one block
ticks_per_slot + 3 + 1
);
@ -240,7 +239,7 @@ mod test {
let mut matched_slots = HashSet::new();
let mut matched_blocks = HashSet::new();
for item in entry_stream.entries() {
for item in blockstream.entries() {
let json: Value = serde_json::from_str(&item).unwrap();
let dt_str = json["dt"].as_str().unwrap();

View File

@ -1,13 +1,13 @@
//! The `entry_stream_stage` implements optional streaming of entries using the
//! `entry_stream` module, providing client services such as a block explorer with
//! The `blockstream_service` implements optional streaming of entries and block metadata
//! using the `blockstream` module, providing client services such as a block explorer with
//! real-time access to entries.
use crate::entry::{EntryReceiver, EntrySender};
#[cfg(test)]
use crate::entry_stream::MockEntryStream as EntryStream;
use crate::blockstream::MockBlockstream as Blockstream;
#[cfg(not(test))]
use crate::entry_stream::SocketEntryStream as EntryStream;
use crate::entry_stream::{EntryStreamBlock, EntryStreamHandler};
use crate::blockstream::SocketBlockstream as Blockstream;
use crate::blockstream::{BlockData, BlockstreamEvents};
use crate::entry::{EntryReceiver, EntrySender};
use crate::leader_scheduler::LeaderScheduler;
use crate::result::{Error, Result};
use crate::service::Service;
@ -17,32 +17,32 @@ use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
pub struct EntryStreamStage {
t_entry_stream: JoinHandle<()>,
pub struct BlockstreamService {
t_blockstream: JoinHandle<()>,
}
impl EntryStreamStage {
impl BlockstreamService {
#[allow(clippy::new_ret_no_self)]
pub fn new(
ledger_entry_receiver: EntryReceiver,
entry_stream_socket: String,
blockstream_socket: String,
mut tick_height: u64,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
exit: Arc<AtomicBool>,
) -> (Self, EntryReceiver) {
let (entry_stream_sender, entry_stream_receiver) = channel();
let mut entry_stream = EntryStream::new(entry_stream_socket, leader_scheduler);
let t_entry_stream = Builder::new()
.name("solana-entry-stream".to_string())
let (blockstream_sender, blockstream_receiver) = channel();
let mut blockstream = Blockstream::new(blockstream_socket, leader_scheduler);
let t_blockstream = Builder::new()
.name("solana-blockstream".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(e) = Self::process_entries(
&ledger_entry_receiver,
&entry_stream_sender,
&blockstream_sender,
&mut tick_height,
&mut entry_stream,
&mut blockstream,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@ -52,17 +52,17 @@ impl EntryStreamStage {
}
})
.unwrap();
(Self { t_entry_stream }, entry_stream_receiver)
(Self { t_blockstream }, blockstream_receiver)
}
fn process_entries(
ledger_entry_receiver: &EntryReceiver,
entry_stream_sender: &EntrySender,
blockstream_sender: &EntrySender,
tick_height: &mut u64,
entry_stream: &mut EntryStream,
blockstream: &mut Blockstream,
) -> Result<()> {
let timeout = Duration::new(1, 0);
let entries = ledger_entry_receiver.recv_timeout(timeout)?;
let leader_scheduler = entry_stream.leader_scheduler.read().unwrap();
let leader_scheduler = blockstream.leader_scheduler.read().unwrap();
for entry in &entries {
if entry.is_tick() {
@ -74,25 +74,25 @@ impl EntryStreamStage {
.map(|leader| leader.to_string())
.unwrap_or_else(|| "None".to_string());
if entry.is_tick() && entry_stream.queued_block.is_some() {
let queued_block = entry_stream.queued_block.as_ref();
if entry.is_tick() && blockstream.queued_block.is_some() {
let queued_block = blockstream.queued_block.as_ref();
let block_slot = queued_block.unwrap().slot;
let block_tick_height = queued_block.unwrap().tick_height;
let block_id = queued_block.unwrap().id;
entry_stream
blockstream
.emit_block_event(block_slot, block_tick_height, &leader_id, block_id)
.unwrap_or_else(|e| {
debug!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
});
entry_stream.queued_block = None;
blockstream.queued_block = None;
}
entry_stream
blockstream
.emit_entry_event(slot, *tick_height, &leader_id, &entry)
.unwrap_or_else(|e| {
debug!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
});
if 0 == leader_scheduler.num_ticks_left_in_slot(*tick_height) {
entry_stream.queued_block = Some(EntryStreamBlock {
blockstream.queued_block = Some(BlockData {
slot,
tick_height: *tick_height,
id: entry.id,
@ -100,16 +100,16 @@ impl EntryStreamStage {
}
}
entry_stream_sender.send(entries)?;
blockstream_sender.send(entries)?;
Ok(())
}
}
impl Service for EntryStreamStage {
impl Service for BlockstreamService {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.t_entry_stream.join()
self.t_blockstream.join()
}
}
@ -127,7 +127,7 @@ mod test {
use solana_sdk::system_transaction::SystemTransaction;
#[test]
fn test_entry_stream_stage_process_entries() {
fn test_blockstream_stage_process_entries() {
// Set up the bank and leader_scheduler
let ticks_per_slot = 5;
let starting_tick_height = 1;
@ -138,13 +138,12 @@ mod test {
let leader_scheduler = LeaderScheduler::new_with_bank(&leader_scheduler_config, &bank);
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
// Set up entry stream
let mut entry_stream =
EntryStream::new("test_stream".to_string(), leader_scheduler.clone());
// Set up blockstream
let mut blockstream = Blockstream::new("test_stream".to_string(), leader_scheduler.clone());
// Set up dummy channels to host an EntryStreamStage
// Set up dummy channels to host an BlockstreamService
let (ledger_entry_sender, ledger_entry_receiver) = channel();
let (entry_stream_sender, entry_stream_receiver) = channel();
let (blockstream_sender, blockstream_receiver) = channel();
let mut last_id = Hash::default();
let mut entries = Vec::new();
@ -168,16 +167,16 @@ mod test {
entries.insert(ticks_per_slot as usize, entry);
ledger_entry_sender.send(entries).unwrap();
EntryStreamStage::process_entries(
BlockstreamService::process_entries(
&ledger_entry_receiver,
&entry_stream_sender,
&blockstream_sender,
&mut (starting_tick_height - 1),
&mut entry_stream,
&mut blockstream,
)
.unwrap();
assert_eq!(entry_stream.entries().len(), 8);
assert_eq!(blockstream.entries().len(), 8);
let (entry_events, block_events): (Vec<Value>, Vec<Value>) = entry_stream
let (entry_events, block_events): (Vec<Value>, Vec<Value>) = blockstream
.entries()
.iter()
.map(|item| {
@ -212,7 +211,7 @@ mod test {
}
// Ensure entries pass through stage unadulterated
let recv_entries = entry_stream_receiver.recv().unwrap();
let recv_entries = blockstream_receiver.recv().unwrap();
assert_eq!(expected_entries, recv_entries);
}
}

View File

@ -64,7 +64,7 @@ pub enum FullnodeReturnType {
pub struct FullnodeConfig {
pub sigverify_disabled: bool,
pub voting_disabled: bool,
pub entry_stream: Option<String>,
pub blockstream: Option<String>,
pub storage_rotate_count: u64,
pub leader_scheduler_config: LeaderSchedulerConfig,
pub tick_config: PohServiceConfig,
@ -78,7 +78,7 @@ impl Default for FullnodeConfig {
Self {
sigverify_disabled: false,
voting_disabled: false,
entry_stream: None,
blockstream: None,
storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE,
leader_scheduler_config: LeaderSchedulerConfig::default(),
tick_config: PohServiceConfig::default(),
@ -270,7 +270,7 @@ impl Fullnode {
config.storage_rotate_count,
&rotation_sender,
&storage_state,
config.entry_stream.as_ref(),
config.blockstream.as_ref(),
ledger_signal_receiver,
leader_scheduler.clone(),
&subscriptions,

View File

@ -26,13 +26,13 @@ pub mod crds_gossip_push;
pub mod crds_value;
#[macro_use]
pub mod contact_info;
pub mod blockstream;
pub mod blockstream_service;
pub mod blocktree;
pub mod blocktree_processor;
pub mod cluster_info;
pub mod db_window;
pub mod entry;
pub mod entry_stream;
pub mod entry_stream_stage;
#[cfg(feature = "erasure")]
pub mod erasure;
pub mod fetch_stage;

View File

@ -14,9 +14,9 @@
use crate::bank_forks::BankForks;
use crate::blob_fetch_stage::BlobFetchStage;
use crate::blockstream_service::BlockstreamService;
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::entry_stream_stage::EntryStreamStage;
use crate::leader_scheduler::LeaderScheduler;
use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage;
@ -45,7 +45,7 @@ pub struct Tvu {
fetch_stage: BlobFetchStage,
retransmit_stage: RetransmitStage,
replay_stage: ReplayStage,
entry_stream_stage: Option<EntryStreamStage>,
blockstream_service: Option<BlockstreamService>,
storage_stage: StorageStage,
exit: Arc<AtomicBool>,
}
@ -78,7 +78,7 @@ impl Tvu {
storage_rotate_count: u64,
to_leader_sender: &TvuRotationSender,
storage_state: &StorageState,
entry_stream: Option<&String>,
blockstream: Option<&String>,
ledger_signal_receiver: Receiver<bool>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
subscriptions: &Arc<RpcSubscriptions>,
@ -134,16 +134,16 @@ impl Tvu {
subscriptions,
);
let entry_stream_stage = if entry_stream.is_some() {
let (entry_stream_stage, entry_stream_receiver) = EntryStreamStage::new(
let blockstream_service = if blockstream.is_some() {
let (blockstream_service, blockstream_receiver) = BlockstreamService::new(
previous_receiver,
entry_stream.unwrap().to_string(),
blockstream.unwrap().to_string(),
bank.tick_height(),
leader_scheduler,
exit.clone(),
);
previous_receiver = entry_stream_receiver;
Some(entry_stream_stage)
previous_receiver = blockstream_receiver;
Some(blockstream_service)
} else {
None
};
@ -163,7 +163,7 @@ impl Tvu {
fetch_stage,
retransmit_stage,
replay_stage,
entry_stream_stage,
blockstream_service,
storage_stage,
exit,
}
@ -193,8 +193,8 @@ impl Service for Tvu {
self.retransmit_stage.join()?;
self.fetch_stage.join()?;
self.storage_stage.join()?;
if self.entry_stream_stage.is_some() {
self.entry_stream_stage.unwrap().join()?;
if self.blockstream_service.is_some() {
self.blockstream_service.unwrap().join()?;
}
self.replay_stage.join()?;
Ok(())