reduces disk IO while waiting for the a new chain tip & updates the chain tip sender when the finalized tip has changed.

This commit is contained in:
Arya 2024-06-06 21:37:31 -04:00
parent f94bd417a1
commit d57c50aab1
1 changed files with 100 additions and 80 deletions

View File

@ -11,8 +11,8 @@ use zebra_chain::{
};
use zebra_node_services::rpc_client::RpcRequestClient;
use zebra_state::{
spawn_init_read_only, ChainTipBlock, ChainTipChange, ChainTipSender, CheckpointVerifiedBlock,
LatestChainTip, NonFinalizedState, ReadStateService, SemanticallyVerifiedBlock, ZebraDb,
spawn_init_read_only, ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip,
NonFinalizedState, ReadStateService, SemanticallyVerifiedBlock, ZebraDb,
MAX_BLOCK_REORG_HEIGHT,
};
@ -28,8 +28,6 @@ const POLL_DELAY: Duration = Duration::from_millis(100);
struct TrustedChainSync {
/// RPC client for calling Zebra's RPC methods.
rpc_client: RpcRequestClient,
/// Information about the next block height to request and how it should be processed.
cursor: SyncCursor,
/// The read state service
db: ZebraDb,
/// The non-finalized state - currently only contains the best chain.
@ -40,21 +38,28 @@ struct TrustedChainSync {
non_finalized_state_sender: tokio::sync::watch::Sender<NonFinalizedState>,
}
enum NewChainTip {
/// Information about the next block height to request and how it should be processed.
Cursor(SyncCursor),
/// The latest finalized tip.
Block(Arc<Block>),
}
struct SyncCursor {
/// The best chain tip height in this process.
tip_height: Height,
/// The best chain tip hash in this process.
tip_hash: block::Hash,
/// The best chain tip hash in the Zebra node.
node_tip_hash: block::Hash,
target_tip_hash: block::Hash,
}
impl SyncCursor {
fn new(tip_height: Height, tip_hash: block::Hash, node_tip_hash: block::Hash) -> Self {
fn new(tip_height: Height, tip_hash: block::Hash, target_tip_hash: block::Hash) -> Self {
Self {
tip_height,
tip_hash,
node_tip_hash,
target_tip_hash,
}
}
}
@ -68,21 +73,11 @@ impl TrustedChainSync {
) -> (LatestChainTip, ChainTipChange, JoinHandle<()>) {
let rpc_client = RpcRequestClient::new(rpc_address);
let non_finalized_state = NonFinalizedState::new(&db.network());
let cursor = wait_for_chain_tip_change(&rpc_client, &non_finalized_state, &db).await;
let tip = {
let db = db.clone();
tokio::task::spawn_blocking(move || db.tip_block())
.wait_for_panics()
.await
.expect("checked for genesis block above")
};
let chain_tip: ChainTipBlock = CheckpointVerifiedBlock::from(tip).into();
let (chain_tip_sender, latest_chain_tip, chain_tip_change) =
ChainTipSender::new(chain_tip, &db.network());
ChainTipSender::new(None, &db.network());
let mut syncer = Self {
rpc_client,
cursor,
db,
non_finalized_state,
chain_tip_sender,
@ -99,40 +94,42 @@ impl TrustedChainSync {
/// Starts syncing blocks from the node's non-finalized best chain.
async fn sync(&mut self) {
loop {
let has_found_new_best_chain = self.sync_new_blocks().await;
if has_found_new_best_chain {
self.non_finalized_state = NonFinalizedState::new(&self.db.network());
let _ = self
.non_finalized_state_sender
.send(self.non_finalized_state.clone());
}
// Wait until the best block hash in Zebra is different from the tip hash in this read state
self.cursor = self.wait_for_chain_tip_change().await;
match self.wait_for_chain_tip_change().await {
NewChainTip::Cursor(cursor) => {
self.sync_new_blocks(cursor).await;
}
NewChainTip::Block(block) => update_channels(
block,
&self.non_finalized_state,
&mut self.non_finalized_state_sender,
&mut self.chain_tip_sender,
),
}
}
}
async fn sync_new_blocks(&mut self) -> bool {
loop {
async fn sync_new_blocks(&mut self, mut cursor: SyncCursor) {
let has_found_new_best_chain = loop {
let Some(block) = self
.rpc_client
// If we fail to get a block for any reason, we assume the block is missing and the chain hasn't grown, so there must have
// been a chain re-org/fork, and we can clear the non-finalized state and re-fetch every block past the finalized tip.
// It should always deserialize successfully, but this resets the non-finalized state if it somehow fails.
.get_block(self.cursor.tip_height)
.get_block(cursor.tip_height)
.await
.map(SemanticallyVerifiedBlock::from)
// If the next block's previous block hash doesn't match the expected hash, there must have
// been a chain re-org/fork, and we can clear the non-finalized state and re-fetch every block
// past the finalized tip.
.filter(|block| block.block.header.previous_block_hash == self.cursor.tip_hash)
.filter(|block| block.block.header.previous_block_hash == cursor.tip_hash)
else {
break true;
};
let parent_hash = block.block.header.previous_block_hash;
if parent_hash != self.cursor.tip_hash {
if parent_hash != cursor.tip_hash {
break true;
}
@ -155,13 +152,6 @@ impl TrustedChainSync {
continue;
}
update_channels(
block,
&self.non_finalized_state,
&mut self.non_finalized_state_sender,
&mut self.chain_tip_sender,
);
while self
.non_finalized_state
.best_chain_len()
@ -172,24 +162,86 @@ impl TrustedChainSync {
self.non_finalized_state.finalize();
}
let _ = self
.non_finalized_state_sender
.send(self.non_finalized_state.clone());
update_channels(
block,
&self.non_finalized_state,
&mut self.non_finalized_state_sender,
&mut self.chain_tip_sender,
);
self.cursor.tip_height = block_height;
self.cursor.tip_hash = block_hash;
cursor.tip_height = block_height;
cursor.tip_hash = block_hash;
// If the block hash matches the output from the `getbestblockhash` RPC method, we can wait until
// the best block hash changes to get the next block.
if block_hash == self.cursor.node_tip_hash {
if block_hash == cursor.target_tip_hash {
break false;
}
};
if has_found_new_best_chain {
self.non_finalized_state = NonFinalizedState::new(&self.db.network());
let db = self.db.clone();
let finalized_tip = tokio::task::spawn_blocking(move || {
db.tip_block().expect("should have genesis block")
})
.wait_for_panics()
.await;
update_channels(
finalized_tip,
&self.non_finalized_state,
&mut self.non_finalized_state_sender,
&mut self.chain_tip_sender,
);
}
}
/// Polls `getbestblockhash` RPC method until there are new blocks in the Zebra node's non-finalized state.
async fn wait_for_chain_tip_change(&self) -> SyncCursor {
wait_for_chain_tip_change(&self.rpc_client, &self.non_finalized_state, &self.db).await
async fn wait_for_chain_tip_change(&self) -> NewChainTip {
let (tip_height, tip_hash) = if let Some(tip) = self.non_finalized_state.best_tip() {
tip
} else {
let db = self.db.clone();
tokio::task::spawn_blocking(move || db.tip())
.wait_for_panics()
.await
.expect("checked for genesis block above")
};
// Wait until the best block hash in Zebra is different from the tip hash in this read state
loop {
let Some(target_tip_hash) = self.rpc_client.get_best_block_hash().await else {
// Wait until the genesis block has been committed.
tokio::time::sleep(POLL_DELAY).await;
continue;
};
if target_tip_hash != tip_hash {
let cursor =
NewChainTip::Cursor(SyncCursor::new(tip_height, tip_hash, target_tip_hash));
// Check if there's are blocks in the non-finalized state, or that
// the node tip hash is different from our finalized tip hash before returning
// a cursor for syncing blocks via the `getblock` RPC.
if self.non_finalized_state.chain_count() != 0 {
break cursor;
}
let db = self.db.clone();
break tokio::task::spawn_blocking(move || {
if db.finalized_tip_hash() != target_tip_hash {
cursor
} else {
NewChainTip::Block(db.tip_block().unwrap())
}
})
.wait_for_panics()
.await;
}
tokio::time::sleep(POLL_DELAY).await;
}
}
}
@ -253,38 +305,6 @@ impl SyncerRpcMethods for RpcRequestClient {
}
}
async fn wait_for_chain_tip_change(
rpc_client: &RpcRequestClient,
non_finalized_state: &NonFinalizedState,
db: &ZebraDb,
) -> SyncCursor {
// Wait until the best block hash in Zebra is different from the tip hash in this read state
loop {
let Some(node_tip_hash) = rpc_client.get_best_block_hash().await else {
// Wait until the genesis block has been committed.
tokio::time::sleep(POLL_DELAY).await;
continue;
};
let (tip_height, tip_hash) = if let Some(tip) = non_finalized_state.best_tip() {
tip
} else {
let db = db.clone();
tokio::task::spawn_blocking(move || db.tip())
.wait_for_panics()
.await
.expect("checked for genesis block above")
};
// TODO: Return when the finalized tip has changed as well.
if node_tip_hash != tip_hash {
break SyncCursor::new(tip_height, tip_hash, node_tip_hash);
}
tokio::time::sleep(POLL_DELAY).await;
}
}
/// Sends the new chain tip and non-finalized state to the latest chain channels.
fn update_channels(
best_tip: impl Into<ChainTipBlock>,