use std::time::Duration; use serde::de::DeserializeOwned; use serde_json::Value; use futures::{Future, Stream, Poll}; use tokio_timer::{Timer, Interval, Timeout}; use web3::{self, api, Transport}; use web3::api::Namespace; use web3::types::{Log, Filter, H256, U256, FilterBuilder, Bytes, Address, CallRequest, BlockNumber}; use web3::helpers::{self, CallResult}; use error::{Error, ErrorKind}; /// Imperative alias for web3 function. pub use web3::confirm::send_raw_transaction_with_confirmation; /// Wrapper type for `CallResult` pub struct ApiCall { future: CallResult, message: &'static str, } impl>Future for ApiCall { type Item = T; type Error = Error; fn poll(&mut self) -> Poll { trace!(target: "bridge", "{}", self.message); self.future.poll().map_err(ErrorKind::Web3).map_err(Into::into) } } /// Imperative wrapper for web3 function. pub fn net_version(transport: T) -> ApiCall { ApiCall { future: CallResult::new(transport.execute("net_version", vec![])), message: "net_version", } } /// Imperative wrapper for web3 function. pub fn eth_get_transaction_count(transport: T, address: Address, block: Option) -> ApiCall { // we are not using Eth.balance() because it converts None block into `latest` // while we want `pending` because there might have not been enough time since // the last transaction to get it mined. let address = helpers::serialize(&address); let block = helpers::serialize(&block.unwrap_or(BlockNumber::Pending)); ApiCall { future: CallResult::new(transport.execute("eth_getTransactionCount", vec![address, block])), message: "net_version", } } use serde_json; /// trimming the null from the tail because at least some RPC servers require a topic to be present /// if there's a null /// FIXME: this is not a great fix long term fn trim_filter(filter: &Filter) -> serde_json::Value { fn trim_filter1(vals: &mut Vec) { loop { match vals.pop() { None => { return; }, Some(serde_json::Value::Null) => (), Some(v) => { vals.push(v); return; } } } } match helpers::serialize(filter) { serde_json::Value::Object(mut map) => { for (k, v) in map.iter_mut() { if k == "topics" { match v { &mut serde_json::Value::Array(ref mut v) => trim_filter1(v), _ => (), } } } serde_json::Value::Object(map) } val => val, } } /// Imperative wrapper for web3 function. pub fn logs(transport: T, filter: &Filter) -> ApiCall, T::Out> { let filter = trim_filter(filter); ApiCall { future: CallResult::new(transport.execute("eth_getLogs", vec![filter])), message: "eth_getLogs", } } /// Imperative wrapper for web3 function. pub fn block_number(transport: T) -> ApiCall { ApiCall { future: api::Eth::new(transport).block_number(), message: "eth_blockNumber", } } /// Imperative wrapper for web3 function. pub fn balance(transport: T, address: Address, block: Option) -> ApiCall { // we are not using Eth.balance() because it converts None block into `latest` // while we want `pending` because there might have not been enough time since // the last transaction to get it mined. let address = helpers::serialize(&address); let block = helpers::serialize(&block.unwrap_or(BlockNumber::Pending)); ApiCall { future: CallResult::new(transport.execute("eth_getBalance", vec![address, block])), message: "eth_getBalance", } } /// Imperative wrapper for web3 function. pub fn send_raw_transaction(transport: T, tx: Bytes) -> ApiCall { ApiCall { future: api::Eth::new(transport).send_raw_transaction(tx), message: "eth_sendRawTransaction", } } pub use bridge::nonce::send_transaction_with_nonce; /// Imperative wrapper for web3 function. pub fn call(transport: T, address: Address, payload: Bytes) -> ApiCall { let future = api::Eth::new(transport).call(CallRequest { from: None, to: address, gas: None, gas_price: None, value: None, data: Some(payload), }, None); ApiCall { future, message: "eth_call", } } /// Returns a eth_sign-compatible hash of data to sign. /// The data is prepended with special message to prevent /// chosen-plaintext attacks. pub fn eth_data_hash(mut data: Vec) -> H256 { use keccak_hash::keccak; let mut message_data = format!("\x19Ethereum Signed Message:\n{}", data.len()) .into_bytes(); message_data.append(&mut data); keccak(message_data) } /// Used for `LogStream` initialization. pub struct LogStreamInit { pub after: u64, pub filter: FilterBuilder, pub request_timeout: Duration, pub poll_interval: Duration, pub confirmations: usize, } /// Contains all logs matching `LogStream` filter in inclusive range `[from, to]`. #[derive(Debug, PartialEq)] pub struct LogStreamItem { pub from: u64, pub to: u64, pub logs: Vec, } /// Log Stream state. enum LogStreamState { /// Log Stream is waiting for timer to poll. Wait, /// Fetching best block number. FetchBlockNumber(Timeout>), /// Fetching logs for new best block. FetchLogs { from: u64, to: u64, future: Timeout, T::Out>>, }, /// All logs has been fetched. NextItem(Option), } /// Creates new `LogStream`. pub fn log_stream(transport: T, timer: Timer, init: LogStreamInit) -> LogStream { LogStream { transport, interval: timer.interval(init.poll_interval), timer, state: LogStreamState::Wait, after: init.after, filter: init.filter, confirmations: init.confirmations, request_timeout: init.request_timeout, } } /// Stream of confirmed logs. pub struct LogStream { transport: T, timer: Timer, interval: Interval, state: LogStreamState, after: u64, filter: FilterBuilder, confirmations: usize, request_timeout: Duration, } impl Stream for LogStream { type Item = LogStreamItem; type Error = Error; fn poll(&mut self) -> Poll, Self::Error> { loop { let next_state = match self.state { LogStreamState::Wait => { let _ = try_stream!(self.interval.poll()); LogStreamState::FetchBlockNumber(self.timer.timeout(block_number(&self.transport), self.request_timeout)) }, LogStreamState::FetchBlockNumber(ref mut future) => { let last_block = try_ready!(future.poll()).low_u64(); let last_confirmed_block = last_block.saturating_sub(self.confirmations as u64); if last_confirmed_block > self.after { let from = self.after + 1; let filter = self.filter.clone() .from_block(from.into()) .to_block(last_confirmed_block.into()) .build(); LogStreamState::FetchLogs { from: from, to: last_confirmed_block, future: self.timer.timeout(logs(&self.transport, &filter), self.request_timeout), } } else { LogStreamState::Wait } }, LogStreamState::FetchLogs { ref mut future, from, to } => { let logs = try_ready!(future.poll()); let item = LogStreamItem { from, to, logs, }; self.after = to; LogStreamState::NextItem(Some(item)) }, LogStreamState::NextItem(ref mut item) => match item.take() { None => LogStreamState::Wait, some => return Ok(some.into()), }, }; self.state = next_state; } } }