From b291deff82c969a95c0a42c09c48721269b4d6da Mon Sep 17 00:00:00 2001 From: debris Date: Fri, 11 Aug 2017 13:13:27 +0200 Subject: [PATCH] create logs stream --- src/api.rs | 153 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 153 insertions(+) diff --git a/src/api.rs b/src/api.rs index d07b5f1..9617e5c 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1 +1,154 @@ +use std::vec; +use std::time::Duration; +use futures::{Future, Stream, Poll}; +use futures_after::{After, AfterStream}; +use web3::{self, api, Transport}; +use web3::api::{Namespace, FilterStream, CreateFilter}; +use web3::types::{Log, Filter, H256, Block, BlockId, BlockNumber}; +use web3::helpers::CallResult; +use error::Error; + pub use web3::confirm::send_transaction_with_confirmation; + +pub fn logs(transport: T, filter: &Filter) -> CallResult, T::Out> { + api::Eth::new(transport).logs(filter) +} + +pub fn block(transport: T, id: BlockId) -> CallResult, T::Out> { + api::Eth::new(transport).block(id) +} + +pub fn create_blocks_filter(transport: T) -> CreateFilter { + api::EthFilter::new(transport).create_blocks_filter() +} + +pub enum BlockNumbersStreamState { + WaitForNextBlock, + FetchBlock(CallResult, T::Out>), + NextItem(Option), +} + +pub struct BlockNumbersStream { + transport: T, + stream: FilterStream, + state: BlockNumbersStreamState, +} + +impl BlockNumbersStream { + fn new(transport: T, stream: FilterStream) -> Self { + BlockNumbersStream { + transport, + stream, + state: BlockNumbersStreamState::WaitForNextBlock, + } + } +} + +impl Stream for BlockNumbersStream { + type Item = BlockNumber; + type Error = web3::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + let next_state = match self.state { + BlockNumbersStreamState::WaitForNextBlock => match try_ready!(self.stream.poll()) { + Some(hash) => BlockNumbersStreamState::FetchBlock(block(&self.transport, hash.into())), + None => return Ok(None.into()), + }, + BlockNumbersStreamState::FetchBlock(ref mut future) => { + let block = try_ready!(future.poll()); + let block_number = block.number.expect("block number to exist for mined block"); + BlockNumbersStreamState::NextItem(Some(BlockNumber::Number(block_number.low_u64()))) + }, + BlockNumbersStreamState::NextItem(ref mut item) => match item.take() { + None => BlockNumbersStreamState::WaitForNextBlock, + some => return Ok(some.into()), + } + }; + + self.state = next_state; + } + } +} + +pub enum LogsStreamState { + WaitForNextBlock, + FetchLogs(CallResult, T::Out>), + NextLog(vec::IntoIter), +} + +pub struct LogsStream { + transport: T, + state: LogsStreamState, + stream: After>, + filter: Filter, +} + +impl LogsStream { + fn new(transport: T, stream: FilterStream, filter: Filter, confirmations: usize) -> Self { + LogsStream { + stream: BlockNumbersStream::new(transport.clone(), stream).after(confirmations), + state: LogsStreamState::WaitForNextBlock, + transport, + filter, + } + } +} + +impl Stream for LogsStream { + type Item = Log; + type Error = web3::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + let next_state = match self.state { + LogsStreamState::WaitForNextBlock => match try_ready!(self.stream.poll()) { + Some(number) => { + self.filter.from_block = Some(number.clone()); + self.filter.to_block = Some(number); + LogsStreamState::FetchLogs(logs(&self.transport, &self.filter)) + }, + None => return Ok(None.into()), + }, + LogsStreamState::FetchLogs(ref mut future) => { + let logs = try_ready!(future.poll()); + LogsStreamState::NextLog(logs.into_iter()) + }, + LogsStreamState::NextLog(ref mut iter) => match iter.next() { + None => LogsStreamState::WaitForNextBlock, + some => return Ok(some.into()), + }, + }; + self.state = next_state; + } + } +} + +pub struct CreateLogsStream { + create_filter: CreateFilter, + transport: T, + log_filter: Filter, + poll_interval: Duration, + confirmations: usize, +} + +impl Future for CreateLogsStream { + type Item = LogsStream; + type Error = web3::Error; + + fn poll(&mut self) -> Poll { + let filter = try_ready!(self.create_filter.poll()); + let stream = LogsStream::new(self.transport.clone(), filter.stream(self.poll_interval), self.log_filter.clone(), self.confirmations); + Ok(stream.into()) + } +} + +pub fn create_logs_stream_with_confirmations(transport: T, log_filter: Filter, poll_interval: Duration, confirmations: usize) -> CreateLogsStream { + CreateLogsStream { + create_filter: create_blocks_filter(transport.clone()), + transport, + log_filter, + poll_interval, + confirmations, + } +}