change(rpc): Adds a TrustedChainSync struct for keeping up with Zebra's non-finalized best chain from a separate process (#8596)

* Adds an init_read_only() fn in zebra-state

* moves elasticsearch initialization to `FinalizedState::new_with_debug()`

* Updates callers of `FinalizedState::{new, new_with_debug}` to pass a bool to try enabling elasticsearch

* Adds a non-finalized read state syncer to zebra-rpc

* moves, removes, updates, or addresses TODOs

* reduces disk IO while waiting for the a new chain tip & updates the chain tip sender when the finalized tip has changed.

* Returns boxed errors from RpcRequestClient methods instead of color_eyre type

* Avoids resetting the non-finalized state when there's an error getting a block unless it has the missing block error code.

* Adds stub for acceptance test(s) and removes outdated TODO

* adds TODOs for testing

* Tests that `ChainTipChange` is updated when the non-finalized best chain grows

* adds a last_chain_tip_hash and uses a FuturesOrdered for getblock requests

* Fixes a pre-flush sync issue by using a secondary db instead of a read-only db

* Moves disk IO to blocking tasks

* Updates acceptance test to how forks are handled

* Checks synced read state for all of the expected blocks

* checks that there isn't a tip change until the best chain changes

* checks for chain tip changes in test

* run test without feature

* fixes lint

* Fixes compilation/test issues

* Adds docs / comments, moves HexData out from behind the getblocktemplate-rpcs feature flag, moves test behind the mining feature flag.

* Fixes lints

* removes syncer and rpc-syncer features

* Fixes test on Windows, applies suggestions from code review

* Updates `POLL_DELAY` documentation

* Updates method docs

* Fixes a test bug

* use rpc-client feature in zebrad production code

* use rpc-client feature in zebra-node-services for building zebra-rpc crate

---------

Co-authored-by: Pili Guerra <mpguerra@users.noreply.github.com>
Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
This commit is contained in:
Arya 2024-07-09 10:15:47 -04:00 committed by GitHub
parent 2419e8a342
commit 4213e82a4f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 855 additions and 76 deletions

View File

@ -1,12 +1,12 @@
//! A client for calling Zebra's JSON-RPC methods.
//!
//! Only used in tests and tools.
//! Used in the rpc sync scanning functionality and in various tests and tools.
use std::net::SocketAddr;
use reqwest::Client;
use color_eyre::{eyre::eyre, Result};
use crate::BoxError;
/// An HTTP client for making JSON-RPC requests.
#[derive(Clone, Debug)]
@ -99,7 +99,7 @@ impl RpcRequestClient {
&self,
method: impl AsRef<str>,
params: impl AsRef<str>,
) -> Result<T> {
) -> std::result::Result<T, BoxError> {
Self::json_result_from_response_text(&self.text_from_call(method, params).await?)
}
@ -107,13 +107,13 @@ impl RpcRequestClient {
/// Returns `Ok` with a deserialized `result` value in the expected type, or an error report.
fn json_result_from_response_text<T: serde::de::DeserializeOwned>(
response_text: &str,
) -> Result<T> {
) -> std::result::Result<T, BoxError> {
use jsonrpc_core::Output;
let output: Output = serde_json::from_str(response_text)?;
match output {
Output::Success(success) => Ok(serde_json::from_value(success.result)?),
Output::Failure(failure) => Err(eyre!("RPC call failed with: {failure:?}")),
Output::Failure(failure) => Err(failure.error.into()),
}
}
}

View File

@ -15,7 +15,6 @@ keywords = ["zebra", "zcash"]
categories = ["asynchronous", "cryptography::cryptocurrencies", "encoding", "network-programming"]
[features]
default = []
# Production features that activate extra dependencies, or extra features in dependencies
@ -77,7 +76,7 @@ proptest = { version = "1.4.0", optional = true }
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38", features = ["json-conversion"] }
zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38" }
zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38" }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38" }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = ["rpc-client"] }
zebra-script = { path = "../zebra-script", version = "1.0.0-beta.38" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38" }

View File

@ -9,6 +9,7 @@ pub mod constants;
pub mod methods;
pub mod queue;
pub mod server;
pub mod sync;
#[cfg(test)]
mod tests;

View File

@ -38,6 +38,7 @@ use crate::{
};
mod errors;
pub mod hex_data;
use errors::{MapServerError, OkOrServerError};
@ -171,6 +172,14 @@ pub trait Rpc {
#[rpc(name = "getbestblockhash")]
fn get_best_block_hash(&self) -> Result<GetBlockHash>;
/// Returns the height and hash of the current best blockchain tip block, as a [`GetBlockHeightAndHash`] JSON struct.
///
/// zcashd reference: none
/// method: post
/// tags: blockchain
#[rpc(name = "getbestblockheightandhash")]
fn get_best_block_height_and_hash(&self) -> Result<GetBlockHeightAndHash>;
/// Returns all transaction ids in the memory pool, as a JSON array.
///
/// zcashd reference: [`getrawmempool`](https://zcash.github.io/rpc/getrawmempool.html)
@ -867,7 +876,6 @@ where
.boxed()
}
// TODO: use a generic error constructor (#5548)
fn get_best_block_hash(&self) -> Result<GetBlockHash> {
self.latest_chain_tip
.best_tip_hash()
@ -875,7 +883,13 @@ where
.ok_or_server_error("No blocks in state")
}
// TODO: use a generic error constructor (#5548)
fn get_best_block_height_and_hash(&self) -> Result<GetBlockHeightAndHash> {
self.latest_chain_tip
.best_tip_height_and_hash()
.map(|(height, hash)| GetBlockHeightAndHash { height, hash })
.ok_or_server_error("No blocks in state")
}
fn get_raw_mempool(&self) -> BoxFuture<Result<Vec<String>>> {
#[cfg(feature = "getblocktemplate-rpcs")]
use zebra_chain::block::MAX_BLOCK_BYTES;
@ -1547,6 +1561,24 @@ impl Default for GetBlock {
#[serde(transparent)]
pub struct GetBlockHash(#[serde(with = "hex")] pub block::Hash);
/// Response to a `getbestblockheightandhash` RPC request.
#[derive(Copy, Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
pub struct GetBlockHeightAndHash {
/// The best chain tip block height
pub height: block::Height,
/// The best chain tip block hash
pub hash: block::Hash,
}
impl Default for GetBlockHeightAndHash {
fn default() -> Self {
Self {
height: block::Height::MIN,
hash: block::Hash([0; 32]),
}
}
}
impl Default for GetBlockHash {
fn default() -> Self {
GetBlockHash(block::Hash([0; 32]))

View File

@ -46,7 +46,6 @@ use crate::methods::{
types::{
get_block_template::GetBlockTemplate,
get_mining_info,
hex_data::HexData,
long_poll::LongPollInput,
peer_info::PeerInfo,
submit_block,
@ -54,7 +53,9 @@ use crate::methods::{
unified_address, validate_address, z_validate_address,
},
},
height_from_signed_int, GetBlockHash, MISSING_BLOCK_ERROR_CODE,
height_from_signed_int,
hex_data::HexData,
GetBlockHash, MISSING_BLOCK_ERROR_CODE,
};
pub mod constants;

View File

@ -3,7 +3,6 @@
pub mod default_roots;
pub mod get_block_template;
pub mod get_mining_info;
pub mod hex_data;
pub mod long_poll;
pub mod peer_info;
pub mod submit_block;

View File

@ -1,6 +1,6 @@
//! Parameter types for the `getblocktemplate` RPC.
use crate::methods::get_block_template_rpcs::types::{hex_data::HexData, long_poll::LongPollId};
use crate::methods::{get_block_template_rpcs::types::long_poll::LongPollId, hex_data::HexData};
/// Defines whether the RPC method should generate a block template or attempt to validate a block proposal.
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]

View File

@ -29,7 +29,7 @@ pub struct JsonParameters {
/// Response to a `submitblock` RPC request.
///
/// Zebra never returns "duplicate-invalid", because it does not store invalid blocks.
#[derive(Debug, PartialEq, Eq, serde::Serialize)]
#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ErrorResponse {
/// Block was already committed to the non-finalized or finalized state
@ -45,7 +45,7 @@ pub enum ErrorResponse {
/// Response to a `submitblock` RPC request.
///
/// Zebra never returns "duplicate-invalid", because it does not store invalid blocks.
#[derive(Debug, PartialEq, Eq, serde::Serialize)]
#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(untagged)]
pub enum Response {
/// Block was not successfully submitted, return error

View File

@ -39,13 +39,13 @@ use crate::methods::{
get_block_template_rpcs::types::{
get_block_template::{self, GetBlockTemplateRequestMode},
get_mining_info,
hex_data::HexData,
long_poll::{LongPollId, LONG_POLL_ID_LENGTH},
peer_info::PeerInfo,
submit_block,
subsidy::BlockSubsidy,
unified_address, validate_address, z_validate_address,
},
hex_data::HexData,
tests::{snapshot::EXCESSIVE_BLOCK_HEIGHT, utils::fake_history_tree},
GetBlockHash, GetBlockTemplateRpc, GetBlockTemplateRpcImpl,
};

View File

@ -1259,8 +1259,9 @@ async fn rpc_getblocktemplate_mining_address(use_p2pkh: bool) {
GET_BLOCK_TEMPLATE_NONCE_RANGE_FIELD,
},
get_block_template::{self, GetBlockTemplateRequestMode},
types::{hex_data::HexData, long_poll::LONG_POLL_ID_LENGTH},
types::long_poll::LONG_POLL_ID_LENGTH,
},
hex_data::HexData,
tests::utils::fake_history_tree,
};
@ -1547,7 +1548,7 @@ async fn rpc_submitblock_errors() {
use zebra_chain::chain_sync_status::MockSyncStatus;
use zebra_network::address_book_peers::MockAddressBookPeers;
use crate::methods::get_block_template_rpcs::types::{hex_data::HexData, submit_block};
use crate::methods::{get_block_template_rpcs::types::submit_block, hex_data::HexData};
let _init_guard = zebra_test::init();

392
zebra-rpc/src/sync.rs Normal file
View File

@ -0,0 +1,392 @@
//! Syncer task for maintaining a non-finalized state in Zebra's ReadStateService and updating `ChainTipSender` via RPCs
use std::{net::SocketAddr, ops::RangeInclusive, sync::Arc, time::Duration};
use futures::{stream::FuturesOrdered, StreamExt};
use tokio::task::JoinHandle;
use tower::BoxError;
use tracing::info;
use zebra_chain::{
block::{self, Block, Height},
parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH},
serialization::ZcashDeserializeInto,
};
use zebra_node_services::rpc_client::RpcRequestClient;
use zebra_state::{
spawn_init_read_only, ChainTipBlock, ChainTipChange, ChainTipSender, CheckpointVerifiedBlock,
LatestChainTip, NonFinalizedState, ReadStateService, SemanticallyVerifiedBlock, ZebraDb,
MAX_BLOCK_REORG_HEIGHT,
};
use zebra_chain::diagnostic::task::WaitForPanics;
use crate::{
constants::MISSING_BLOCK_ERROR_CODE,
methods::{hex_data::HexData, GetBlockHeightAndHash},
};
/// How long to wait between calls to `getbestblockheightandhash` when it:
/// - Returns an error, or
/// - Returns the block hash of a block that the read state already contains,
/// (so that there's nothing for the syncer to do except wait for the next chain tip change).
/// See the [`TrustedChainSync::wait_for_chain_tip_change()`] method documentation for more information.
const POLL_DELAY: Duration = Duration::from_millis(200);
/// Syncs non-finalized blocks in the best chain from a trusted Zebra node's RPC methods.
#[derive(Debug)]
struct TrustedChainSync {
/// RPC client for calling Zebra's RPC methods.
rpc_client: RpcRequestClient,
/// The read state service.
db: ZebraDb,
/// The non-finalized state - currently only contains the best chain.
non_finalized_state: NonFinalizedState,
/// The chain tip sender for updating [`LatestChainTip`] and [`ChainTipChange`].
chain_tip_sender: ChainTipSender,
/// The non-finalized state sender, for updating the [`ReadStateService`] when the non-finalized best chain changes.
non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
}
impl TrustedChainSync {
/// Creates a new [`TrustedChainSync`] with a [`ChainTipSender`], then spawns a task to sync blocks
/// from the node's non-finalized best chain.
///
/// Returns the [`LatestChainTip`], [`ChainTipChange`], and a [`JoinHandle`] for the sync task.
pub async fn spawn(
rpc_address: SocketAddr,
db: ZebraDb,
non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
) -> (LatestChainTip, ChainTipChange, JoinHandle<()>) {
let rpc_client = RpcRequestClient::new(rpc_address);
let non_finalized_state = NonFinalizedState::new(&db.network());
let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
ChainTipSender::new(None, &db.network());
let mut syncer = Self {
rpc_client,
db,
non_finalized_state,
chain_tip_sender,
non_finalized_state_sender,
};
let sync_task = tokio::spawn(async move {
syncer.sync().await;
});
(latest_chain_tip, chain_tip_change, sync_task)
}
/// Starts syncing blocks from the node's non-finalized best chain and checking for chain tip changes in the finalized state.
///
/// When the best chain tip in Zebra is not available in the finalized state or the local non-finalized state,
/// gets any unavailable blocks in Zebra's best chain from the RPC server, adds them to the local non-finalized state, then
/// sends the updated chain tip block and non-finalized state to the [`ChainTipSender`] and non-finalized state sender.
async fn sync(&mut self) {
self.try_catch_up_with_primary().await;
let mut last_chain_tip_hash =
if let Some(finalized_tip_block) = self.finalized_chain_tip_block().await {
let last_chain_tip_hash = finalized_tip_block.hash;
self.chain_tip_sender.set_finalized_tip(finalized_tip_block);
last_chain_tip_hash
} else {
GENESIS_PREVIOUS_BLOCK_HASH
};
loop {
let (target_tip_height, target_tip_hash) =
self.wait_for_chain_tip_change(last_chain_tip_hash).await;
info!(
?target_tip_height,
?target_tip_hash,
"got a chain tip change"
);
if self.is_finalized_tip_change(target_tip_hash).await {
let block = self.finalized_chain_tip_block().await.expect(
"should have genesis block after successful bestblockheightandhash response",
);
last_chain_tip_hash = block.hash;
self.chain_tip_sender.set_finalized_tip(block);
continue;
}
// If the new best chain tip is unavailable in the finalized state, start syncing non-finalized blocks from
// the non-finalized best chain tip height or finalized tip height.
let (next_block_height, mut current_tip_hash) =
self.next_block_height_and_prev_hash().await;
last_chain_tip_hash = current_tip_hash;
let rpc_client = self.rpc_client.clone();
let mut block_futs =
rpc_client.block_range_ordered(next_block_height..=target_tip_height);
let should_reset_non_finalized_state = loop {
let block = match block_futs.next().await {
Some(Ok(Some(block)))
if block.header.previous_block_hash == current_tip_hash =>
{
SemanticallyVerifiedBlock::from(block)
}
// Clear the non-finalized state and re-fetch every block past the finalized tip if:
// - the next block's previous block hash doesn't match the expected hash,
// - the next block is missing
// - the target tip hash is missing from the blocks in `block_futs`
// because there was likely a chain re-org/fork.
Some(Ok(_)) | None => break true,
// If calling the `getblock` RPC method fails with an unexpected error, wait for the next chain tip change
// without resetting the non-finalized state.
Some(Err(err)) => {
tracing::warn!(
?err,
"encountered an unexpected error while calling getblock method"
);
break false;
}
};
let block_hash = block.hash;
let commit_result = if self.non_finalized_state.chain_count() == 0 {
self.non_finalized_state
.commit_new_chain(block.clone(), &self.db)
} else {
self.non_finalized_state
.commit_block(block.clone(), &self.db)
};
// The previous block hash is checked above, if committing the block fails for some reason, try again.
if let Err(error) = commit_result {
tracing::warn!(
?error,
?block_hash,
"failed to commit block to non-finalized state"
);
break false;
}
// TODO: Check the finalized tip height and finalize blocks from the non-finalized state until
// all non-finalized state chain root previous block hashes match the finalized tip hash.
while self
.non_finalized_state
.best_chain_len()
.expect("just successfully inserted a non-finalized block above")
> MAX_BLOCK_REORG_HEIGHT
{
tracing::trace!("finalizing block past the reorg limit");
self.non_finalized_state.finalize();
}
self.update_channels(block);
current_tip_hash = block_hash;
last_chain_tip_hash = current_tip_hash;
// If the block hash matches the output from the `getbestblockhash` RPC method, we can wait until
// the best block hash changes to get the next block.
if block_hash == target_tip_hash {
break false;
}
};
if should_reset_non_finalized_state {
self.try_catch_up_with_primary().await;
let block = self.finalized_chain_tip_block().await.expect(
"should have genesis block after successful bestblockheightandhash response",
);
last_chain_tip_hash = block.hash;
self.non_finalized_state =
NonFinalizedState::new(&self.non_finalized_state.network);
self.update_channels(block);
}
}
}
/// Tries to catch up to the primary db instance for an up-to-date view of finalized blocks.
async fn try_catch_up_with_primary(&self) {
let db = self.db.clone();
tokio::task::spawn_blocking(move || {
if let Err(catch_up_error) = db.try_catch_up_with_primary() {
tracing::warn!(?catch_up_error, "failed to catch up to primary");
}
})
.wait_for_panics()
.await
}
/// If the non-finalized state is empty, tries to catch up to the primary db instance for
/// an up-to-date view of finalized blocks.
///
/// Returns true if the non-finalized state is empty and the target hash is in the finalized state.
async fn is_finalized_tip_change(&self, target_tip_hash: block::Hash) -> bool {
self.non_finalized_state.chain_count() == 0 && {
let db = self.db.clone();
tokio::task::spawn_blocking(move || {
if let Err(catch_up_error) = db.try_catch_up_with_primary() {
tracing::warn!(?catch_up_error, "failed to catch up to primary");
}
db.contains_hash(target_tip_hash)
})
.wait_for_panics()
.await
}
}
/// Returns the current tip hash and the next height immediately after the current tip height.
async fn next_block_height_and_prev_hash(&self) -> (block::Height, block::Hash) {
if let Some(tip) = self.non_finalized_state.best_tip() {
Some(tip)
} else {
let db = self.db.clone();
tokio::task::spawn_blocking(move || db.tip())
.wait_for_panics()
.await
}
.map(|(current_tip_height, current_tip_hash)| {
(
current_tip_height.next().expect("should be valid height"),
current_tip_hash,
)
})
.unwrap_or((Height::MIN, GENESIS_PREVIOUS_BLOCK_HASH))
}
/// Reads the finalized tip block from the secondary db instance and converts it to a [`ChainTipBlock`].
async fn finalized_chain_tip_block(&self) -> Option<ChainTipBlock> {
let db = self.db.clone();
tokio::task::spawn_blocking(move || {
let (height, hash) = db.tip()?;
db.block(height.into())
.map(|block| CheckpointVerifiedBlock::with_hash(block, hash))
.map(ChainTipBlock::from)
})
.wait_for_panics()
.await
}
/// Accepts a block hash.
///
/// Polls `getbestblockheightandhash` RPC method until it successfully returns a block hash that is different from the last chain tip hash.
///
/// Returns the node's best block hash.
async fn wait_for_chain_tip_change(
&self,
last_chain_tip_hash: block::Hash,
) -> (block::Height, block::Hash) {
loop {
let Some(target_height_and_hash) = self
.rpc_client
.get_best_block_height_and_hash()
.await
.filter(|&(_height, hash)| hash != last_chain_tip_hash)
else {
// If `get_best_block_height_and_hash()` returns an error, or returns
// the current chain tip hash, wait [`POLL_DELAY`], then try again.
tokio::time::sleep(POLL_DELAY).await;
continue;
};
break target_height_and_hash;
}
}
/// Sends the new chain tip and non-finalized state to the latest chain channels.
fn update_channels(&mut self, best_tip: impl Into<ChainTipBlock>) {
// If the final receiver was just dropped, ignore the error.
let _ = self
.non_finalized_state_sender
.send(self.non_finalized_state.clone());
self.chain_tip_sender
.set_best_non_finalized_tip(Some(best_tip.into()));
}
}
/// Accepts a [zebra-state configuration](zebra_state::Config), a [`Network`], and
/// the [`SocketAddr`] of a Zebra node's RPC server.
///
/// Initializes a [`ReadStateService`] and a [`TrustedChainSync`] to update the
/// non-finalized best chain and the latest chain tip.
///
/// Returns a [`ReadStateService`], [`LatestChainTip`], [`ChainTipChange`], and
/// a [`JoinHandle`] for the sync task.
pub fn init_read_state_with_syncer(
config: zebra_state::Config,
network: &Network,
rpc_address: SocketAddr,
) -> tokio::task::JoinHandle<
Result<
(
ReadStateService,
LatestChainTip,
ChainTipChange,
tokio::task::JoinHandle<()>,
),
BoxError,
>,
> {
let network = network.clone();
tokio::spawn(async move {
if config.ephemeral {
return Err("standalone read state service cannot be used with ephemeral state".into());
}
let (read_state, db, non_finalized_state_sender) =
spawn_init_read_only(config, &network).await?;
let (latest_chain_tip, chain_tip_change, sync_task) =
TrustedChainSync::spawn(rpc_address, db, non_finalized_state_sender).await;
Ok((read_state, latest_chain_tip, chain_tip_change, sync_task))
})
}
trait SyncerRpcMethods {
async fn get_best_block_height_and_hash(&self) -> Option<(block::Height, block::Hash)>;
async fn get_block(&self, height: u32) -> Result<Option<Arc<Block>>, BoxError>;
fn block_range_ordered(
&self,
height_range: RangeInclusive<Height>,
) -> FuturesOrdered<impl std::future::Future<Output = Result<Option<Arc<Block>>, BoxError>>>
{
let &Height(start_height) = height_range.start();
let &Height(end_height) = height_range.end();
let mut futs = FuturesOrdered::new();
for height in start_height..=end_height {
futs.push_back(self.get_block(height));
}
futs
}
}
impl SyncerRpcMethods for RpcRequestClient {
async fn get_best_block_height_and_hash(&self) -> Option<(block::Height, block::Hash)> {
self.json_result_from_call("getbestblockheightandhash", "[]")
.await
.map(|GetBlockHeightAndHash { height, hash }| (height, hash))
.ok()
}
async fn get_block(&self, height: u32) -> Result<Option<Arc<Block>>, BoxError> {
match self
.json_result_from_call("getblock", format!(r#"["{}", 0]"#, height))
.await
{
Ok(HexData(raw_block)) => {
let block = raw_block.zcash_deserialize_into::<Block>()?;
Ok(Some(Arc::new(block)))
}
Err(err)
if err
.downcast_ref::<jsonrpc_core::Error>()
.is_some_and(|err| err.code == MISSING_BLOCK_ERROR_CODE) =>
{
Ok(None)
}
Err(err) => Err(err),
}
}
}

View File

@ -63,14 +63,12 @@ pub use service::finalized_state::{
// Allow use in the scanner and external tests
#[cfg(any(test, feature = "proptest-impl", feature = "shielded-scan"))]
pub use service::{
finalized_state::{
DiskWriteBatch, FromDisk, IntoDisk, ReadDisk, TypedColumnFamily, WriteDisk,
WriteTypedBatch, ZebraDb,
},
ReadStateService,
pub use service::finalized_state::{
DiskWriteBatch, FromDisk, IntoDisk, ReadDisk, TypedColumnFamily, WriteDisk, WriteTypedBatch,
};
pub use service::{finalized_state::ZebraDb, ReadStateService};
#[cfg(feature = "getblocktemplate-rpcs")]
pub use response::GetBlockTemplateChainInfo;

View File

@ -567,6 +567,11 @@ impl DiskDb {
);
}
/// When called with a secondary DB instance, tries to catch up with the primary DB instance
pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
self.db.try_catch_up_with_primary()
}
/// Returns a forward iterator over the items in `cf` in `range`.
///
/// Holding this iterator open might delay block commit transactions.
@ -834,7 +839,23 @@ impl DiskDb {
.map(|cf_name| rocksdb::ColumnFamilyDescriptor::new(cf_name, db_options.clone()));
let db_result = if read_only {
DB::open_cf_descriptors_read_only(&db_options, &path, column_families, false)
// Use a tempfile for the secondary instance cache directory
let secondary_config = Config {
ephemeral: true,
..config.clone()
};
let secondary_path =
secondary_config.db_path("secondary_state", format_version_in_code.major, network);
let create_dir_result = std::fs::create_dir_all(&secondary_path);
info!(?create_dir_result, "creating secondary db directory");
DB::open_cf_descriptors_as_secondary(
&db_options,
&path,
&secondary_path,
column_families,
)
} else {
DB::open_cf_descriptors(&db_options, &path, column_families)
};

View File

@ -233,6 +233,11 @@ impl ZebraDb {
}
}
/// When called with a secondary DB instance, tries to catch up with the primary DB instance
pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
self.db.try_catch_up_with_primary()
}
/// Shut down the database, cleaning up background tasks and ephemeral data.
///
/// If `force` is true, clean up regardless of any shared references.

View File

@ -161,7 +161,7 @@ test_sync_past_mandatory_checkpoint_testnet = []
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.38" }
zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.38" }
zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38" }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38" }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = ["rpc-client"] }
zebra-rpc = { path = "../zebra-rpc", version = "1.0.0-beta.38" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38" }
@ -286,8 +286,6 @@ zebra-network = { path = "../zebra-network", version = "1.0.0-beta.38", features
zebra-scan = { path = "../zebra-scan", version = "0.1.0-alpha.7", features = ["proptest-impl"] }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.38", features = ["proptest-impl"] }
zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.38", features = ["rpc-client"] }
zebra-test = { path = "../zebra-test", version = "1.0.0-beta.38" }
zebra-grpc = { path = "../zebra-grpc", version = "0.1.0-alpha.5" }

View File

@ -30,13 +30,11 @@ use zebra_node_services::mempool;
use zebra_rpc::{
config::mining::Config,
methods::{
get_block_template_rpcs::{
get_block_template::{
get_block_template_rpcs::get_block_template::{
self, proposal::TimeSource, proposal_block_from_template,
GetBlockTemplateCapability::*, GetBlockTemplateRequestMode::*,
},
types::hex_data::HexData,
},
hex_data::HexData,
GetBlockTemplateRpc, GetBlockTemplateRpcImpl,
},
};

View File

@ -3161,3 +3161,282 @@ async fn regtest_submit_blocks() -> Result<()> {
common::regtest::submit_blocks_test().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
#[cfg(feature = "getblocktemplate-rpcs")]
async fn trusted_chain_sync_handles_forks_correctly() -> Result<()> {
use std::sync::Arc;
use common::regtest::MiningRpcMethods;
use eyre::Error;
use tokio::time::timeout;
use zebra_chain::{
chain_tip::ChainTip, parameters::NetworkUpgrade,
primitives::byte_array::increment_big_endian,
};
use zebra_rpc::methods::GetBlockHash;
use zebra_state::{ReadResponse, Response};
let _init_guard = zebra_test::init();
let mut config = random_known_rpc_port_config(false, &Network::new_regtest(None))?;
config.state.ephemeral = false;
let network = config.network.network.clone();
let rpc_address = config.rpc.listen_addr.unwrap();
let test_dir = testdir()?.with_config(&mut config)?;
let mut child = test_dir.spawn_child(args!["start"])?;
tracing::info!("waiting for Zebra state cache to be opened");
#[cfg(not(target_os = "windows"))]
child.expect_stdout_line_matches("marked database format as newly created")?;
#[cfg(target_os = "windows")]
tokio::time::sleep(LAUNCH_DELAY).await;
tracing::info!("starting read state with syncer");
// Spawn a read state with the RPC syncer to check that it has the same best chain as Zebra
let (read_state, _latest_chain_tip, mut chain_tip_change, _sync_task) =
zebra_rpc::sync::init_read_state_with_syncer(
config.state,
&config.network.network,
rpc_address,
)
.await?
.map_err(|err| eyre!(err))?;
tracing::info!("waiting for first chain tip change");
// Wait for Zebrad to start up
let tip_action = timeout(LAUNCH_DELAY, chain_tip_change.wait_for_tip_change()).await??;
assert!(
tip_action.is_reset(),
"first tip action should be a reset for the genesis block"
);
tracing::info!("got genesis chain tip change, submitting more blocks ..");
let rpc_client = RpcRequestClient::new(rpc_address);
let mut blocks = Vec::new();
for _ in 0..10 {
let (block, height) = rpc_client.submit_block_from_template().await?;
blocks.push(block);
let tip_action = timeout(
Duration::from_secs(1),
chain_tip_change.wait_for_tip_change(),
)
.await??;
assert_eq!(
tip_action.best_tip_height(),
height,
"tip action height should match block submission"
);
}
tracing::info!("checking that read state has the new non-finalized best chain blocks");
for expected_block in blocks.clone() {
let height = expected_block.coinbase_height().unwrap();
let zebra_block = rpc_client
.get_block(height.0 as i32)
.await
.map_err(|err| eyre!(err))?
.expect("Zebra test child should have the expected block");
assert_eq!(
zebra_block,
Arc::new(expected_block),
"Zebra should have the same block"
);
let ReadResponse::Block(read_state_block) = read_state
.clone()
.oneshot(zebra_state::ReadRequest::Block(height.into()))
.await
.map_err(|err| eyre!(err))?
else {
unreachable!("unexpected read response to a block request")
};
assert_eq!(
zebra_block,
read_state_block.expect("read state should have the block"),
"read state should have the same block"
);
}
tracing::info!("getting next block template");
let (block_11, _) = rpc_client.block_from_template(Height(100)).await?;
let next_blocks: Vec<_> = blocks
.split_off(5)
.into_iter()
.chain(std::iter::once(block_11))
.collect();
tracing::info!("creating populated state");
let genesis_block = regtest_genesis_block();
let (state2, read_state2, latest_chain_tip2, _chain_tip_change2) =
zebra_state::populated_state(
std::iter::once(genesis_block).chain(blocks.iter().cloned().map(Arc::new)),
&network,
)
.await;
tracing::info!("attempting to trigger a best chain change");
for mut block in next_blocks {
let is_chain_history_activation_height = NetworkUpgrade::Heartwood
.activation_height(&network)
== Some(block.coinbase_height().unwrap());
let header = Arc::make_mut(&mut block.header);
increment_big_endian(header.nonce.as_mut());
let ReadResponse::ChainInfo(chain_info) = read_state2
.clone()
.oneshot(zebra_state::ReadRequest::ChainInfo)
.await
.map_err(|err| eyre!(err))?
else {
unreachable!("wrong response variant");
};
header.previous_block_hash = chain_info.tip_hash;
header.commitment_bytes = chain_info
.history_tree
.hash()
.or(is_chain_history_activation_height.then_some([0; 32].into()))
.expect("history tree can't be empty")
.bytes_in_serialized_order()
.into();
let Response::Committed(block_hash) = state2
.clone()
.oneshot(zebra_state::Request::CommitSemanticallyVerifiedBlock(
Arc::new(block.clone()).into(),
))
.await
.map_err(|err| eyre!(err))?
else {
unreachable!("wrong response variant");
};
assert!(
chain_tip_change.last_tip_change().is_none(),
"there should be no tip change until the last block is submitted"
);
rpc_client.submit_block(block.clone()).await?;
blocks.push(block);
let GetBlockHash(best_block_hash) = rpc_client
.json_result_from_call("getbestblockhash", "[]")
.await
.map_err(|err| eyre!(err))?;
if block_hash == best_block_hash {
break;
}
}
tracing::info!("newly submitted blocks are in the best chain, checking for reset");
tokio::time::sleep(Duration::from_secs(3)).await;
let tip_action = timeout(
Duration::from_secs(1),
chain_tip_change.wait_for_tip_change(),
)
.await??;
let (expected_height, expected_hash) = latest_chain_tip2
.best_tip_height_and_hash()
.expect("should have a chain tip");
assert!(tip_action.is_reset(), "tip action should be reset");
assert_eq!(
tip_action.best_tip_hash_and_height(),
(expected_hash, expected_height),
"tip action hashes and heights should match"
);
tracing::info!("checking that read state has the new non-finalized best chain blocks");
for expected_block in blocks {
let height = expected_block.coinbase_height().unwrap();
let zebra_block = rpc_client
.get_block(height.0 as i32)
.await
.map_err(|err| eyre!(err))?
.expect("Zebra test child should have the expected block");
assert_eq!(
zebra_block,
Arc::new(expected_block),
"Zebra should have the same block"
);
let ReadResponse::Block(read_state_block) = read_state
.clone()
.oneshot(zebra_state::ReadRequest::Block(height.into()))
.await
.map_err(|err| eyre!(err))?
else {
unreachable!("unexpected read response to a block request")
};
assert_eq!(
zebra_block,
read_state_block.expect("read state should have the block"),
"read state should have the same block"
);
}
tracing::info!("restarting Zebra on Mainnet");
child.kill(false)?;
let output = child.wait_with_output()?;
// Make sure the command was killed
output.assert_was_killed()?;
output.assert_failure()?;
let mut config = random_known_rpc_port_config(false, &Network::Mainnet)?;
config.state.ephemeral = false;
let rpc_address = config.rpc.listen_addr.unwrap();
let test_dir = testdir()?.with_config(&mut config)?;
let mut child = test_dir.spawn_child(args!["start"])?;
tracing::info!("waiting for Zebra state cache to be opened");
#[cfg(not(target_os = "windows"))]
child.expect_stdout_line_matches("marked database format as newly created")?;
#[cfg(target_os = "windows")]
tokio::time::sleep(LAUNCH_DELAY).await;
tracing::info!("starting read state with syncer");
// Spawn a read state with the RPC syncer to check that it has the same best chain as Zebra
let (_read_state, _latest_chain_tip, mut chain_tip_change, _sync_task) =
zebra_rpc::sync::init_read_state_with_syncer(
config.state,
&config.network.network,
rpc_address,
)
.await?
.map_err(|err| eyre!(err))?;
tracing::info!("waiting for finalized chain tip changes");
timeout(
Duration::from_secs(100),
tokio::spawn(async move {
for _ in 0..2 {
chain_tip_change
.wait_for_tip_change()
.await
.map_err(|err| eyre!(err))?;
}
Ok::<(), Error>(())
}),
)
.await???;
Ok(())
}

View File

@ -1,6 +1,6 @@
//! Tests that `getpeerinfo` RPC method responds with info about at least 1 peer.
use color_eyre::eyre::{Context, Result};
use color_eyre::eyre::{eyre, Context, Result};
use zebra_chain::parameters::Network;
use zebra_node_services::rpc_client::RpcRequestClient;
@ -41,7 +41,8 @@ pub(crate) async fn run() -> Result<()> {
// call `getpeerinfo` RPC method
let peer_info_result: Vec<PeerInfo> = RpcRequestClient::new(rpc_address)
.json_result_from_call("getpeerinfo", "[]".to_string())
.await?;
.await
.map_err(|err| eyre!(err))?;
assert!(
!peer_info_result.is_empty(),

View File

@ -5,19 +5,28 @@
use std::{net::SocketAddr, sync::Arc, time::Duration};
use color_eyre::eyre::{Context, Result};
use color_eyre::eyre::{eyre, Context, Result};
use tower::BoxError;
use tracing::*;
use zebra_chain::{
block::{Block, Height},
parameters::{testnet::REGTEST_NU5_ACTIVATION_HEIGHT, Network, NetworkUpgrade},
primitives::byte_array::increment_big_endian,
serialization::ZcashSerialize,
serialization::{ZcashDeserializeInto, ZcashSerialize},
};
use zebra_node_services::rpc_client::RpcRequestClient;
use zebra_rpc::{
methods::get_block_template_rpcs::get_block_template::{
constants::MISSING_BLOCK_ERROR_CODE,
methods::{
get_block_template_rpcs::{
get_block_template::{
proposal::TimeSource, proposal_block_from_template, GetBlockTemplate,
},
types::submit_block,
},
hex_data::HexData,
},
server::OPENED_RPC_ENDPOINT_MSG,
};
use zebra_test::args;
@ -66,23 +75,10 @@ pub(crate) async fn submit_blocks_test() -> Result<()> {
async fn submit_blocks(network: Network, rpc_address: SocketAddr) -> Result<()> {
let client = RpcRequestClient::new(rpc_address);
for height in 1..=NUM_BLOCKS_TO_SUBMIT {
let block_template: GetBlockTemplate = client
.json_result_from_call("getblocktemplate", "[]".to_string())
.await
.expect("response should be success output with a serialized `GetBlockTemplate`");
let network_upgrade = if height < REGTEST_NU5_ACTIVATION_HEIGHT.try_into().unwrap() {
NetworkUpgrade::Canopy
} else {
NetworkUpgrade::Nu5
};
let mut block =
proposal_block_from_template(&block_template, TimeSource::default(), network_upgrade)?;
let height = block
.coinbase_height()
.expect("should have a coinbase height");
for _ in 1..=NUM_BLOCKS_TO_SUBMIT {
let (mut block, height) = client
.block_from_template(Height(REGTEST_NU5_ACTIVATION_HEIGHT))
.await?;
while !network.disable_pow()
&& zebra_consensus::difficulty_is_valid(&block.header, &network, &height, &block.hash())
@ -91,29 +87,87 @@ async fn submit_blocks(network: Network, rpc_address: SocketAddr) -> Result<()>
increment_big_endian(Arc::make_mut(&mut block.header).nonce.as_mut());
}
let block_data = hex::encode(block.zcash_serialize_to_vec()?);
let submit_block_response = client
.text_from_call("submitblock", format!(r#"["{block_data}"]"#))
.await?;
let was_submission_successful = submit_block_response.contains(r#""result":null"#);
if height.0 % 40 == 0 {
info!(
was_submission_successful,
?block_template,
?network_upgrade,
"submitted block"
);
info!(?block, ?height, "submitting block");
}
// Check that the block was validated and committed.
assert!(
submit_block_response.contains(r#""result":null"#),
"unexpected response from submitblock RPC, should be null, was: {submit_block_response}"
);
client.submit_block(block).await?;
}
Ok(())
}
pub trait MiningRpcMethods {
async fn block_from_template(&self, nu5_activation_height: Height) -> Result<(Block, Height)>;
async fn submit_block(&self, block: Block) -> Result<()>;
async fn submit_block_from_template(&self) -> Result<(Block, Height)>;
async fn get_block(&self, height: i32) -> Result<Option<Arc<Block>>, BoxError>;
}
impl MiningRpcMethods for RpcRequestClient {
async fn block_from_template(&self, nu5_activation_height: Height) -> Result<(Block, Height)> {
let block_template: GetBlockTemplate = self
.json_result_from_call("getblocktemplate", "[]".to_string())
.await
.expect("response should be success output with a serialized `GetBlockTemplate`");
let height = Height(block_template.height);
let network_upgrade = if height < nu5_activation_height {
NetworkUpgrade::Canopy
} else {
NetworkUpgrade::Nu5
};
Ok((
proposal_block_from_template(&block_template, TimeSource::default(), network_upgrade)?,
height,
))
}
async fn submit_block(&self, block: Block) -> Result<()> {
let block_data = hex::encode(block.zcash_serialize_to_vec()?);
let submit_block_response: submit_block::Response = self
.json_result_from_call("submitblock", format!(r#"["{block_data}"]"#))
.await
.map_err(|err| eyre!(err))?;
match submit_block_response {
submit_block::Response::Accepted => Ok(()),
submit_block::Response::ErrorResponse(err) => {
Err(eyre!("block submission failed: {err:?}"))
}
}
}
async fn submit_block_from_template(&self) -> Result<(Block, Height)> {
let (block, height) = self
.block_from_template(Height(REGTEST_NU5_ACTIVATION_HEIGHT))
.await?;
self.submit_block(block.clone()).await?;
Ok((block, height))
}
async fn get_block(&self, height: i32) -> Result<Option<Arc<Block>>, BoxError> {
match self
.json_result_from_call("getblock", format!(r#"["{}", 0]"#, height))
.await
{
Ok(HexData(raw_block)) => {
let block = raw_block.zcash_deserialize_into::<Block>()?;
Ok(Some(Arc::new(block)))
}
Err(err)
if err
.downcast_ref::<jsonrpc_core::Error>()
.is_some_and(|err| err.code == MISSING_BLOCK_ERROR_CODE) =>
{
Ok(None)
}
Err(err) => Err(err),
}
}
}