Merge pull request #313 from ethcore/network-rpc

Network RPC, changes to p2p crate, add node method
This commit is contained in:
Nikolay Volf 2016-12-13 00:11:41 +01:00 committed by GitHub
commit ffd177d2e2
15 changed files with 261 additions and 24 deletions

View File

@ -30,7 +30,7 @@ pub use primitives::{hash, bytes};
pub use config::Config;
pub use net::Config as NetConfig;
pub use p2p::P2P;
pub use p2p::{P2P, Context};
pub use event_loop::{event_loop, forever};
pub use util::{PeerId, PeerInfo, InternetProtocol};
pub use util::{NodeTableError, PeerId, PeerInfo, InternetProtocol};
pub use protocol::{InboundSyncConnection, InboundSyncConnectionRef, OutboundSyncConnection, OutboundSyncConnectionRef, LocalSyncNode, LocalSyncNodeRef};

View File

@ -1,5 +1,6 @@
use std::{io, net, error, time};
use std::sync::Arc;
use std::net::SocketAddr;
use parking_lot::RwLock;
use futures::{Future, finished, failed, BoxFuture};
use futures::stream::Stream;
@ -12,7 +13,7 @@ use ns_dns_tokio::DnsResolver;
use message::{Payload, MessageResult, Message};
use message::common::Services;
use net::{connect, Connections, Channel, Config as NetConfig, accept_connection, ConnectionCounter};
use util::{NodeTable, Node, Direction};
use util::{NodeTable, Node, NodeTableError, Direction};
use session::{SessionFactory, SeednodeSessionFactory, NormalSessionFactory};
use {Config, PeerId};
use protocol::{LocalSyncNodeRef, InboundSyncConnectionRef, OutboundSyncConnectionRef};
@ -88,9 +89,21 @@ impl Context {
self.node_table.write().insert_many(nodes);
}
/// Adds node to table.
pub fn add_node(&self, addr: SocketAddr) -> Result<(), NodeTableError> {
trace!("Adding node {} to node table", &addr);
self.node_table.write().add(addr, self.config.connection.services)
}
/// Removes node from table.
pub fn remove_node(&self, addr: SocketAddr) -> Result<(), NodeTableError> {
trace!("Removing node {} from node table", &addr);
self.node_table.write().remove(&addr)
}
/// Every 10 seconds check if we have reached maximum number of outbound connections.
/// If not, connect to best peers.
pub fn autoconnect(context: Arc<Context>, handle: &Handle, config: NetConfig) {
pub fn autoconnect(context: Arc<Context>, handle: &Handle) {
let c = context.clone();
// every 10 seconds connect to new peers (if needed)
let interval: BoxedEmptyFuture = Interval::new(time::Duration::new(10, 0), handle).expect("Failed to create interval")
@ -113,7 +126,7 @@ impl Context {
trace!("Creating {} more outbound connections", addresses.len());
for address in addresses {
Context::connect::<NormalSessionFactory>(context.clone(), address, config.clone());
Context::connect::<NormalSessionFactory>(context.clone(), address);
}
if let Err(_err) = context.node_table.read().save_to_file(&context.config.node_table_path) {
@ -174,13 +187,18 @@ impl Context {
}
/// Connect to socket using given context.
pub fn connect<T>(context: Arc<Context>, socket: net::SocketAddr, config: NetConfig) where T: SessionFactory {
pub fn connect<T>(context: Arc<Context>, socket: net::SocketAddr) where T: SessionFactory {
context.connection_counter.note_new_outbound_connection();
context.remote.clone().spawn(move |handle| {
context.pool.clone().spawn(Context::connect_future::<T>(context, socket, handle, &config))
let config = context.config.clone();
context.pool.clone().spawn(Context::connect_future::<T>(context, socket, handle, &config.connection))
})
}
pub fn connect_normal(context: Arc<Context>, socket: net::SocketAddr) {
Self::connect::<NormalSessionFactory>(context, socket)
}
pub fn accept_connection_future(context: Arc<Context>, stream: TcpStream, socket: net::SocketAddr, handle: &Handle, config: NetConfig) -> BoxedEmptyFuture {
accept_connection(stream, handle, &config, socket).then(move |result| {
match result {
@ -420,25 +438,25 @@ impl P2P {
self.connect_to_seednode(&resolver, seed);
}
Context::autoconnect(self.context.clone(), &self.event_loop_handle, self.config.connection.clone());
Context::autoconnect(self.context.clone(), &self.event_loop_handle);
try!(self.listen());
Ok(())
}
/// Attempts to connect to the specified node
pub fn connect<T>(&self, addr: net::SocketAddr) where T: SessionFactory {
Context::connect::<T>(self.context.clone(), addr, self.config.connection.clone());
Context::connect::<T>(self.context.clone(), addr);
}
pub fn connect_to_seednode(&self, resolver: &Resolver, seednode: &str) {
let owned_seednode = seednode.to_owned();
let context = self.context.clone();
let connection_config = self.config.connection.clone();
let dns_lookup = resolver.resolve(seednode).then(move |result| {
match result {
Ok(address) => match address.pick_one() {
Some(socket) => {
trace!("Dns lookup of seednode {} finished. Connecting to {}", owned_seednode, socket);
Context::connect::<SeednodeSessionFactory>(context, socket, connection_config);
Context::connect::<SeednodeSessionFactory>(context, socket);
},
None => {
trace!("Dns lookup of seednode {} resolved with no results", owned_seednode);
@ -454,10 +472,13 @@ impl P2P {
self.event_loop_handle.spawn(pool_work);
}
fn listen(&self) -> Result<(), Box<error::Error>> {
let server = try!(Context::listen(self.context.clone(), &self.event_loop_handle, self.config.connection.clone()));
self.event_loop_handle.spawn(server);
Ok(())
}
pub fn context(&self) -> &Arc<Context> {
&self.context
}
}

View File

@ -7,7 +7,7 @@ mod response_queue;
mod synchronizer;
pub use self::internet_protocol::InternetProtocol;
pub use self::node_table::{NodeTable, Node};
pub use self::node_table::{NodeTable, NodeTableError, Node};
pub use self::peer::{PeerId, PeerInfo, Direction};
pub use self::response_queue::{ResponseQueue, Responses};
pub use self::synchronizer::{Synchronizer, ConfigurableSynchronizer};

View File

@ -161,6 +161,9 @@ impl PartialOrd for Node {
}
}
#[derive(Debug)]
pub enum NodeTableError { AddressAlreadyAdded, NoAddressInTable }
#[derive(Default, Debug)]
pub struct NodeTable<T = RealTime> where T: Time {
/// Time source.
@ -219,6 +222,35 @@ impl<T> NodeTable<T> where T: Time {
}
}
pub fn exists(&self, addr: SocketAddr) -> bool {
self.by_addr.contains_key(&addr)
}
pub fn add(&mut self, addr: SocketAddr, services: Services) -> Result<(), NodeTableError> {
if self.exists(addr.clone()) {
Err(NodeTableError::AddressAlreadyAdded)
}
else {
self.insert(addr, services);
Ok(())
}
}
/// Tries to remove node with the speicified socket address
/// from table, if exists.
/// Returnes `true` if it has removed anything
pub fn remove(&mut self, addr: &SocketAddr) -> Result<(), NodeTableError> {
let node = self.by_addr.remove(&addr);
match node {
Some(val) => {
self.by_time.remove(&val.clone().into());
self.by_score.remove(&val.into());
Ok(())
}
None => Err(NodeTableError::NoAddressInTable)
}
}
/// Inserts many new addresses into node table.
/// Used in `addr` request handler.
/// Discards all nodes with timestamp newer than current time.
@ -452,6 +484,43 @@ mod tests {
table.note_failure(&s1);
}
#[test]
fn add_node() {
let mut table = NodeTable::<ZeroTime>::default();
let add_result = table.add("127.0.0.1:8001".parse().unwrap(), Services::default());
assert!(add_result.is_ok())
}
#[test]
fn add_duplicate() {
let mut table = NodeTable::<ZeroTime>::default();
table.add("127.0.0.1:8001".parse().unwrap(), Services::default()).unwrap();
let add_result = table.add("127.0.0.1:8001".parse().unwrap(), Services::default());
assert!(add_result.is_err())
}
#[test]
fn remove() {
let mut table = NodeTable::<ZeroTime>::default();
table.add("127.0.0.1:8001".parse().unwrap(), Services::default()).unwrap();
let remove_result = table.remove(&"127.0.0.1:8001".parse().unwrap());
assert!(remove_result.is_ok());
assert_eq!(0, table.by_addr.len());
assert_eq!(0, table.by_score.len());
assert_eq!(0, table.by_time.len());
}
#[test]
fn remove_nonexistant() {
let mut table = NodeTable::<ZeroTime>::default();
let remove_result = table.remove(&"127.0.0.1:8001".parse().unwrap());
assert!(remove_result.is_err());
}
#[test]
fn test_save_and_load() {
let s0: SocketAddr = "127.0.0.1:8000".parse().unwrap();

View File

@ -37,12 +37,13 @@ pub fn start(cfg: config::Config) -> Result<(), String> {
let local_sync_node = create_local_sync_node(&sync_handle, cfg.magic, db);
let sync_connection_factory = create_sync_connection_factory(local_sync_node.clone());
let p2p = try!(p2p::P2P::new(p2p_cfg, sync_connection_factory, el.handle()).map_err(|x| x.to_string()));
let rpc_deps = rpc::Dependencies {
local_sync_node: local_sync_node,
p2p_context: p2p.context().clone(),
};
let _rpc_server = try!(rpc::new_http(cfg.rpc_config, rpc_deps));
let p2p = try!(p2p::P2P::new(p2p_cfg, sync_connection_factory, el.handle()).map_err(|x| x.to_string()));
try!(p2p.run().map_err(|_| "Failed to start p2p module"));
el.run(p2p::forever()).unwrap();
Ok(())

View File

@ -1,11 +1,14 @@
use std::net::SocketAddr;
use std::sync::Arc;
use rpc_apis::{self, ApiSet};
use ethcore_rpc::{Server, RpcServer, RpcServerError};
use std::io;
use sync;
use p2p;
pub struct Dependencies {
pub local_sync_node: sync::LocalNodeRef,
pub p2p_context: Arc<p2p::Context>,
}
#[derive(Debug, PartialEq)]

View File

@ -9,6 +9,8 @@ pub enum Api {
Raw,
/// Miner
Miner,
/// Network
Network,
}
#[derive(Debug, PartialEq, Eq)]
@ -18,7 +20,7 @@ pub enum ApiSet {
impl Default for ApiSet {
fn default() -> Self {
ApiSet::List(vec![Api::Raw].into_iter().collect())
ApiSet::List(vec![Api::Raw, Api::Network].into_iter().collect())
}
}
@ -29,6 +31,7 @@ impl FromStr for Api {
match s {
"raw" => Ok(Api::Raw),
"miner" => Ok(Api::Miner),
"network" => Ok(Api::Network),
api => Err(format!("Unknown api: {}", api)),
}
}
@ -49,6 +52,7 @@ pub fn setup_rpc<T: Extendable>(server: T, apis: ApiSet, deps: Dependencies) ->
match api {
Api::Raw => server.add_delegate(RawClient::new(RawClientCore::new(deps.local_sync_node.clone())).to_delegate()),
Api::Miner => server.add_delegate(MinerClient::new(MinerClientCore::new(deps.local_sync_node.clone())).to_delegate()),
Api::Network => server.add_delegate(NetworkClient::new(NetworkClientCore::new(deps.p2p_context.clone())).to_delegate()),
}
}
server

View File

@ -7,21 +7,23 @@ mod codes {
pub const TRANSACTION_OUTPUT_NOT_FOUND: i64 = -32097;
pub const TRANSACTION_OF_SIDE_BRANCH: i64 = -32098;
pub const BLOCK_NOT_FOUND: i64 = -32099;
pub const NODE_ALREADY_ADDED: i64 = -32150;
pub const NODE_NOT_ADDED: i64 = -32151;
}
use std::fmt;
use jsonrpc_core::{Error, ErrorCode, Value};
macro_rules! rpc_unimplemented {
() => (Err(::v1::helpers::errors::unimplemented(None)))
macro_rules! rpc_unimplemented {
() => (Err(::v1::helpers::errors::unimplemented(None)))
}
pub fn unimplemented(details: Option<String>) -> Error {
Error {
code: ErrorCode::InternalError,
message: "This request is not implemented yet. Please create an issue on Github repo.".into(),
data: details.map(Value::String),
}
pub fn unimplemented(details: Option<String>) -> Error {
Error {
code: ErrorCode::InternalError,
message: "This request is not implemented yet. Please create an issue on Github repo.".into(),
data: details.map(Value::String),
}
}
pub fn invalid_params<T: fmt::Debug>(param: &str, details: T) -> Error {
@ -79,3 +81,19 @@ pub fn transaction_of_side_branch<T: fmt::Debug>(data: T) -> Error {
data: Some(Value::String(format!("{:?}", data))),
}
}
pub fn node_already_added() -> Error {
Error {
code: ErrorCode::ServerError(codes::NODE_ALREADY_ADDED),
message: "Node already added to the node table".into(),
data: None,
}
}
pub fn node_not_added() -> Error {
Error {
code: ErrorCode::ServerError(codes::NODE_NOT_ADDED),
message: "Node not added to the node table".into(),
data: None,
}
}

View File

@ -1,7 +1,9 @@
mod blockchain;
mod miner;
mod raw;
mod network;
pub use self::blockchain::{BlockChainClient, BlockChainClientCore};
pub use self::miner::{MinerClient, MinerClientCore};
pub use self::raw::{RawClient, RawClientCore};
pub use self::network::{NetworkClient, NetworkClientCore};

View File

@ -0,0 +1,68 @@
use std::sync::Arc;
use std::net::SocketAddr;
use v1::traits::Network as NetworkRpc;
use v1::types::AddNodeOperation;
use jsonrpc_core::Error;
use v1::helpers::errors;
use p2p;
pub trait NetworkApi : Send + Sync + 'static {
fn add_node(&self, socket_addr: SocketAddr) -> Result<(), p2p::NodeTableError>;
fn remove_node(&self, socket_addr: SocketAddr) -> Result<(), p2p::NodeTableError>;
fn connect(&self, socket_addr: SocketAddr);
}
impl<T> NetworkRpc for NetworkClient<T> where T: NetworkApi {
fn add_node(&self, node: String, operation: AddNodeOperation) -> Result<(), Error> {
let addr = try!(node.parse().map_err(
|_| errors::invalid_params("node", "Invalid socket address format, should be ip:port (127.0.0.1:8008)")));
match operation {
AddNodeOperation::Add => {
self.api.add_node(addr).map_err(|_| errors::node_already_added())
},
AddNodeOperation::Remove => {
self.api.remove_node(addr).map_err(|_| errors::node_not_added())
},
AddNodeOperation::OneTry => {
self.api.connect(addr);
Ok(())
}
}
}
}
pub struct NetworkClient<T: NetworkApi> {
api: T,
}
impl<T> NetworkClient<T> where T: NetworkApi {
pub fn new(api: T) -> Self {
NetworkClient {
api: api,
}
}
}
pub struct NetworkClientCore {
p2p: Arc<p2p::Context>,
}
impl NetworkClientCore {
pub fn new(p2p: Arc<p2p::Context>) -> Self {
NetworkClientCore { p2p: p2p }
}
}
impl NetworkApi for NetworkClientCore {
fn add_node(&self, socket_addr: SocketAddr) -> Result<(), p2p::NodeTableError> {
self.p2p.add_node(socket_addr)
}
fn remove_node(&self, socket_addr: SocketAddr) -> Result<(), p2p::NodeTableError> {
self.p2p.remove_node(socket_addr)
}
fn connect(&self, socket_addr: SocketAddr) {
p2p::Context::connect_normal(self.p2p.clone(), socket_addr);
}
}

View File

@ -6,5 +6,7 @@ pub mod types;
pub use self::traits::Raw;
pub use self::traits::Miner;
pub use self::traits::Network;
pub use self::impls::{RawClient, RawClientCore};
pub use self::impls::{MinerClient, MinerClientCore};
pub use self::impls::{NetworkClient, NetworkClientCore};

View File

@ -1,7 +1,9 @@
mod blockchain;
mod miner;
mod raw;
mod network;
pub use self::blockchain::BlockChain;
pub use self::miner::Miner;
pub use self::raw::Raw;
pub use self::raw::Raw;
pub use self::network::Network;

View File

@ -0,0 +1,14 @@
use jsonrpc_core::Error;
use v1::types::AddNodeOperation;
build_rpc_trait! {
/// Parity-bitcoin network interface
pub trait Network {
/// Add/remove/connecto to the node
/// @curl-example: curl --data-binary '{"jsonrpc": "2.0", "method": "addnode", "params": ["127.0.0.1:8888", "add"], "id":1 }' -H 'content-type: application/json;' http://127.0.0.1:8332/
/// @curl-example: curl --data-binary '{"jsonrpc": "2.0", "method": "addnode", "params": ["127.0.0.1:8888", "remove"], "id":1 }' -H 'content-type: application/json;' http://127.0.0.1:8332/
/// @curl-example: curl --data-binary '{"jsonrpc": "2.0", "method": "addnode", "params": ["127.0.0.1:8888", "onetry"], "id":1 }' -H 'content-type: application/json;' http://127.0.0.1:8332/
#[rpc(name = "addnode")]
fn add_node(&self, String, AddNodeOperation) -> Result<(), Error>;
}
}

View File

@ -9,6 +9,7 @@ mod raw_block;
mod raw_transaction;
mod script;
mod uint;
mod nodes;
pub use self::block_template::{BlockTemplate, BlockTemplateTransaction};
pub use self::block_template_request::{BlockTemplateRequest, BlockTemplateRequestMode};
@ -21,3 +22,4 @@ pub use self::raw_block::RawBlock;
pub use self::raw_transaction::RawTransaction;
pub use self::script::ScriptType;
pub use self::uint::U256;
pub use self::nodes::AddNodeOperation;

31
rpc/src/v1/types/nodes.rs Normal file
View File

@ -0,0 +1,31 @@
use serde::{Deserialize, Deserializer};
#[derive(Debug, PartialEq)]
pub enum AddNodeOperation {
Add,
Remove,
OneTry,
}
impl Deserialize for AddNodeOperation {
fn deserialize<D>(deserializer: &mut D) -> Result<Self, D::Error> where D: Deserializer {
use serde::de::Visitor;
struct DummyVisitor;
impl Visitor for DummyVisitor {
type Value = AddNodeOperation;
fn visit_str<E>(&mut self, value: &str) -> Result<AddNodeOperation, E> where E: ::serde::de::Error {
match value {
"add" => Ok(AddNodeOperation::Add),
"remove" => Ok(AddNodeOperation::Remove),
"onetry" => Ok(AddNodeOperation::OneTry),
_ => Err(E::invalid_value(&format!("unknown ScriptType variant: {}", value))),
}
}
}
deserializer.deserialize(DummyVisitor)
}
}