diff --git a/core/src/bigtable_upload_service.rs b/core/src/bigtable_upload_service.rs index 1d423b1486..b8a445504b 100644 --- a/core/src/bigtable_upload_service.rs +++ b/core/src/bigtable_upload_service.rs @@ -65,6 +65,7 @@ impl BigTableUploadService { starting_slot, Some(max_confirmed_root), true, + exit.clone(), )); match result { diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs index 725789922e..eff4e2b65b 100644 --- a/ledger-tool/src/bigtable.rs +++ b/ledger-tool/src/bigtable.rs @@ -8,7 +8,12 @@ use solana_cli::display::println_transaction; use solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType}; use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; use solana_transaction_status::UiTransactionEncoding; -use std::{path::Path, process::exit, result::Result, sync::Arc}; +use std::{ + path::Path, + process::exit, + result::Result, + sync::{atomic::AtomicBool, Arc}, +}; async fn upload( blockstore: Blockstore, @@ -26,6 +31,7 @@ async fn upload( starting_slot, ending_slot, allow_missing_metadata, + Arc::new(AtomicBool::new(false)), ) .await } diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs index fe347a1e98..4d42664fc7 100644 --- a/ledger/src/bigtable_upload.rs +++ b/ledger/src/bigtable_upload.rs @@ -2,7 +2,15 @@ use crate::blockstore::Blockstore; use log::*; use solana_measure::measure::Measure; use solana_sdk::clock::Slot; -use std::{collections::HashSet, result::Result, sync::Arc, time::Duration}; +use std::{ + collections::HashSet, + result::Result, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; use tokio::time::delay_for; // Attempt to upload this many blocks in parallel @@ -17,6 +25,7 @@ pub async fn upload_confirmed_blocks( starting_slot: Slot, ending_slot: Option, allow_missing_metadata: bool, + exit: Arc, ) -> Result<(), Box> { let mut measure = Measure::start("entire upload"); @@ -120,11 +129,17 @@ pub async fn upload_confirmed_blocks( // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading let (_loader_thread, receiver) = { + let exit = exit.clone(); + let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH); ( std::thread::spawn(move || { let mut measure = Measure::start("block loader thread"); for (i, slot) in blocks_to_upload.iter().enumerate() { + if exit.load(Ordering::Relaxed) { + break; + } + let _ = match blockstore.get_confirmed_block( *slot, Some(solana_transaction_status::UiTransactionEncoding::Base64), @@ -162,6 +177,10 @@ pub async fn upload_confirmed_blocks( tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL); while let Some(blocks) = stream.next().await { + if exit.load(Ordering::Relaxed) { + break; + } + let mut measure_upload = Measure::start("Upload"); let mut num_blocks = blocks.len(); info!("Preparing the next {} blocks for upload", num_blocks);