Remove in-memory state service (#974)

* Remove in-memory state service

* make the config compatible with toml again

* checkpoint commit to see how much I still have to revert

* back to the starting point...

* remove unused dependency

* reorganize error handling a bit

* need to make a new color-eyre release now

* reorder again because I have problems

* remove unnecessary helpers

* revert changes to config loading

* add back missing space

* Switch to released color-eyre version

* add back missing newline again...

* improve error message on unix when terminated by signal

* add context to last few asserts in acceptance tests

* instrument some of the helpers

* remove accidental extra space

* try to make this compile on windows

* reorg platform specific code

* hide on_disk module and fix broken link
This commit is contained in:
Jane Lusby 2020-09-01 12:39:04 -07:00 committed by GitHub
parent 3fdfcb3179
commit ffdec0cb23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 216 additions and 297 deletions

View File

@ -7,7 +7,10 @@ use color_eyre::eyre::{eyre, Report};
use once_cell::sync::Lazy;
use zebra_chain::block::{self, Block};
use zebra_chain::serialization::{ZcashDeserialize, ZcashDeserializeInto};
use zebra_chain::{
parameters::Network,
serialization::{ZcashDeserialize, ZcashDeserializeInto},
};
use zebra_test::transcript::{TransError, Transcript};
static VALID_BLOCK_TRANSCRIPT: Lazy<Vec<(Arc<Block>, Result<block::Hash, TransError>)>> =
@ -101,7 +104,10 @@ async fn check_transcripts_test() -> Result<(), Report> {
#[spandoc::spandoc]
async fn check_transcripts() -> Result<(), Report> {
zebra_test::init();
let state_service = zebra_state::in_memory::init();
let network = Network::Mainnet;
let state_service = zebra_state::init(zebra_state::Config::ephemeral(), network);
let block_verifier = super::init(state_service.clone());
for transcript_data in &[

View File

@ -70,7 +70,7 @@ fn verifiers_from_checkpoint_list(
+ Clone
+ 'static,
) {
let state_service = zebra_state::in_memory::init();
let state_service = zebra_state::init(zebra_state::Config::ephemeral(), network);
let block_verifier = crate::block::init(state_service.clone());
let chain_verifier = super::init_from_verifiers(
network,
@ -245,12 +245,14 @@ async fn verify_checkpoint_test() -> Result<(), Report> {
async fn verify_checkpoint(config: Config) -> Result<(), Report> {
zebra_test::init();
let network = Network::Mainnet;
// Test that the chain::init function works. Most of the other tests use
// init_from_verifiers.
let chain_verifier = super::init(
config.clone(),
Network::Mainnet,
zebra_state::in_memory::init(),
network,
zebra_state::init(zebra_state::Config::ephemeral(), network),
)
.await;
@ -362,6 +364,7 @@ async fn continuous_blockchain_test() -> Result<(), Report> {
#[spandoc::spandoc]
async fn continuous_blockchain(restart_height: Option<block::Height>) -> Result<(), Report> {
zebra_test::init();
let network = Network::Mainnet;
// A continuous blockchain
let mut blockchain = Vec::new();
@ -401,7 +404,7 @@ async fn continuous_blockchain(restart_height: Option<block::Height>) -> Result<
.collect();
let checkpoint_list = CheckpointList::from_list(checkpoint_list).map_err(|e| eyre!(e))?;
let mut state_service = zebra_state::in_memory::init();
let mut state_service = zebra_state::init(zebra_state::Config::ephemeral(), network);
/// SPANDOC: Add blocks to the state from 0..=restart_height {?restart_height}
if restart_height.is_some() {
for block in blockchain
@ -426,7 +429,7 @@ async fn continuous_blockchain(restart_height: Option<block::Height>) -> Result<
let block_verifier = crate::block::init(state_service.clone());
let mut chain_verifier = super::init_from_verifiers(
Network::Mainnet,
network,
block_verifier,
Some(checkpoint_list),
state_service.clone(),

View File

@ -1,125 +0,0 @@
//! A basic implementation of the zebra-state service entirely in memory
//!
//! This service is provided as an independent implementation of the
//! zebra-state service to use in verifying the correctness of `on_disk`'s
//! `Service` implementation.
use super::{Request, Response};
use futures::prelude::*;
use std::{
error,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower::{buffer::Buffer, Service};
use zebra_chain::block;
mod block_index;
#[derive(Default)]
struct InMemoryState {
index: block_index::BlockIndex,
}
impl InMemoryState {
fn contains(&mut self, _hash: block::Hash) -> Result<Option<u32>, Error> {
todo!()
}
}
impl Service<Request> for InMemoryState {
type Response = Response;
type Error = Error;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request) -> Self::Future {
tracing::debug!(?req);
match req {
Request::AddBlock { block } => {
let result = self
.index
.insert(block)
.map(|hash| Response::Added { hash });
async { result }.boxed()
}
Request::GetBlock { hash } => {
let result = self
.index
.get(hash)
.map(|block| Response::Block { block })
.ok_or_else(|| "block could not be found".into());
async move { result }.boxed()
}
Request::GetTip => {
let result = self
.index
.get_tip()
.map(|block| block.hash())
.map(|hash| Response::Tip { hash })
.ok_or_else(|| "zebra-state contains no blocks".into());
async move { result }.boxed()
}
Request::GetDepth { hash } => {
let res = self.contains(hash);
async move {
let depth = res?;
Ok(Response::Depth(depth))
}
.boxed()
}
Request::GetBlockLocator { genesis } => {
let tip = self.index.get_tip();
let tip = match tip {
Some(tip) => tip,
None => {
return async move {
Ok(Response::BlockLocator {
block_locator: vec![genesis],
})
}
.boxed()
}
};
let tip_height = tip
.coinbase_height()
.expect("tip block will have a coinbase height");
let block_locator = crate::block_locator_heights(tip_height)
.map(|height| {
self.index
.get_main_chain_at(height)
.expect("there should be no holes in the chain")
})
.collect();
async move { Ok(Response::BlockLocator { block_locator }) }.boxed()
}
}
}
}
/// Return's a type that implement's the `zebra_state::Service` entirely in
/// memory using `HashMaps`
pub fn init() -> impl Service<
Request,
Response = Response,
Error = Error,
Future = impl Future<Output = Result<Response, Error>>,
> + Send
+ Clone
+ 'static {
Buffer::new(InMemoryState::default(), 1)
}
type Error = Box<dyn error::Error + Send + Sync + 'static>;

View File

@ -1,46 +0,0 @@
use std::{
collections::{btree_map::Entry, BTreeMap, HashMap},
error::Error,
sync::Arc,
};
use zebra_chain::block::{self, Block};
#[derive(Default)]
pub(super) struct BlockIndex {
by_hash: HashMap<block::Hash, Arc<Block>>,
height_map: BTreeMap<block::Height, block::Hash>,
}
impl BlockIndex {
pub(super) fn insert(
&mut self,
block: impl Into<Arc<Block>>,
) -> Result<block::Hash, Box<dyn Error + Send + Sync + 'static>> {
let block = block.into();
let hash = block.as_ref().into();
let height = block.coinbase_height().unwrap();
match self.height_map.entry(height) {
Entry::Vacant(entry) => {
let _ = entry.insert(hash);
let _ = self.by_hash.insert(hash, block);
Ok(hash)
}
Entry::Occupied(_) => Err("forks in the chain aren't supported yet")?,
}
}
pub(super) fn get(&self, hash: block::Hash) -> Option<Arc<Block>> {
self.by_hash.get(&hash).cloned()
}
pub(super) fn get_main_chain_at(&self, height: block::Height) -> Option<block::Hash> {
self.height_map.get(&height).cloned()
}
pub(super) fn get_tip(&self) -> Option<Arc<Block>> {
self.height_map.iter().next_back().map(|(_height, &hash)| {
self.get(hash)
.expect("block must be in pool to be in the height map")
})
}
}

View File

@ -20,14 +20,14 @@ use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::{error, iter, sync::Arc};
use tower::{Service, ServiceExt};
use zebra_chain::{
block::{self, Block},
parameters::Network,
};
pub mod in_memory;
pub mod on_disk;
pub use on_disk::init;
mod on_disk;
/// The maturity threshold for transparent coinbase outputs.
///
@ -75,6 +75,8 @@ pub struct Config {
/// Ephemeral databases are stored in memory on Linux, and in a temporary directory on other OSes.
///
/// Set to `false` by default. If this is set to `true`, [`cache_dir`] is ignored.
///
/// [`cache_dir`]: struct.Config.html#structfield.cache_dir
pub ephemeral: bool,
}
@ -90,6 +92,7 @@ impl Config {
let config = sled::Config::default()
.cache_capacity(self.memory_cache_bytes)
.mode(sled::Mode::LowSpace);
if self.ephemeral {
config.temporary(self.ephemeral)
} else {
@ -97,6 +100,13 @@ impl Config {
config.path(path)
}
}
/// Construct a config for an ephemeral in memory database
pub fn ephemeral() -> Self {
let mut config = Self::default();
config.ephemeral = true;
config
}
}
impl Default for Config {
@ -104,6 +114,7 @@ impl Default for Config {
let cache_dir = dirs::cache_dir()
.unwrap_or_else(|| std::env::current_dir().unwrap().join("cache"))
.join("zebra");
Self {
cache_dir,
memory_cache_bytes: 512 * 1024 * 1024,

View File

@ -116,14 +116,9 @@ async fn check_transcripts(network: Network) -> Result<(), Report> {
Network::Testnet => testnet_transcript,
_ => mainnet_transcript,
} {
let service = in_memory::init();
let transcript = Transcript::from(transcript_data.iter().cloned());
/// SPANDOC: check the in memory service against the transcript
transcript.check(service).await?;
let storage_guard = TempDir::new("")?;
let cache_dir = storage_guard.path().to_owned();
let service = on_disk::init(
let service = zebra_state::init(
Config {
cache_dir,
..Config::default()

View File

@ -12,7 +12,7 @@ hex = "0.4.2"
lazy_static = "1.4.0"
tower = "0.3.1"
futures = "0.3.5"
color-eyre = "0.5"
color-eyre = "0.5.2"
tracing = "0.1.19"
tracing-subscriber = "0.2.11"
tracing-error = "0.1.2"

View File

@ -7,6 +7,7 @@ use std::process::{Child, Command, ExitStatus, Output};
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
use tracing::instrument;
/// Runs a command
pub fn test_cmd(command_path: &str, tempdir: &PathBuf) -> Result<Command> {
@ -83,54 +84,22 @@ pub struct TestStatus {
impl TestStatus {
pub fn assert_success(self) -> Result<Self> {
assert_success(&self.status, &self.cmd)?;
if !self.status.success() {
Err(eyre!("command exited unsuccessfully")).context_from(&self)?;
}
Ok(self)
}
pub fn assert_failure(self) -> Result<Self> {
assert_failure(&self.status, &self.cmd)?;
if self.status.success() {
Err(eyre!("command unexpectedly exited successfully")).context_from(&self)?;
}
Ok(self)
}
}
fn assert_success(status: &ExitStatus, cmd: &str) -> Result<()> {
if !status.success() {
let exit_code = || {
if let Some(code) = status.code() {
format!("Exit Code: {}", code)
} else {
"Exit Code: None".into()
}
};
Err(eyre!("command exited unsuccessfully"))
.with_section(|| cmd.to_string().header("Command:"))
.with_section(exit_code)?;
}
Ok(())
}
fn assert_failure(status: &ExitStatus, cmd: &str) -> Result<()> {
if status.success() {
let exit_code = || {
if let Some(code) = status.code() {
format!("Exit Code: {}", code)
} else {
"Exit Code: None".into()
}
};
Err(eyre!("command unexpectedly exited successfully"))
.with_section(|| cmd.to_string().header("Command:"))
.with_section(exit_code)?;
}
Ok(())
}
#[derive(Debug)]
pub struct TestChild {
pub cmd: String,
@ -141,9 +110,7 @@ impl TestChild {
#[spandoc::spandoc]
pub fn kill(&mut self) -> Result<()> {
/// SPANDOC: Killing child process
self.child
.kill()
.with_section(|| self.cmd.clone().header("Child Process:"))?;
self.child.kill().context_from(self)?;
Ok(())
}
@ -170,41 +137,22 @@ pub struct TestOutput {
impl TestOutput {
pub fn assert_success(self) -> Result<Self> {
let output = &self.output;
assert_success(&self.output.status, &self.cmd)
.with_section(|| {
String::from_utf8_lossy(output.stdout.as_slice())
.to_string()
.header("Stdout:")
})
.with_section(|| {
String::from_utf8_lossy(output.stderr.as_slice())
.to_string()
.header("Stderr:")
})?;
if !self.output.status.success() {
Err(eyre!("command exited unsuccessfully")).context_from(&self)?;
}
Ok(self)
}
pub fn assert_failure(self) -> Result<Self> {
let output = &self.output;
assert_failure(&self.output.status, &self.cmd)
.with_section(|| {
String::from_utf8_lossy(output.stdout.as_slice())
.to_string()
.header("Stdout:")
})
.with_section(|| {
String::from_utf8_lossy(output.stderr.as_slice())
.to_string()
.header("Stderr:")
})?;
if self.output.status.success() {
Err(eyre!("command unexpectedly exited successfully")).context_from(&self)?;
}
Ok(self)
}
#[instrument(skip(self))]
pub fn stdout_contains(&self, regex: &str) -> Result<&Self> {
let re = regex::Regex::new(regex)?;
let stdout = String::from_utf8_lossy(&self.output.stdout);
@ -215,16 +163,13 @@ impl TestOutput {
}
}
let command = || self.cmd.clone().header("Command:");
let stdout = || stdout.into_owned().header("Stdout:");
Err(eyre!(
"stdout of command did not contain any matches for the given regex"
))
.with_section(command)
.with_section(stdout)
.context_from(self)
}
#[instrument(skip(self))]
pub fn stdout_equals(&self, s: &str) -> Result<&Self> {
let stdout = String::from_utf8_lossy(&self.output.stdout);
@ -232,14 +177,10 @@ impl TestOutput {
return Ok(self);
}
let command = || self.cmd.clone().header("Command:");
let stdout = || stdout.into_owned().header("Stdout:");
Err(eyre!("stdout of command is not equal the given string"))
.with_section(command)
.with_section(stdout)
Err(eyre!("stdout of command is not equal the given string")).context_from(self)
}
#[instrument(skip(self))]
pub fn stdout_matches(&self, regex: &str) -> Result<&Self> {
let re = regex::Regex::new(regex)?;
let stdout = String::from_utf8_lossy(&self.output.stdout);
@ -248,20 +189,139 @@ impl TestOutput {
return Ok(self);
}
let command = || self.cmd.clone().header("Command:");
let stdout = || stdout.into_owned().header("Stdout:");
Err(eyre!("stdout of command is not equal to the given regex"))
.with_section(command)
.with_section(stdout)
Err(eyre!("stdout of command is not equal to the given regex")).context_from(self)
}
/// Returns true if the program was killed, false if exit was by another reason.
pub fn was_killed(&self) -> bool {
#[cfg(unix)]
return self.output.status.signal() == Some(9);
/// Returns Ok if the program was killed, Err(Report) if exit was by another
/// reason.
pub fn assert_was_killed(&self) -> Result<()> {
if self.was_killed() {
Err(eyre!("command was killed")).context_from(self)?
}
#[cfg(not(unix))]
return self.output.status.code() == Some(1);
Ok(())
}
/// Returns Ok if the program was not killed, Err(Report) if exit was by
/// another reason.
pub fn assert_was_not_killed(&self) -> Result<()> {
if !self.was_killed() {
Err(eyre!("command wasn't killed")).context_from(self)?
}
Ok(())
}
#[cfg(not(unix))]
fn was_killed(&self) -> bool {
self.output.status.code() != Some(1)
}
#[cfg(unix)]
fn was_killed(&self) -> bool {
self.output.status.signal() != Some(9)
}
}
/// Add context to an error report
pub trait ContextFrom<S> {
type Return;
fn context_from(self, source: &S) -> Self::Return;
}
impl<C, T, E> ContextFrom<C> for Result<T, E>
where
E: Into<Report>,
Report: ContextFrom<C, Return = Report>,
{
type Return = Result<T, Report>;
fn context_from(self, source: &C) -> Self::Return {
self.map_err(|e| e.into())
.map_err(|report| report.context_from(source))
}
}
impl ContextFrom<TestStatus> for Report {
type Return = Report;
fn context_from(self, source: &TestStatus) -> Self::Return {
let command = || source.cmd.clone().header("Command:");
self.with_section(command).context_from(&source.status)
}
}
impl ContextFrom<TestChild> for Report {
type Return = Report;
fn context_from(self, source: &TestChild) -> Self::Return {
let command = || source.cmd.clone().header("Command:");
let child = || format!("{:?}", source.child).header("Child Process:");
self.with_section(command).with_section(child)
}
}
impl ContextFrom<TestOutput> for Report {
type Return = Report;
fn context_from(self, source: &TestOutput) -> Self::Return {
self.with_section(|| source.cmd.clone().header("Command:"))
.context_from(&source.output)
}
}
impl ContextFrom<Output> for Report {
type Return = Report;
fn context_from(self, source: &Output) -> Self::Return {
let stdout = || {
String::from_utf8_lossy(&source.stdout)
.into_owned()
.header("Stdout:")
};
let stderr = || {
String::from_utf8_lossy(&source.stderr)
.into_owned()
.header("Stderr:")
};
self.context_from(&source.status)
.with_section(stdout)
.with_section(stderr)
}
}
impl ContextFrom<ExitStatus> for Report {
type Return = Report;
fn context_from(self, source: &ExitStatus) -> Self::Return {
let how = if source.success() {
"successfully"
} else {
"unsuccessfully"
};
if let Some(code) = source.code() {
return self.with_section(|| {
format!("command exited {} with status code {}", how, code).header("Exit Status:")
});
}
#[cfg(unix)]
if let Some(signal) = source.signal() {
self.with_section(|| {
format!("command terminated {} by signal {}", how, signal).header("Exit Status:")
})
} else {
unreachable!("on unix all processes either terminate via signal or with an exit code");
}
#[cfg(not(unix))]
self.with_section(|| {
format!("command exited {} without a status code or signal", how).header("Exit Status:")
})
}
}

View File

@ -42,7 +42,7 @@ impl StartCmd {
info!(?self, "starting to connect to the network");
let config = app_config();
let state = zebra_state::on_disk::init(config.state.clone(), config.network.network);
let state = zebra_state::init(config.state.clone(), config.network.network);
let verifier = zebra_consensus::chain::init(
config.consensus.clone(),
config.network.network,

View File

@ -13,7 +13,7 @@ use zebrad::config::ZebradConfig;
fn default_test_config() -> Result<ZebradConfig> {
let mut config = ZebradConfig::default();
config.state.ephemeral = true;
config.state = zebra_state::Config::ephemeral();
config.state.memory_cache_bytes = 256000000;
config.network.listen_addr = "127.0.0.1:0".parse()?;
@ -72,6 +72,21 @@ fn generate_no_args() -> Result<()> {
Ok(())
}
macro_rules! assert_with_context {
($pred:expr, $source:expr) => {
if !$pred {
use color_eyre::Section as _;
use color_eyre::SectionExt as _;
use zebra_test::command::ContextFrom as _;
let report = color_eyre::eyre::eyre!("failed assertion")
.section(stringify!($pred).header("Predicate:"))
.context_from($source);
panic!("Error: {:?}", report);
}
};
}
#[test]
fn generate_args() -> Result<()> {
zebra_test::init();
@ -103,13 +118,13 @@ fn generate_args() -> Result<()> {
)?;
let output = child.wait_with_output()?;
output.assert_success()?;
let output = output.assert_success()?;
// Check if the temp dir still exist
assert!(tempdir.exists());
assert_with_context!(tempdir.exists(), &output);
// Check if the file was created
assert!(generated_config_path.exists());
assert_with_context!(generated_config_path.exists(), &output);
Ok(())
}
@ -182,7 +197,7 @@ fn seed_no_args() -> Result<()> {
output.stdout_contains(r"Starting zebrad in seed mode")?;
// Make sure the command was killed
assert!(output.was_killed());
output.assert_was_killed()?;
Ok(())
}
@ -230,7 +245,7 @@ fn start_no_args() -> Result<()> {
output.stdout_contains(r"Starting zebrad$")?;
// Make sure the command was killed
assert!(output.was_killed());
output.assert_was_killed()?;
Ok(())
}
@ -248,7 +263,7 @@ fn start_args() -> Result<()> {
let output = child.wait_with_output()?;
// Make sure the command was killed
assert!(output.was_killed());
output.assert_was_killed()?;
output.assert_failure()?;
@ -273,11 +288,11 @@ fn persistent_mode() -> Result<()> {
let output = child.wait_with_output()?;
// Make sure the command was killed
assert!(output.was_killed());
output.assert_was_killed()?;
// Check that we have persistent sled database
let cache_dir = tempdir.join("state");
assert!(cache_dir.read_dir()?.count() > 0);
assert_with_context!(cache_dir.read_dir()?.count() > 0, &output);
Ok(())
}
@ -295,10 +310,10 @@ fn ephemeral_mode() -> Result<()> {
let output = child.wait_with_output()?;
// Make sure the command was killed
assert!(output.was_killed());
output.assert_was_killed()?;
let cache_dir = tempdir.join("state");
assert!(!cache_dir.exists());
assert_with_context!(!cache_dir.exists(), &output);
Ok(())
}
@ -330,10 +345,10 @@ fn misconfigured_ephemeral_mode() -> Result<()> {
let output = child.wait_with_output()?;
// Make sure the command was killed
assert!(output.was_killed());
output.assert_was_killed()?;
// Check that ephemeral takes precedence over cache_dir
assert_eq!(cache_dir.read_dir()?.count(), 0);
assert_with_context!(cache_dir.read_dir()?.count() == 0, &output);
Ok(())
}
@ -410,10 +425,10 @@ fn valid_generated_config(command: &str, expected_output: &str) -> Result<()> {
)?;
let output = child.wait_with_output()?;
output.assert_success()?;
let output = output.assert_success()?;
// Check if the file was created
assert!(generated_config_path.exists());
assert_with_context!(generated_config_path.exists(), &output);
// Run command using temp dir and kill it at 1 second
let mut child = get_child(
@ -436,13 +451,13 @@ fn valid_generated_config(command: &str, expected_output: &str) -> Result<()> {
// - run the tests in an isolated environment,
// - run zebrad on a custom cache path and port,
// - run zcashd on a custom port.
assert!(output.was_killed(), "Expected zebrad with generated config to succeed. Are there other acceptance test, zebrad, or zcashd processes running?");
output.assert_was_killed().expect("Expected zebrad with generated config to succeed. Are there other acceptance test, zebrad, or zcashd processes running?");
// Check if the temp dir still exists
assert!(tempdir.exists());
assert_with_context!(tempdir.exists(), &output);
// Check if the created config file still exists
assert!(generated_config_path.exists());
assert_with_context!(generated_config_path.exists(), &output);
Ok(())
}