poa-bridge/src/bridge/deposit_relay.rs

116 lines
3.7 KiB
Rust
Raw Normal View History

2017-08-12 07:57:07 -07:00
use std::sync::Arc;
2017-08-13 10:28:41 -07:00
use futures::{Future, Stream, Poll};
2017-08-12 07:57:07 -07:00
use futures::future::{JoinAll, join_all};
use web3::Transport;
use web3::helpers::CallResult;
use web3::types::{TransactionRequest, H256, Address, Bytes, Log, FilterBuilder};
use ethabi::RawLog;
2017-08-12 07:57:07 -07:00
use api::{LogStream, self};
use error::{Error, ErrorKind, Result};
2017-08-12 11:03:48 -07:00
use database::Database;
use contracts::{mainnet, testnet, web3_topic};
2017-08-12 07:57:07 -07:00
use app::App;
2017-08-10 08:55:46 -07:00
/// Builds the log filter used to watch for `deposit` events of the mainnet bridge.
///
/// The topics are taken from the filter generated for the `deposit` event of
/// `mainnet`, converted with `web3_topic`, and the resulting builder is
/// restricted to logs emitted by `address`.
fn deposits_filter(mainnet: &mainnet::EthereumBridge, address: Address) -> FilterBuilder {
    let event_filter = mainnet.events().deposit().create_filter();

    // Convert every topic slot before touching the builder.
    let (first, second, third, fourth) = (
        web3_topic(event_filter.topic0),
        web3_topic(event_filter.topic1),
        web3_topic(event_filter.topic2),
        web3_topic(event_filter.topic3),
    );

    let builder = FilterBuilder::default().address(vec![address]);
    builder.topics(first, second, third, fourth)
}
fn deposit_relay_payload(mainnet: &mainnet::EthereumBridge, testnet: &testnet::KovanBridge, log: Log) -> Result<Bytes> {
let raw_log = RawLog {
topics: log.topics.into_iter().map(|t| t.0).collect(),
data: log.data.0,
};
let deposit_log = mainnet.events().deposit().parse_log(raw_log)?;
let hash = log.transaction_hash.expect("log to be mined and contain `transaction_hash`");
let payload = testnet.functions().deposit().input(deposit_log.recipient, deposit_log.value, hash.0);
Ok(payload.into())
}
2017-08-13 04:09:28 -07:00
/// State of deposits relay.
enum DepositRelayState<T: Transport> {
/// Deposit relay is waiting for logs.
2017-08-12 07:57:07 -07:00
Wait,
2017-08-13 04:09:28 -07:00
/// Relaying deposits in progress.
2017-08-12 07:57:07 -07:00
RelayDeposits {
future: JoinAll<Vec<CallResult<H256, T::Out>>>,
block: u64,
},
2017-08-13 07:15:14 -07:00
/// All deposits till given block has been relayed.
2017-08-12 07:57:07 -07:00
Yield(Option<u64>),
}
2017-08-13 04:09:50 -07:00
pub fn create_deposit_relay<T: Transport + Clone>(app: Arc<App<T>>, init: &Database) -> DepositRelay<T> {
2017-08-13 04:09:28 -07:00
let logs_init = api::LogStreamInit {
after: init.checked_deposit_relay,
poll_interval: app.config.mainnet.poll_interval,
confirmations: app.config.mainnet.required_confirmations,
filter: deposits_filter(&app.mainnet_bridge, init.mainnet_contract_address.clone()),
2017-08-13 04:09:28 -07:00
};
DepositRelay {
logs: api::log_stream(app.connections.mainnet.clone(), logs_init),
testnet_contract: init.testnet_contract_address.clone(),
state: DepositRelayState::Wait,
app,
}
}
2017-08-12 07:57:07 -07:00
pub struct DepositRelay<T: Transport> {
app: Arc<App<T>>,
logs: LogStream<T>,
state: DepositRelayState<T>,
2017-08-13 04:09:28 -07:00
testnet_contract: Address,
2017-08-12 07:57:07 -07:00
}
impl<T: Transport> Stream for DepositRelay<T> {
type Item = u64;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
let next_state = match self.state {
2017-08-12 11:03:48 -07:00
DepositRelayState::Wait => {
let item = try_stream!(self.logs.poll());
let deposits = item.logs
.into_iter()
.map(|log| deposit_relay_payload(&self.app.mainnet_bridge, &self.app.testnet_bridge, log))
.collect::<Result<Vec<_>>>()?
2017-08-12 11:03:48 -07:00
.into_iter()
.map(|payload| TransactionRequest {
from: self.app.config.testnet.account.clone(),
2017-08-13 04:09:28 -07:00
to: Some(self.testnet_contract.clone()),
2017-08-12 11:03:48 -07:00
gas: Some(self.app.config.testnet.txs.deposit.gas.into()),
gas_price: Some(self.app.config.testnet.txs.deposit.gas_price.into()),
value: Some(self.app.config.testnet.txs.deposit.value.into()),
data: Some(payload),
nonce: None,
condition: None,
})
.map(|request| api::send_transaction(&self.app.connections.testnet, request))
.collect::<Vec<_>>();
2017-08-12 07:57:07 -07:00
2017-08-12 11:03:48 -07:00
DepositRelayState::RelayDeposits {
future: join_all(deposits),
2017-08-13 07:13:03 -07:00
block: item.to,
2017-08-12 11:03:48 -07:00
}
2017-08-12 07:57:07 -07:00
},
DepositRelayState::RelayDeposits { ref mut future, block } => {
let _ = try_ready!(future.poll().map_err(ErrorKind::Web3));
DepositRelayState::Yield(Some(block))
},
DepositRelayState::Yield(ref mut block) => match block.take() {
None => DepositRelayState::Wait,
some => return Ok(some.into()),
}
};
self.state = next_state;
}
}
2017-08-10 08:55:46 -07:00
}