Problem: slow performance and regular timeouts sending transactions

Solution: fix the maximum number of concurrent HTTP request
at a transport level.

It is set by default to 64 and there's now a new configuration
parameter (`concurrent_http_requests`) in `home` and `foreign`
sections. Previously used `concurrency` parameter from transactions
configuration has been removed.
This commit is contained in:
Yurii Rashkovskii 2018-06-01 18:31:47 -07:00
parent b46ab21417
commit ae8cc1552f
No known key found for this signature in database
GPG Key ID: 1D60D7CFD80845FF
7 changed files with 29 additions and 22 deletions

View File

@ -124,6 +124,7 @@ withdraw_confirm = { gas = 3000000 }
- `home/foreign.gas_price_timeout` - the number of seconds to wait for an HTTP response from the gas price oracle before using the default gas price. Defaults to `10 seconds`.
- `home/foreign.gas_price_speed_type` - retrieve the gas-price corresponding to this speed when querying from an Oracle. Defaults to `fast`. The available values are: "instant", "fast", "standard", and "slow".
- `home/foreign.default_gas_price` - the default gas price (in WEI) used in transactions with the home or foreign nodes. The `default_gas_price` is used when the Oracle cannot be reached. The default value is `15_000_000_000` WEI (ie. 15 GWEI).
- `home/foreign.concurrent_http_requests` - the number of concurrent HTTP requests allowed in-flight (default: **64**)
#### authorities options
@ -133,11 +134,8 @@ withdraw_confirm = { gas = 3000000 }
#### transaction options
- `transaction.deposit_relay.gas` - specify how much gas should be consumed by deposit relay
- `transaction.deposit_relay.concurrency` - how many concurrent transactions can be sent (default: **100**)
- `transaction.withdraw_confirm.gas` - specify how much gas should be consumed by withdraw confirm
- `transaction.withdraw_confirm.concurrency` - how many concurrent transactions can be sent (default: **100**)
- `transaction.withdraw_relay.gas` - specify how much gas should be consumed by withdraw relay
- `transaction.withdraw_relay.concurrency` - how many concurrent transactions can be sent (default: **100**)
### Database file format

View File

@ -31,13 +31,13 @@ pub struct Connections<T> where T: Transport {
}
impl Connections<Http> {
pub fn new_http(handle: &Handle, home: &str, foreign: &str) -> Result<Self, Error> {
pub fn new_http(handle: &Handle, home: &str, home_concurrent_connections: usize, foreign: &str, foreign_concurrent_connections: usize) -> Result<Self, Error> {
let home = Http::with_event_loop(home, handle,1)
let home = Http::with_event_loop(home, handle,home_concurrent_connections)
.map_err(ErrorKind::Web3)
.map_err(Error::from)
.chain_err(||"Cannot connect to home node rpc")?;
let foreign = Http::with_event_loop(foreign, handle, 1)
let foreign = Http::with_event_loop(foreign, handle, foreign_concurrent_connections)
.map_err(ErrorKind::Web3)
.map_err(Error::from)
.chain_err(||"Cannot connect to foreign node rpc")?;
@ -64,7 +64,7 @@ impl App<Http> {
let home_url:String = format!("{}:{}", config.home.rpc_host, config.home.rpc_port);
let foreign_url:String = format!("{}:{}", config.foreign.rpc_host, config.foreign.rpc_port);
let connections = Connections::new_http(handle, home_url.as_ref(), foreign_url.as_ref())?;
let connections = Connections::new_http(handle, home_url.as_ref(), config.home.concurrent_http_requests, foreign_url.as_ref(), config.foreign.concurrent_http_requests)?;
let keystore = EthStore::open(Box::new(RootDiskDirectory::at(&config.keystore))).map_err(|e| ErrorKind::KeyStore(e))?;
let keystore = AccountProvider::new(Box::new(keystore), AccountProviderSettings {

View File

@ -1,5 +1,5 @@
use std::sync::{Arc, RwLock};
use futures::{self, Future, Stream, stream::{Collect, iter_ok, IterOk, Buffered}, Poll};
use futures::{self, Future, Stream, stream::{Collect, FuturesUnordered, futures_unordered}, Poll};
use web3::Transport;
use web3::types::{U256, Address, Bytes, Log, FilterBuilder};
use ethabi::RawLog;
@ -35,7 +35,7 @@ enum DepositRelayState<T: Transport> {
Wait,
/// Relaying deposits in progress.
RelayDeposits {
future: Collect<Buffered<IterOk<::std::vec::IntoIter<NonceCheck<T, SendRawTransaction<T>>>, Error>>>,
future: Collect<FuturesUnordered<NonceCheck<T, SendRawTransaction<T>>>>,
block: u64,
},
/// All deposits till given block has been relayed.
@ -115,7 +115,7 @@ impl<T: Transport> Stream for DepositRelay<T> {
info!("relaying {} deposits", len);
DepositRelayState::RelayDeposits {
future: iter_ok(deposits).buffered(self.app.config.txs.deposit_relay.concurrency).collect(),
future: futures_unordered(deposits).collect(),
block: item.to,
}
},

View File

@ -143,7 +143,7 @@ mod tests {
use super::*;
use error::{Error, ErrorKind};
use futures::{Async, future::{err, ok, FutureResult}};
use config::{Node, NodeInfo};
use config::{Node, NodeInfo, DEFAULT_CONCURRENCY};
use tokio_timer::Timer;
use std::time::Duration;
use std::path::PathBuf;
@ -176,6 +176,7 @@ mod tests {
gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(),
gas_price_timeout: Duration::from_secs(5),
default_gas_price: 15_000_000_000,
concurrent_http_requests: DEFAULT_CONCURRENCY,
};
let timer = Timer::default();
let mut stream = GasPriceStream::new_with_retriever(&node, ErroredRequest, &timer);
@ -218,6 +219,7 @@ mod tests {
gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(),
gas_price_timeout: Duration::from_secs(5),
default_gas_price: 15_000_000_000,
concurrent_http_requests: DEFAULT_CONCURRENCY,
};
let timer = Timer::default();
let mut stream = GasPriceStream::new_with_retriever(&node, BadJson, &timer);
@ -260,6 +262,7 @@ mod tests {
gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(),
gas_price_timeout: Duration::from_secs(5),
default_gas_price: 15_000_000_000,
concurrent_http_requests: DEFAULT_CONCURRENCY,
};
let timer = Timer::default();
let mut stream = GasPriceStream::new_with_retriever(&node, UnexpectedJson, &timer);
@ -301,6 +304,7 @@ mod tests {
gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(),
gas_price_timeout: Duration::from_secs(5),
default_gas_price: 15_000_000_000,
concurrent_http_requests: DEFAULT_CONCURRENCY,
};
let timer = Timer::default();
let mut stream = GasPriceStream::new_with_retriever(&node, NonObjectJson, &timer);
@ -342,6 +346,7 @@ mod tests {
gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(),
gas_price_timeout: Duration::from_secs(5),
default_gas_price: 15_000_000_000,
concurrent_http_requests: DEFAULT_CONCURRENCY,
};
let timer = Timer::default();
let mut stream = GasPriceStream::new_with_retriever(&node, CorrectJson, &timer);

View File

@ -1,6 +1,6 @@
use std::sync::{Arc, RwLock};
use std::ops;
use futures::{self, Future, Stream, stream::{Collect, IterOk, iter_ok, Buffered}, Poll};
use futures::{self, Future, Stream, stream::{Collect, FuturesUnordered, futures_unordered}, Poll};
use web3::Transport;
use web3::types::{U256, H520, Address, Bytes, FilterBuilder};
use api::{self, LogStream};
@ -30,7 +30,7 @@ enum WithdrawConfirmState<T: Transport> {
Wait,
/// Confirming withdraws.
ConfirmWithdraws {
future: Collect<Buffered<IterOk<::std::vec::IntoIter<NonceCheck<T, SendRawTransaction<T>>>, Error>>>,
future: Collect<FuturesUnordered<NonceCheck<T, SendRawTransaction<T>>>>,
block: u64,
},
/// All withdraws till given block has been confirmed.
@ -139,7 +139,7 @@ impl<T: Transport> Stream for WithdrawConfirm<T> {
info!("submitting {} signatures", len);
WithdrawConfirmState::ConfirmWithdraws {
future: iter_ok(confirmations).buffered(self.app.config.txs.withdraw_confirm.concurrency).collect(),
future: futures_unordered(confirmations).collect(),
block,
}
},

View File

@ -1,5 +1,5 @@
use std::sync::{Arc, RwLock};
use futures::{self, Future, Stream, stream::{Collect, iter_ok, IterOk, Buffered}, Poll};
use futures::{self, Future, Stream, stream::{Collect, FuturesUnordered, futures_unordered}, Poll};
use futures::future::{JoinAll, join_all, Join};
use tokio_timer::Timeout;
use web3::Transport;
@ -70,7 +70,7 @@ pub enum WithdrawRelayState<T: Transport> {
block: u64,
},
RelayWithdraws {
future: Collect<Buffered<IterOk<::std::vec::IntoIter<NonceCheck<T, SendRawTransaction<T>>>, Error>>>,
future: Collect<FuturesUnordered<NonceCheck<T, SendRawTransaction<T>>>>,
block: u64,
},
Yield(Option<u64>),
@ -236,7 +236,7 @@ impl<T: Transport> Stream for WithdrawRelay<T> {
info!("relaying {} withdraws", len);
WithdrawRelayState::RelayWithdraws {
future: iter_ok(relays).buffered(self.app.config.txs.withdraw_relay.concurrency).collect(),
future: futures_unordered(relays).collect(),
block,
}
},

View File

@ -15,7 +15,7 @@ const DEFAULT_POLL_INTERVAL: u64 = 1;
const DEFAULT_CONFIRMATIONS: usize = 12;
const DEFAULT_TIMEOUT: u64 = 3600;
const DEFAULT_RPC_PORT: u16 = 8545;
const DEFAULT_CONCURRENCY: usize = 100;
pub(crate) const DEFAULT_CONCURRENCY: usize = 64;
const DEFAULT_GAS_PRICE_SPEED: GasPriceSpeed = GasPriceSpeed::Fast;
const DEFAULT_GAS_PRICE_TIMEOUT_SECS: u64 = 10;
const DEFAULT_GAS_PRICE_WEI: u64 = 15_000_000_000;
@ -79,6 +79,7 @@ pub struct Node {
pub gas_price_speed: GasPriceSpeed,
pub gas_price_timeout: Duration,
pub default_gas_price: u64,
pub concurrent_http_requests: usize,
}
use std::sync::{Arc, RwLock};
@ -118,6 +119,7 @@ impl Node {
};
let default_gas_price = node.default_gas_price.unwrap_or(DEFAULT_GAS_PRICE_WEI);
let concurrent_http_requests = node.concurrent_http_requests.unwrap_or(DEFAULT_CONCURRENCY);
let result = Node {
account: node.account,
@ -141,6 +143,7 @@ impl Node {
gas_price_speed,
gas_price_timeout,
default_gas_price,
concurrent_http_requests,
};
Ok(result)
@ -185,7 +188,6 @@ impl Transactions {
pub struct TransactionConfig {
pub gas: u64,
pub gas_price: u64,
pub concurrency: usize,
}
impl TransactionConfig {
@ -193,7 +195,6 @@ impl TransactionConfig {
TransactionConfig {
gas: cfg.gas.unwrap_or_default(),
gas_price: cfg.gas_price.unwrap_or_default(),
concurrency: cfg.concurrency.unwrap_or(DEFAULT_CONCURRENCY),
}
}
}
@ -277,6 +278,7 @@ mod load {
pub gas_price_speed: Option<String>,
pub gas_price_timeout: Option<u64>,
pub default_gas_price: Option<u64>,
pub concurrent_http_requests: Option<usize>,
}
#[derive(Deserialize)]
@ -295,7 +297,6 @@ mod load {
pub struct TransactionConfig {
pub gas: Option<u64>,
pub gas_price: Option<u64>,
pub concurrency: Option<usize>,
}
#[derive(Deserialize)]
@ -382,6 +383,7 @@ home_deploy = { gas = 20 }
gas_price_speed: DEFAULT_GAS_PRICE_SPEED,
gas_price_timeout: Duration::from_secs(DEFAULT_GAS_PRICE_TIMEOUT_SECS),
default_gas_price: DEFAULT_GAS_PRICE_WEI,
concurrent_http_requests: DEFAULT_CONCURRENCY,
},
foreign: Node {
account: "0000000000000000000000000000000000000001".into(),
@ -400,6 +402,7 @@ home_deploy = { gas = 20 }
gas_price_speed: DEFAULT_GAS_PRICE_SPEED,
gas_price_timeout: Duration::from_secs(DEFAULT_GAS_PRICE_TIMEOUT_SECS),
default_gas_price: DEFAULT_GAS_PRICE_WEI,
concurrent_http_requests: DEFAULT_CONCURRENCY,
},
authorities: Authorities {
accounts: vec![
@ -418,7 +421,6 @@ home_deploy = { gas = 20 }
expected.txs.home_deploy = TransactionConfig {
gas: 20,
gas_price: 0,
concurrency: DEFAULT_CONCURRENCY,
};
}
@ -475,6 +477,7 @@ required_signatures = 2
gas_price_speed: DEFAULT_GAS_PRICE_SPEED,
gas_price_timeout: Duration::from_secs(DEFAULT_GAS_PRICE_TIMEOUT_SECS),
default_gas_price: DEFAULT_GAS_PRICE_WEI,
concurrent_http_requests: DEFAULT_CONCURRENCY,
},
foreign: Node {
account: "0000000000000000000000000000000000000001".into(),
@ -493,6 +496,7 @@ required_signatures = 2
gas_price_speed: DEFAULT_GAS_PRICE_SPEED,
gas_price_timeout: Duration::from_secs(DEFAULT_GAS_PRICE_TIMEOUT_SECS),
default_gas_price: DEFAULT_GAS_PRICE_WEI,
concurrent_http_requests: DEFAULT_CONCURRENCY,
},
authorities: Authorities {
accounts: vec![