Send program write transactions concurrently

This commit is contained in:
Michael Vines 2019-02-27 11:17:32 -08:00
parent 32aaa5fd06
commit 163ed40efb
3 changed files with 124 additions and 39 deletions

1
Cargo.lock generated
View File

@ -2366,6 +2366,7 @@ dependencies = [
"chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)",
"dirs 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "dirs 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.38 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.38 (registry+https://github.com/rust-lang/crates.io-index)",
"solana 0.12.0", "solana 0.12.0",
"solana-drone 0.12.0", "solana-drone 0.12.0",

View File

@ -14,6 +14,7 @@ bs58 = "0.2.0"
clap = "2.32.0" clap = "2.32.0"
chrono = { version = "0.4.0", features = ["serde"] } chrono = { version = "0.4.0", features = ["serde"] }
dirs = "1.0.5" dirs = "1.0.5"
log = "0.4.2"
serde_json = "1.0.38" serde_json = "1.0.38"
solana = { path = "..", version = "0.12.0" } solana = { path = "..", version = "0.12.0" }
solana-drone = { path = "../drone", version = "0.12.0" } solana-drone = { path = "../drone", version = "0.12.0" }

View File

@ -2,6 +2,7 @@ use bincode::serialize;
use bs58; use bs58;
use chrono::prelude::*; use chrono::prelude::*;
use clap::ArgMatches; use clap::ArgMatches;
use log::*;
use serde_json; use serde_json;
use serde_json::json; use serde_json::json;
#[cfg(test)] #[cfg(test)]
@ -423,26 +424,29 @@ fn process_deploy(
bpf_loader::id(), bpf_loader::id(),
0, 0,
); );
trace!("Creating program account");
send_and_confirm_tx(&rpc_client, &mut tx, &config.id).map_err(|_| { send_and_confirm_tx(&rpc_client, &mut tx, &config.id).map_err(|_| {
WalletError::DynamicProgramError("Program allocate space failed".to_string()) WalletError::DynamicProgramError("Program allocate space failed".to_string())
})?; })?;
let mut offset = 0; trace!("Writing program data");
for chunk in program_userdata.chunks(USERDATA_CHUNK_SIZE) { let write_transactions: Vec<_> = program_userdata
let mut tx = LoaderTransaction::new_write( .chunks(USERDATA_CHUNK_SIZE)
.zip(0..)
.map(|(chunk, i)| {
LoaderTransaction::new_write(
&program_id, &program_id,
bpf_loader::id(), bpf_loader::id(),
offset, (i * USERDATA_CHUNK_SIZE) as u32,
chunk.to_vec(), chunk.to_vec(),
last_id, last_id,
0, 0,
); )
send_and_confirm_tx(&rpc_client, &mut tx, &program_id).map_err(|_| { })
WalletError::DynamicProgramError(format!("Program write failed at offset {:?}", offset)) .collect();
})?; send_and_confirm_transactions(&rpc_client, write_transactions, &program_id)?;
offset += USERDATA_CHUNK_SIZE as u32;
}
trace!("Finalizing program account");
let last_id = get_last_id(&rpc_client)?; let last_id = get_last_id(&rpc_client)?;
let mut tx = LoaderTransaction::new_finalize(&program_id, bpf_loader::id(), last_id, 0); let mut tx = LoaderTransaction::new_finalize(&program_id, bpf_loader::id(), last_id, 0);
send_and_confirm_tx(&rpc_client, &mut tx, &program_id).map_err(|_| { send_and_confirm_tx(&rpc_client, &mut tx, &program_id).map_err(|_| {
@ -724,6 +728,38 @@ fn get_last_id(rpc_client: &RpcClient) -> Result<Hash, Box<dyn error::Error>> {
Ok(Hash::new(&last_id_vec)) Ok(Hash::new(&last_id_vec))
} }
fn get_next_last_id(
rpc_client: &RpcClient,
previous_last_id: &Hash,
) -> Result<Hash, Box<dyn error::Error>> {
let mut next_last_id_retries = 3;
loop {
let next_last_id = get_last_id(rpc_client)?;
if cfg!(not(test)) {
if next_last_id != *previous_last_id {
return Ok(next_last_id);
}
} else {
// When using MockRpcClient, get_last_id() returns a constant value
return Ok(next_last_id);
}
if next_last_id_retries == 0 {
Err(WalletError::RpcRequestError(
format!(
"Unable to fetch new last_id, last_id stuck at {:?}",
next_last_id
)
.to_string(),
))?;
}
next_last_id_retries -= 1;
// Retry ~twice during a slot
sleep(Duration::from_millis(
500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND as u64,
));
}
}
fn send_tx(rpc_client: &RpcClient, tx: &Transaction) -> Result<String, Box<dyn error::Error>> { fn send_tx(rpc_client: &RpcClient, tx: &Transaction) -> Result<String, Box<dyn error::Error>> {
let serialized = serialize(tx).unwrap(); let serialized = serialize(tx).unwrap();
let params = json!([serialized]); let params = json!([serialized]);
@ -804,35 +840,81 @@ fn send_and_confirm_tx(
} }
} }
fn send_and_confirm_transactions(
rpc_client: &RpcClient,
mut transactions: Vec<Transaction>,
signer: &Keypair,
) -> Result<(), Box<dyn error::Error>> {
let mut send_retries = 5;
loop {
let mut status_retries = 4;
// Send all transactions
let mut transactions_signatures = vec![];
for transaction in transactions {
if cfg!(not(test)) {
// Delay ~1 tick between write transactions in an attempt to reduce AccountInUse errors
// since all the write transactions modify the same program account
sleep(Duration::from_millis(1000 / NUM_TICKS_PER_SECOND as u64));
}
let signature = send_tx(&rpc_client, &transaction).ok();
transactions_signatures.push((transaction, signature))
}
// Collect statuses for all the transactions, drop those that are confirmed
while status_retries > 0 {
status_retries -= 1;
if cfg!(not(test)) {
// Retry ~twice during a slot
sleep(Duration::from_millis(
500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND as u64,
));
}
transactions_signatures = transactions_signatures
.into_iter()
.filter(|(_transaction, signature)| {
if let Some(signature) = signature {
if let Ok(status) = confirm_transaction(rpc_client, &signature) {
return status != RpcSignatureStatus::Confirmed;
}
}
true
})
.collect();
if transactions_signatures.is_empty() {
return Ok(());
}
}
if send_retries == 0 {
Err(WalletError::RpcRequestError(
"Transactions failed".to_string(),
))?;
}
send_retries -= 1;
// Re-sign any failed transactions with a new last_id and retry
let last_id = get_next_last_id(rpc_client, &transactions_signatures[0].0.last_id)?;
transactions = transactions_signatures
.into_iter()
.map(|(mut transaction, _)| {
transaction.sign(&[signer], last_id);
transaction
})
.collect();
}
}
fn resign_tx( fn resign_tx(
rpc_client: &RpcClient, rpc_client: &RpcClient,
tx: &mut Transaction, tx: &mut Transaction,
signer_key: &Keypair, signer_key: &Keypair,
) -> Result<(), Box<dyn error::Error>> { ) -> Result<(), Box<dyn error::Error>> {
// Fetch a new last_id to prevent the retry from getting rejected as a let last_id = get_next_last_id(rpc_client, &tx.last_id)?;
// DuplicateSignature
let mut next_last_id_retries = 3;
let last_id = loop {
let next_last_id = get_last_id(rpc_client)?;
if next_last_id != tx.last_id {
break next_last_id;
}
if next_last_id_retries == 0 {
Err(WalletError::RpcRequestError(
format!(
"Unable to fetch new last_id, last_id stuck at {:?}",
next_last_id
)
.to_string(),
))?;
}
next_last_id_retries -= 1;
// Retry ~twice during a slot
sleep(Duration::from_millis(
500 * DEFAULT_TICKS_PER_SLOT / NUM_TICKS_PER_SECOND as u64,
));
};
tx.sign(&[signer_key], last_id); tx.sign(&[signer_key], last_id);
Ok(()) Ok(())
} }
@ -1364,6 +1446,7 @@ mod tests {
#[test] #[test]
fn test_wallet_deploy() { fn test_wallet_deploy() {
solana_logger::setup();
let mut pathbuf = PathBuf::from(env!("CARGO_MANIFEST_DIR")); let mut pathbuf = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
pathbuf.push("tests"); pathbuf.push("tests");
pathbuf.push("fixtures"); pathbuf.push("fixtures");