use std::sync::Arc; use futures::{Future, Stream, Poll}; use futures::future::{JoinAll, join_all}; use web3::Transport; use web3::helpers::CallResult; use web3::types::{TransactionRequest, H256, Address, Bytes, Log, FilterBuilder}; use ethabi::RawLog; use api::{LogStream, self}; use error::{Error, ErrorKind, Result}; use database::Database; use contracts::{mainnet, testnet, web3_filter}; use app::App; fn deposits_filter(mainnet: &mainnet::EthereumBridge, address: Address) -> FilterBuilder { let filter = mainnet.events().deposit().create_filter(); web3_filter(filter, address) } fn deposit_relay_payload(mainnet: &mainnet::EthereumBridge, testnet: &testnet::KovanBridge, log: Log) -> Result { let raw_log = RawLog { topics: log.topics.into_iter().map(|t| t.0).collect(), data: log.data.0, }; let deposit_log = mainnet.events().deposit().parse_log(raw_log)?; let hash = log.transaction_hash.expect("log to be mined and contain `transaction_hash`"); let payload = testnet.functions().deposit().input(deposit_log.recipient, deposit_log.value, hash.0); Ok(payload.into()) } /// State of deposits relay. enum DepositRelayState { /// Deposit relay is waiting for logs. Wait, /// Relaying deposits in progress. RelayDeposits { future: JoinAll>>, block: u64, }, /// All deposits till given block has been relayed. Yield(Option), } pub fn create_deposit_relay(app: Arc>, init: &Database) -> DepositRelay { let logs_init = api::LogStreamInit { after: init.checked_deposit_relay, poll_interval: app.config.mainnet.poll_interval, confirmations: app.config.mainnet.required_confirmations, filter: deposits_filter(&app.mainnet_bridge, init.mainnet_contract_address.clone()), }; DepositRelay { logs: api::log_stream(app.connections.mainnet.clone(), logs_init), testnet_contract: init.testnet_contract_address.clone(), state: DepositRelayState::Wait, app, } } pub struct DepositRelay { app: Arc>, logs: LogStream, state: DepositRelayState, testnet_contract: Address, } impl Stream for DepositRelay { type Item = u64; type Error = Error; fn poll(&mut self) -> Poll, Self::Error> { loop { let next_state = match self.state { DepositRelayState::Wait => { let item = try_stream!(self.logs.poll()); let deposits = item.logs .into_iter() .map(|log| deposit_relay_payload(&self.app.mainnet_bridge, &self.app.testnet_bridge, log)) .collect::>>()? .into_iter() .map(|payload| TransactionRequest { from: self.app.config.testnet.account.clone(), to: Some(self.testnet_contract.clone()), gas: Some(self.app.config.testnet.txs.deposit.gas.into()), gas_price: Some(self.app.config.testnet.txs.deposit.gas_price.into()), value: Some(self.app.config.testnet.txs.deposit.value.into()), data: Some(payload), nonce: None, condition: None, }) .map(|request| api::send_transaction(&self.app.connections.testnet, request)) .collect::>(); DepositRelayState::RelayDeposits { future: join_all(deposits), block: item.to, } }, DepositRelayState::RelayDeposits { ref mut future, block } => { let _ = try_ready!(future.poll().map_err(ErrorKind::Web3)); DepositRelayState::Yield(Some(block)) }, DepositRelayState::Yield(ref mut block) => match block.take() { None => DepositRelayState::Wait, some => return Ok(some.into()), } }; self.state = next_state; } } }