Merge pull request #100 from yrashk/parallel-http
Problem: slow performance and regular timeouts sending transactions
This commit is contained in:
commit
47541b067f
|
@ -124,6 +124,7 @@ withdraw_confirm = { gas = 3000000 }
|
|||
- `home/foreign.gas_price_timeout` - the number of seconds to wait for an HTTP response from the gas price oracle before using the default gas price. Defaults to `10 seconds`.
|
||||
- `home/foreign.gas_price_speed` - retrieve the gas-price corresponding to this speed when querying from an Oracle. Defaults to `fast`. The available values are: "instant", "fast", "standard", and "slow".
|
||||
- `home/foreign.default_gas_price` - the default gas price (in WEI) used in transactions with the home or foreign nodes. The `default_gas_price` is used when the Oracle cannot be reached. The default value is `15_000_000_000` WEI (ie. 15 GWEI).
|
||||
- `home/foreign.concurrent_http_requests` - the number of concurrent HTTP requests allowed in-flight (default: **64**)
|
||||
|
||||
#### authorities options
|
||||
|
||||
|
@ -133,11 +134,8 @@ withdraw_confirm = { gas = 3000000 }
|
|||
#### transaction options
|
||||
|
||||
- `transaction.deposit_relay.gas` - specify how much gas should be consumed by deposit relay
|
||||
- `transaction.deposit_relay.concurrency` - how many concurrent transactions can be sent (default: **100**)
|
||||
- `transaction.withdraw_confirm.gas` - specify how much gas should be consumed by withdraw confirm
|
||||
- `transaction.withdraw_confirm.concurrency` - how many concurrent transactions can be sent (default: **100**)
|
||||
- `transaction.withdraw_relay.gas` - specify how much gas should be consumed by withdraw relay
|
||||
- `transaction.withdraw_relay.concurrency` - how many concurrent transactions can be sent (default: **100**)
|
||||
|
||||
### Database file format
|
||||
|
||||
|
|
|
@ -31,13 +31,13 @@ pub struct Connections<T> where T: Transport {
|
|||
}
|
||||
|
||||
impl Connections<Http> {
|
||||
pub fn new_http(handle: &Handle, home: &str, foreign: &str) -> Result<Self, Error> {
|
||||
pub fn new_http(handle: &Handle, home: &str, home_concurrent_connections: usize, foreign: &str, foreign_concurrent_connections: usize) -> Result<Self, Error> {
|
||||
|
||||
let home = Http::with_event_loop(home, handle,1)
|
||||
let home = Http::with_event_loop(home, handle,home_concurrent_connections)
|
||||
.map_err(ErrorKind::Web3)
|
||||
.map_err(Error::from)
|
||||
.chain_err(||"Cannot connect to home node rpc")?;
|
||||
let foreign = Http::with_event_loop(foreign, handle, 1)
|
||||
let foreign = Http::with_event_loop(foreign, handle, foreign_concurrent_connections)
|
||||
.map_err(ErrorKind::Web3)
|
||||
.map_err(Error::from)
|
||||
.chain_err(||"Cannot connect to foreign node rpc")?;
|
||||
|
@ -64,7 +64,7 @@ impl App<Http> {
|
|||
let home_url:String = format!("{}:{}", config.home.rpc_host, config.home.rpc_port);
|
||||
let foreign_url:String = format!("{}:{}", config.foreign.rpc_host, config.foreign.rpc_port);
|
||||
|
||||
let connections = Connections::new_http(handle, home_url.as_ref(), foreign_url.as_ref())?;
|
||||
let connections = Connections::new_http(handle, home_url.as_ref(), config.home.concurrent_http_requests, foreign_url.as_ref(), config.foreign.concurrent_http_requests)?;
|
||||
let keystore = EthStore::open(Box::new(RootDiskDirectory::at(&config.keystore))).map_err(|e| ErrorKind::KeyStore(e))?;
|
||||
|
||||
let keystore = AccountProvider::new(Box::new(keystore), AccountProviderSettings {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use std::sync::{Arc, RwLock};
|
||||
use futures::{self, Future, Stream, stream::{Collect, iter_ok, IterOk, Buffered}, Poll};
|
||||
use futures::{self, Future, Stream, stream::{Collect, FuturesUnordered, futures_unordered}, Poll};
|
||||
use web3::Transport;
|
||||
use web3::types::{U256, Address, Bytes, Log, FilterBuilder};
|
||||
use ethabi::RawLog;
|
||||
|
@ -35,7 +35,7 @@ enum DepositRelayState<T: Transport> {
|
|||
Wait,
|
||||
/// Relaying deposits in progress.
|
||||
RelayDeposits {
|
||||
future: Collect<Buffered<IterOk<::std::vec::IntoIter<NonceCheck<T, SendRawTransaction<T>>>, Error>>>,
|
||||
future: Collect<FuturesUnordered<NonceCheck<T, SendRawTransaction<T>>>>,
|
||||
block: u64,
|
||||
},
|
||||
/// All deposits till given block has been relayed.
|
||||
|
@ -115,7 +115,7 @@ impl<T: Transport> Stream for DepositRelay<T> {
|
|||
|
||||
info!("relaying {} deposits", len);
|
||||
DepositRelayState::RelayDeposits {
|
||||
future: iter_ok(deposits).buffered(self.app.config.txs.deposit_relay.concurrency).collect(),
|
||||
future: futures_unordered(deposits).collect(),
|
||||
block: item.to,
|
||||
}
|
||||
},
|
||||
|
|
|
@ -143,7 +143,7 @@ mod tests {
|
|||
use super::*;
|
||||
use error::{Error, ErrorKind};
|
||||
use futures::{Async, future::{err, ok, FutureResult}};
|
||||
use config::{Node, NodeInfo};
|
||||
use config::{Node, NodeInfo, DEFAULT_CONCURRENCY};
|
||||
use tokio_timer::Timer;
|
||||
use std::time::Duration;
|
||||
use std::path::PathBuf;
|
||||
|
@ -176,6 +176,7 @@ mod tests {
|
|||
gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(),
|
||||
gas_price_timeout: Duration::from_secs(5),
|
||||
default_gas_price: 15_000_000_000,
|
||||
concurrent_http_requests: DEFAULT_CONCURRENCY,
|
||||
};
|
||||
let timer = Timer::default();
|
||||
let mut stream = GasPriceStream::new_with_retriever(&node, ErroredRequest, &timer);
|
||||
|
@ -218,6 +219,7 @@ mod tests {
|
|||
gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(),
|
||||
gas_price_timeout: Duration::from_secs(5),
|
||||
default_gas_price: 15_000_000_000,
|
||||
concurrent_http_requests: DEFAULT_CONCURRENCY,
|
||||
};
|
||||
let timer = Timer::default();
|
||||
let mut stream = GasPriceStream::new_with_retriever(&node, BadJson, &timer);
|
||||
|
@ -260,6 +262,7 @@ mod tests {
|
|||
gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(),
|
||||
gas_price_timeout: Duration::from_secs(5),
|
||||
default_gas_price: 15_000_000_000,
|
||||
concurrent_http_requests: DEFAULT_CONCURRENCY,
|
||||
};
|
||||
let timer = Timer::default();
|
||||
let mut stream = GasPriceStream::new_with_retriever(&node, UnexpectedJson, &timer);
|
||||
|
@ -301,6 +304,7 @@ mod tests {
|
|||
gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(),
|
||||
gas_price_timeout: Duration::from_secs(5),
|
||||
default_gas_price: 15_000_000_000,
|
||||
concurrent_http_requests: DEFAULT_CONCURRENCY,
|
||||
};
|
||||
let timer = Timer::default();
|
||||
let mut stream = GasPriceStream::new_with_retriever(&node, NonObjectJson, &timer);
|
||||
|
@ -342,6 +346,7 @@ mod tests {
|
|||
gas_price_speed: GasPriceSpeed::from_str("fast").unwrap(),
|
||||
gas_price_timeout: Duration::from_secs(5),
|
||||
default_gas_price: 15_000_000_000,
|
||||
concurrent_http_requests: DEFAULT_CONCURRENCY,
|
||||
};
|
||||
let timer = Timer::default();
|
||||
let mut stream = GasPriceStream::new_with_retriever(&node, CorrectJson, &timer);
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::sync::{Arc, RwLock};
|
||||
use std::ops;
|
||||
use futures::{self, Future, Stream, stream::{Collect, IterOk, iter_ok, Buffered}, Poll};
|
||||
use futures::{self, Future, Stream, stream::{Collect, FuturesUnordered, futures_unordered}, Poll};
|
||||
use web3::Transport;
|
||||
use web3::types::{U256, H520, Address, Bytes, FilterBuilder};
|
||||
use api::{self, LogStream};
|
||||
|
@ -30,7 +30,7 @@ enum WithdrawConfirmState<T: Transport> {
|
|||
Wait,
|
||||
/// Confirming withdraws.
|
||||
ConfirmWithdraws {
|
||||
future: Collect<Buffered<IterOk<::std::vec::IntoIter<NonceCheck<T, SendRawTransaction<T>>>, Error>>>,
|
||||
future: Collect<FuturesUnordered<NonceCheck<T, SendRawTransaction<T>>>>,
|
||||
block: u64,
|
||||
},
|
||||
/// All withdraws till given block has been confirmed.
|
||||
|
@ -139,7 +139,7 @@ impl<T: Transport> Stream for WithdrawConfirm<T> {
|
|||
|
||||
info!("submitting {} signatures", len);
|
||||
WithdrawConfirmState::ConfirmWithdraws {
|
||||
future: iter_ok(confirmations).buffered(self.app.config.txs.withdraw_confirm.concurrency).collect(),
|
||||
future: futures_unordered(confirmations).collect(),
|
||||
block,
|
||||
}
|
||||
},
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use std::sync::{Arc, RwLock};
|
||||
use futures::{self, Future, Stream, stream::{Collect, iter_ok, IterOk, Buffered}, Poll};
|
||||
use futures::{self, Future, Stream, stream::{Collect, FuturesUnordered, futures_unordered}, Poll};
|
||||
use futures::future::{JoinAll, join_all, Join};
|
||||
use tokio_timer::Timeout;
|
||||
use web3::Transport;
|
||||
|
@ -70,7 +70,7 @@ pub enum WithdrawRelayState<T: Transport> {
|
|||
block: u64,
|
||||
},
|
||||
RelayWithdraws {
|
||||
future: Collect<Buffered<IterOk<::std::vec::IntoIter<NonceCheck<T, SendRawTransaction<T>>>, Error>>>,
|
||||
future: Collect<FuturesUnordered<NonceCheck<T, SendRawTransaction<T>>>>,
|
||||
block: u64,
|
||||
},
|
||||
Yield(Option<u64>),
|
||||
|
@ -236,7 +236,7 @@ impl<T: Transport> Stream for WithdrawRelay<T> {
|
|||
|
||||
info!("relaying {} withdraws", len);
|
||||
WithdrawRelayState::RelayWithdraws {
|
||||
future: iter_ok(relays).buffered(self.app.config.txs.withdraw_relay.concurrency).collect(),
|
||||
future: futures_unordered(relays).collect(),
|
||||
block,
|
||||
}
|
||||
},
|
||||
|
|
|
@ -15,7 +15,7 @@ const DEFAULT_POLL_INTERVAL: u64 = 1;
|
|||
const DEFAULT_CONFIRMATIONS: usize = 12;
|
||||
const DEFAULT_TIMEOUT: u64 = 3600;
|
||||
const DEFAULT_RPC_PORT: u16 = 8545;
|
||||
const DEFAULT_CONCURRENCY: usize = 100;
|
||||
pub(crate) const DEFAULT_CONCURRENCY: usize = 64;
|
||||
const DEFAULT_GAS_PRICE_SPEED: GasPriceSpeed = GasPriceSpeed::Fast;
|
||||
const DEFAULT_GAS_PRICE_TIMEOUT_SECS: u64 = 10;
|
||||
const DEFAULT_GAS_PRICE_WEI: u64 = 15_000_000_000;
|
||||
|
@ -79,6 +79,7 @@ pub struct Node {
|
|||
pub gas_price_speed: GasPriceSpeed,
|
||||
pub gas_price_timeout: Duration,
|
||||
pub default_gas_price: u64,
|
||||
pub concurrent_http_requests: usize,
|
||||
}
|
||||
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
@ -118,6 +119,7 @@ impl Node {
|
|||
};
|
||||
|
||||
let default_gas_price = node.default_gas_price.unwrap_or(DEFAULT_GAS_PRICE_WEI);
|
||||
let concurrent_http_requests = node.concurrent_http_requests.unwrap_or(DEFAULT_CONCURRENCY);
|
||||
|
||||
let result = Node {
|
||||
account: node.account,
|
||||
|
@ -141,6 +143,7 @@ impl Node {
|
|||
gas_price_speed,
|
||||
gas_price_timeout,
|
||||
default_gas_price,
|
||||
concurrent_http_requests,
|
||||
};
|
||||
|
||||
Ok(result)
|
||||
|
@ -185,7 +188,6 @@ impl Transactions {
|
|||
pub struct TransactionConfig {
|
||||
pub gas: u64,
|
||||
pub gas_price: u64,
|
||||
pub concurrency: usize,
|
||||
}
|
||||
|
||||
impl TransactionConfig {
|
||||
|
@ -193,7 +195,6 @@ impl TransactionConfig {
|
|||
TransactionConfig {
|
||||
gas: cfg.gas.unwrap_or_default(),
|
||||
gas_price: cfg.gas_price.unwrap_or_default(),
|
||||
concurrency: cfg.concurrency.unwrap_or(DEFAULT_CONCURRENCY),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -277,6 +278,7 @@ mod load {
|
|||
pub gas_price_speed: Option<String>,
|
||||
pub gas_price_timeout: Option<u64>,
|
||||
pub default_gas_price: Option<u64>,
|
||||
pub concurrent_http_requests: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
|
@ -295,7 +297,6 @@ mod load {
|
|||
pub struct TransactionConfig {
|
||||
pub gas: Option<u64>,
|
||||
pub gas_price: Option<u64>,
|
||||
pub concurrency: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
|
@ -382,6 +383,7 @@ home_deploy = { gas = 20 }
|
|||
gas_price_speed: DEFAULT_GAS_PRICE_SPEED,
|
||||
gas_price_timeout: Duration::from_secs(DEFAULT_GAS_PRICE_TIMEOUT_SECS),
|
||||
default_gas_price: DEFAULT_GAS_PRICE_WEI,
|
||||
concurrent_http_requests: DEFAULT_CONCURRENCY,
|
||||
},
|
||||
foreign: Node {
|
||||
account: "0000000000000000000000000000000000000001".into(),
|
||||
|
@ -400,6 +402,7 @@ home_deploy = { gas = 20 }
|
|||
gas_price_speed: DEFAULT_GAS_PRICE_SPEED,
|
||||
gas_price_timeout: Duration::from_secs(DEFAULT_GAS_PRICE_TIMEOUT_SECS),
|
||||
default_gas_price: DEFAULT_GAS_PRICE_WEI,
|
||||
concurrent_http_requests: DEFAULT_CONCURRENCY,
|
||||
},
|
||||
authorities: Authorities {
|
||||
accounts: vec![
|
||||
|
@ -418,7 +421,6 @@ home_deploy = { gas = 20 }
|
|||
expected.txs.home_deploy = TransactionConfig {
|
||||
gas: 20,
|
||||
gas_price: 0,
|
||||
concurrency: DEFAULT_CONCURRENCY,
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -475,6 +477,7 @@ required_signatures = 2
|
|||
gas_price_speed: DEFAULT_GAS_PRICE_SPEED,
|
||||
gas_price_timeout: Duration::from_secs(DEFAULT_GAS_PRICE_TIMEOUT_SECS),
|
||||
default_gas_price: DEFAULT_GAS_PRICE_WEI,
|
||||
concurrent_http_requests: DEFAULT_CONCURRENCY,
|
||||
},
|
||||
foreign: Node {
|
||||
account: "0000000000000000000000000000000000000001".into(),
|
||||
|
@ -493,6 +496,7 @@ required_signatures = 2
|
|||
gas_price_speed: DEFAULT_GAS_PRICE_SPEED,
|
||||
gas_price_timeout: Duration::from_secs(DEFAULT_GAS_PRICE_TIMEOUT_SECS),
|
||||
default_gas_price: DEFAULT_GAS_PRICE_WEI,
|
||||
concurrent_http_requests: DEFAULT_CONCURRENCY,
|
||||
},
|
||||
authorities: Authorities {
|
||||
accounts: vec![
|
||||
|
|
Loading…
Reference in New Issue