feature(rpc): Add basic long polling support to the `getblocktemplate` RPC (#5843)

* fastmod longpollid long_poll_id

* Fix a comment

* Implement inefficient long polling
This commit is contained in:
teor 2022-12-15 10:30:37 +10:00 committed by GitHub
parent 67894cad4c
commit f7011a903e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 133 additions and 60 deletions

View File

@ -1,8 +1,8 @@
//! RPC methods related to mining only available with `getblocktemplate-rpcs` rust feature.
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use futures::{FutureExt, TryFutureExt};
use futures::{future::OptionFuture, FutureExt, TryFutureExt};
use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use tower::{buffer::Buffer, Service, ServiceExt};
@ -333,40 +333,113 @@ where
// These checks always have the same result during long polling.
let miner_address = check_miner_address(miner_address)?;
let mut client_long_poll_id = None;
if let Some(parameters) = parameters {
check_block_template_parameters(parameters)?;
check_block_template_parameters(&parameters)?;
client_long_poll_id = parameters.long_poll_id;
}
// - Checks and fetches that can change during long polling
// Check if we are synced to the tip.
// The result of this check can change during long polling.
check_synced_to_tip(network, latest_chain_tip, sync_status)?;
// Fetch the state data and local time for the block template:
// - if the tip block hash changes, we must return from long polling,
// - if the local clock changes on testnet, we might return from long polling
//
// We also return after 90 minutes on mainnet, even if we have the same response.
let chain_tip_and_local_time = fetch_state_tip_and_local_time(state).await?;
// Set up the loop.
let mut max_time_reached = false;
// Fetch the mempool data for the block template:
// - if the mempool transactions change, we might return from long polling.
let mempool_txs = fetch_mempool_transactions(mempool).await?;
// The loop returns the server long poll ID,
// which should be different to the client long poll ID.
let (server_long_poll_id, chain_tip_and_local_time, mempool_txs) = loop {
// Check if we are synced to the tip.
// The result of this check can change during long polling.
//
// TODO:
// - add `async changed()` methods to ChainTip and ChainSyncStatus
// (using `changed() -> Changed` and `impl Future<()> for Changed`)
check_synced_to_tip(network, latest_chain_tip.clone(), sync_status.clone())?;
// - Long poll ID calculation
//
// TODO: check if the client passed the same long poll id,
// if they did, wait until the inputs change,
// or the max time, or Zebra is no longer synced to the tip.
// Fetch the state data and local time for the block template:
// - if the tip block hash changes, we must return from long polling,
// - if the local clock changes on testnet, we might return from long polling
//
// We always return after 90 minutes on mainnet, even if we have the same response,
// because the max time has been reached.
//
// TODO: timeout and exit the loop when max time is reached
let chain_tip_and_local_time =
fetch_state_tip_and_local_time(state.clone()).await?;
let long_poll_id = LongPollInput::new(
chain_tip_and_local_time.tip_height,
chain_tip_and_local_time.tip_hash,
chain_tip_and_local_time.max_time,
mempool_txs.iter().map(|tx| tx.transaction.id),
)
.into();
// Fetch the mempool data for the block template:
// - if the mempool transactions change, we might return from long polling.
//
// TODO:
// - add a `MempoolChange` type with an `async changed()` method.
// - if we are long polling, pause between state and mempool,
// to allow transactions to re-verify (only works after PR #5841)
let mempool_txs = fetch_mempool_transactions(mempool.clone()).await?;
// - Long poll ID calculation
let server_long_poll_id = LongPollInput::new(
chain_tip_and_local_time.tip_height,
chain_tip_and_local_time.tip_hash,
chain_tip_and_local_time.max_time,
mempool_txs.iter().map(|tx| tx.transaction.id),
)
.generate_id();
// The loop finishes if:
// - the client didn't pass a long poll ID,
// - the server long poll ID is different to the client long poll ID, or
// - the previous loop iteration waited until the max time.
if Some(&server_long_poll_id) != client_long_poll_id.as_ref() || max_time_reached {
break (server_long_poll_id, chain_tip_and_local_time, mempool_txs);
}
// This duration can be slightly lower than needed, if cur_time was clamped
// to min_time. In that case the wait is very long, and it's ok to return early.
//
// It can also be zero if cur_time was clamped to max_time.
// In that case, we want to wait for another change, and ignore this timeout.
let duration_until_max_time = chain_tip_and_local_time
.max_time
.saturating_duration_since(chain_tip_and_local_time.cur_time);
let wait_for_max_time: OptionFuture<_> = if duration_until_max_time.seconds() > 0 {
Some(tokio::time::sleep(duration_until_max_time.to_std()))
} else {
None
}
.into();
// TODO: remove this polling wait after we've switched to
// using futures to detect state tip, sync status, and mempool changes
let temp_wait_before_requests = tokio::time::sleep(Duration::from_secs(5));
tokio::select! {
// Poll the futures in the same order as they are listed here.
biased;
// TODO: change logging to debug after testing
Some(_elapsed) = wait_for_max_time => {
tracing::info!(
max_time = ?chain_tip_and_local_time.max_time,
cur_time = ?chain_tip_and_local_time.cur_time,
?server_long_poll_id,
?client_long_poll_id,
"returning from long poll because max time was reached"
);
max_time_reached = true;
}
_elapsed = temp_wait_before_requests => {
tracing::info!(
max_time = ?chain_tip_and_local_time.max_time,
cur_time = ?chain_tip_and_local_time.cur_time,
?server_long_poll_id,
?client_long_poll_id,
"checking long poll inputs again after waiting 5 seconds"
);
}
}
};
// - Processing fetched data to create a transaction template
//
@ -405,7 +478,7 @@ where
let response = GetBlockTemplate::new(
next_block_height,
&chain_tip_and_local_time,
long_poll_id,
server_long_poll_id,
coinbase_txn,
&mempool_txs,
default_roots,

View File

@ -32,7 +32,7 @@ pub use crate::methods::get_block_template_rpcs::types::get_block_template::*;
/// Returns an error if the get block template RPC `parameters` are invalid.
pub fn check_block_template_parameters(
parameters: get_block_template::JsonParameters,
parameters: &get_block_template::JsonParameters,
) -> Result<()> {
if parameters.data.is_some() || parameters.mode == GetBlockTemplateRequestMode::Proposal {
return Err(Error {

View File

@ -85,5 +85,6 @@ pub struct JsonParameters {
/// An ID that delays the RPC response until the template changes.
///
/// In Zebra, the ID represents the chain tip, max time, and mempool contents.
pub longpollid: Option<LongPollId>,
#[serde(rename = "longpollid")]
pub long_poll_id: Option<LongPollId>,
}

View File

@ -81,6 +81,32 @@ impl LongPollInput {
mempool_transaction_mined_ids,
}
}
/// Returns the [`LongPollId`] for this [`LongPollInput`].
/// Performs lossy conversion on some fields.
pub fn generate_id(&self) -> LongPollId {
let mut tip_hash_checksum = 0;
update_checksum(&mut tip_hash_checksum, self.tip_hash.0);
let mut mempool_transaction_content_checksum: u32 = 0;
for tx_mined_id in self.mempool_transaction_mined_ids.iter() {
update_checksum(&mut mempool_transaction_content_checksum, tx_mined_id.0);
}
LongPollId {
tip_height: self.tip_height.0,
tip_hash_checksum,
max_timestamp: self.max_time.timestamp(),
// It's ok to do wrapping conversions here,
// because long polling checks are probabilistic.
mempool_transaction_count: self.mempool_transaction_mined_ids.len() as u32,
mempool_transaction_content_checksum,
}
}
}
/// The encoded long poll ID, generated from the [`LongPollInput`].
@ -161,33 +187,6 @@ pub struct LongPollId {
pub mempool_transaction_content_checksum: u32,
}
impl From<LongPollInput> for LongPollId {
/// Lossy conversion from LongPollInput to LongPollId.
fn from(input: LongPollInput) -> Self {
let mut tip_hash_checksum = 0;
update_checksum(&mut tip_hash_checksum, input.tip_hash.0);
let mut mempool_transaction_content_checksum: u32 = 0;
for tx_mined_id in input.mempool_transaction_mined_ids.iter() {
update_checksum(&mut mempool_transaction_content_checksum, tx_mined_id.0);
}
Self {
tip_height: input.tip_height.0,
tip_hash_checksum,
max_timestamp: input.max_time.timestamp(),
// It's ok to do wrapping conversions here,
// because long polling checks are probabilistic.
mempool_transaction_count: input.mempool_transaction_mined_ids.len() as u32,
mempool_transaction_content_checksum,
}
}
}
/// Update `checksum` from `item`, so changes in `item` are likely to also change `checksum`.
///
/// This checksum is not cryptographically secure.

View File

@ -1117,7 +1117,7 @@ async fn rpc_getblocktemplate_mining_address(use_p2pkh: bool) {
.get_block_template(Some(get_block_template::JsonParameters {
// This must parse as a LongPollId.
// It must be the correct length and have hex/decimal digits.
longpollid: Some(
long_poll_id: Some(
"0".repeat(LONG_POLL_ID_LENGTH)
.parse()
.expect("unexpected invalid LongPollId"),

View File

@ -150,7 +150,7 @@ pub struct GetBlockTemplateChainInfo {
pub tip_hash: block::Hash,
/// The current state tip height.
/// The block template OAfor the candidate block is the next block after this block.
/// The block template for the candidate block is the next block after this block.
/// Depends on the `tip_hash`.
pub tip_height: block::Height,