Use tracing::instrument and monitor for messages.

This commit is contained in:
Henry de Valence 2019-09-23 18:11:05 -07:00 committed by Deirdre Connolly
parent 15ca12a2f5
commit c8a3d47b56
7 changed files with 78 additions and 8 deletions

View File

@ -11,3 +11,6 @@ members = [
"zebrad",
]
[patch.crates-io]
tracing-attributes = { git = "https://github.com/tokio-rs/tracing" }
tracing = { git = "https://github.com/tokio-rs/tracing" }

View File

@ -11,6 +11,8 @@ rand = "0.7"
byteorder = "1.3"
chrono = "0.4"
failure = "0.1"
tokio = "=0.2.0-alpha.4"
tokio = "=0.2.0-alpha.5"
tracing = { git = "https://github.com/tokio-rs/tracing" }
tracing-futures = { git = "https://github.com/tokio-rs/tracing", features = ["tokio-alpha"], default-features = false }
zebra-chain = { path = "../zebra-chain" }
zebra-chain = { path = "../zebra-chain" }

View File

@ -4,6 +4,8 @@
#[macro_use]
extern crate failure;
#[macro_use]
extern crate tracing;
pub mod message;
pub mod types;

View File

@ -284,6 +284,7 @@ pub enum RejectReason {
impl Message {
/// Send `self` to the given async writer (e.g., a network stream).
#[instrument(level = "debug", skip(writer))]
pub async fn send<W: Unpin + AsyncWrite>(
&self,
mut writer: W,
@ -329,6 +330,7 @@ impl Message {
// extension trait, which is only defined for sync Writers.
// The header is 4+12+4+4=24 bytes long.
trace!(?command, body_len = body.len());
let mut header = [0u8; 24];
let mut header_writer = Cursor::new(&mut header[..]);
header_writer.write_all(&magic.0)?;
@ -343,6 +345,7 @@ impl Message {
}
/// Receive a message from the given async reader (e.g., a network stream).
#[instrument(level = "debug", skip(reader))]
pub async fn recv<R: Unpin + AsyncRead>(
mut reader: R,
magic: Magic,
@ -366,6 +369,7 @@ impl Message {
let command = header_reader.read_12_bytes()?;
let body_len = header_reader.read_u32::<LittleEndian>()? as usize;
let checksum = Sha256dChecksum(header_reader.read_4_bytes()?);
trace!(?message_magic, ?command, body_len, ?checksum);
ensure!(
magic == message_magic,
@ -418,6 +422,7 @@ impl Message {
/// contain a checksum of the message body.
fn write_body<W: io::Write>(&self, mut writer: W, _m: Magic, _v: Version) -> Result<(), Error> {
use Message::*;
trace!(?self);
match *self {
Version {
ref version,
@ -499,67 +504,99 @@ fn try_read_pong<R: io::Read>(mut reader: R, _version: Version) -> Result<Messag
Ok(Message::Pong(Nonce(reader.read_u64::<LittleEndian>()?)))
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_reject<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("reject");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_addr<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("addr");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_getaddr<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("getaddr");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_block<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("block");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_getblocks<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("getblocks");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_headers<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("headers");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_getheaders<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("getheaders");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_inv<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("inv");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_getdata<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("getdata");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_notfound<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("notfound");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_tx<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("tx");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_mempool<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("mempool");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_filterload<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("filterload");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_filteradd<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("filteradd");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_filterclear<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("filterclear");
bail!("unimplemented message type")
}
#[instrument(level = "trace", skip(_reader, _version))]
fn try_read_merkleblock<R: io::Read>(mut _reader: R, _version: Version) -> Result<Message, Error> {
trace!("merkleblock");
bail!("unimplemented message type")
}

View File

@ -12,10 +12,13 @@ gumdrop = "0.6"
lazy_static = "1"
serde = { version = "1", features = ["serde_derive"] }
toml = "0.5"
tokio = "=0.2.0-alpha.4"
tracing = "0.1"
tracing-subscriber = "0.1"
tracing-log = "=0.0.1-alpha.2"
tokio = "=0.2.0-alpha.5"
# Replace with git to pick up instrument derive changes, revert on release.
#tracing = "0.1"
tracing = { git = "https://github.com/tokio-rs/tracing" }
tracing-futures = { git = "https://github.com/tokio-rs/tracing", features = ["tokio-alpha"], default-features = false }
tracing-subscriber = { git = "https://github.com/tokio-rs/tracing" }
tracing-log = { git = "https://github.com/tokio-rs/tracing" }
# Can't use published alpha because of conflicts tracking pin-project alphas
#hyper = "=0.13.0-alpha.1"
hyper = { git = "https://github.com/hyperium/hyper" }
@ -28,4 +31,3 @@ zebra-network = { path = "../zebra-network" }
[dev-dependencies.abscissa_core]
version = "0.3.0"
features = ["testing"]

View File

@ -101,6 +101,30 @@ impl ConnectCmd {
.await?;
info!(resp_verack = ?resp_verack);
loop {
match Message::recv(
&mut stream,
constants::magics::MAINNET,
constants::CURRENT_VERSION,
)
.await
{
Ok(msg) => match msg {
Message::Ping(nonce) => {
let pong = Message::Pong(nonce);
pong.send(
&mut stream,
constants::magics::MAINNET,
constants::CURRENT_VERSION,
)
.await?;
}
_ => warn!("Unknown message"),
},
Err(e) => error!("{}", e),
};
}
stream.shutdown(Shutdown::Both)?;
Ok(())

View File

@ -9,7 +9,7 @@ use hyper::{Body, Request, Response, Server};
use tracing::Subscriber;
use tracing_log::LogTracer;
use tracing_subscriber::{EnvFilter, reload::Handle, FmtSubscriber};
use tracing_subscriber::{reload::Handle, EnvFilter, FmtSubscriber};
/// Abscissa component which runs a tracing filter endpoint.
#[derive(Component)]