Add exit flag for bigtable upload operations

This commit is contained in:
Michael Vines 2020-09-04 10:01:00 -07:00
parent bafdcf24f5
commit d3611f74c8
3 changed files with 28 additions and 2 deletions

View File

@ -65,6 +65,7 @@ impl BigTableUploadService {
starting_slot,
Some(max_confirmed_root),
true,
exit.clone(),
));
match result {

View File

@ -8,7 +8,12 @@ use solana_cli::display::println_transaction;
use solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType};
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
use solana_transaction_status::UiTransactionEncoding;
use std::{path::Path, process::exit, result::Result, sync::Arc};
use std::{
path::Path,
process::exit,
result::Result,
sync::{atomic::AtomicBool, Arc},
};
async fn upload(
blockstore: Blockstore,
@ -26,6 +31,7 @@ async fn upload(
starting_slot,
ending_slot,
allow_missing_metadata,
Arc::new(AtomicBool::new(false)),
)
.await
}

View File

@ -2,7 +2,15 @@ use crate::blockstore::Blockstore;
use log::*;
use solana_measure::measure::Measure;
use solana_sdk::clock::Slot;
use std::{collections::HashSet, result::Result, sync::Arc, time::Duration};
use std::{
collections::HashSet,
result::Result,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::time::delay_for;
// Attempt to upload this many blocks in parallel
@ -17,6 +25,7 @@ pub async fn upload_confirmed_blocks(
starting_slot: Slot,
ending_slot: Option<Slot>,
allow_missing_metadata: bool,
exit: Arc<AtomicBool>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut measure = Measure::start("entire upload");
@ -120,11 +129,17 @@ pub async fn upload_confirmed_blocks(
// Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
let (_loader_thread, receiver) = {
let exit = exit.clone();
let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
(
std::thread::spawn(move || {
let mut measure = Measure::start("block loader thread");
for (i, slot) in blocks_to_upload.iter().enumerate() {
if exit.load(Ordering::Relaxed) {
break;
}
let _ = match blockstore.get_confirmed_block(
*slot,
Some(solana_transaction_status::UiTransactionEncoding::Base64),
@ -162,6 +177,10 @@ pub async fn upload_confirmed_blocks(
tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
while let Some(blocks) = stream.next().await {
if exit.load(Ordering::Relaxed) {
break;
}
let mut measure_upload = Measure::start("Upload");
let mut num_blocks = blocks.len();
info!("Preparing the next {} blocks for upload", num_blocks);