fix: get full snapshot based on the basis slot of the targetted incremental
This commit is contained in:
parent
6275c69996
commit
8a2a2fee21
|
@ -29,7 +29,7 @@ use solana_sdk::clock::Slot;
|
|||
use tokio::task;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::find::{latest_full_snapshot, latest_incremental_snapshot};
|
||||
use crate::find::{find_full_snapshot, latest_full_snapshot, latest_incremental_snapshot};
|
||||
use crate::{Config, HostUrl};
|
||||
|
||||
pub struct Loader {
|
||||
|
@ -54,9 +54,9 @@ impl Loader {
|
|||
Self { cfg }
|
||||
}
|
||||
|
||||
pub async fn load_latest_snapshot(&self) -> anyhow::Result<FullSnapshot> {
|
||||
pub async fn load_full_snapshot_at_slot(&self, slot: Slot) -> anyhow::Result<FullSnapshot> {
|
||||
let snapshot =
|
||||
latest_full_snapshot(self.cfg.hosts.to_vec(), self.cfg.not_before_slot).await?;
|
||||
find_full_snapshot(self.cfg.hosts.to_vec(), slot).await?;
|
||||
|
||||
self.ensure_paths_exists().await;
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ use tokio::task;
|
|||
use crate::HostUrl;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct LatestFullSnapshot {
|
||||
pub struct FullSnapshot {
|
||||
pub host: HostUrl,
|
||||
pub slot: Slot,
|
||||
pub hash: SnapshotHash,
|
||||
|
@ -27,10 +27,50 @@ pub struct LatestIncrementalSnapshot {
|
|||
pub hash: SnapshotHash,
|
||||
}
|
||||
|
||||
|
||||
pub async fn find_full_snapshot(
|
||||
hosts: impl IntoIterator<Item = HostUrl>,
|
||||
slot: Slot,
|
||||
) -> anyhow::Result<FullSnapshot> {
|
||||
let hosts_and_uris = collect_redirects(hosts, "snapshot.tar.bz2").await?;
|
||||
|
||||
let mut snapshots = Vec::with_capacity(hosts_and_uris.len());
|
||||
|
||||
for (host, uri) in hosts_and_uris {
|
||||
if let Some(data) = uri
|
||||
.strip_prefix("/snapshot-")
|
||||
.and_then(|s| s.strip_suffix(".tar.zst"))
|
||||
{
|
||||
let parts: Vec<&str> = data.split('-').collect();
|
||||
|
||||
if parts.len() == 2 {
|
||||
let full_slot = parts[0].parse::<u64>().unwrap();
|
||||
|
||||
debug!("{} has snapshot of {}", &host, full_slot);
|
||||
if full_slot != slot {
|
||||
continue;
|
||||
}
|
||||
|
||||
let hash = SnapshotHash(Hash::from_str(parts[1]).unwrap());
|
||||
snapshots.push(FullSnapshot {
|
||||
host: host.clone(),
|
||||
slot: full_slot,
|
||||
hash,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
snapshots
|
||||
.into_iter()
|
||||
.max_by(|left, right| left.slot.cmp(&right.slot))
|
||||
.ok_or_else(|| anyhow!("Unable to find full snapshot at slot {}", slot))
|
||||
}
|
||||
|
||||
pub async fn latest_full_snapshot(
|
||||
hosts: impl IntoIterator<Item = HostUrl>,
|
||||
not_before_slot: Slot,
|
||||
) -> anyhow::Result<LatestFullSnapshot> {
|
||||
) -> anyhow::Result<FullSnapshot> {
|
||||
let hosts_and_uris = collect_redirects(hosts, "snapshot.tar.bz2").await?;
|
||||
|
||||
let mut snapshots = Vec::with_capacity(hosts_and_uris.len());
|
||||
|
@ -51,7 +91,7 @@ pub async fn latest_full_snapshot(
|
|||
}
|
||||
|
||||
let hash = SnapshotHash(Hash::from_str(parts[1]).unwrap());
|
||||
snapshots.push(LatestFullSnapshot {
|
||||
snapshots.push(FullSnapshot {
|
||||
host: host.clone(),
|
||||
slot: full_slot,
|
||||
hash,
|
||||
|
|
|
@ -71,7 +71,7 @@ pub fn start_backfill_import_from_snapshot(cfg: Config, db: Arc<AccountsDb>) ->
|
|||
info!("{incremental_snapshot:#?}");
|
||||
|
||||
let full_snapshot = loop {
|
||||
match loader.load_latest_snapshot().await {
|
||||
match loader.load_full_snapshot_at_slot(incremental_snapshot.full_slot).await {
|
||||
Ok(snapshot) => break snapshot,
|
||||
Err(e) => {
|
||||
warn!("Unable to download full snapshot: {}", e.to_string());
|
||||
|
|
Loading…
Reference in New Issue