Stop ignoring errors when the new state is AwaitingRequest
The previous code would send a Nil message on the Sender, even if the result was actually an error.
This commit is contained in:
parent
da5084a10a
commit
86136c7b5c
|
@ -543,26 +543,32 @@ where
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// These matches return a Result with a new state or an (error, Option<oneshot::Sender>)
|
// These matches return a Result with (new_state, Option<Sender>) or an (error, Sender)
|
||||||
let new_state_result = match (&self.state, request) {
|
let new_state_result = match (&self.state, request) {
|
||||||
(Failed, _) => panic!("failed connection cannot handle requests"),
|
(Failed, _) => panic!("failed connection cannot handle requests"),
|
||||||
(AwaitingResponse { .. }, _) => panic!("tried to update pending request"),
|
(AwaitingResponse { .. }, _) => panic!("tried to update pending request"),
|
||||||
|
|
||||||
(AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
|
(AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
|
||||||
Ok(()) => Ok(AwaitingResponse {
|
Ok(()) => Ok((
|
||||||
handler: Handler::Peers,
|
AwaitingResponse {
|
||||||
tx,
|
handler: Handler::Peers,
|
||||||
span,
|
tx,
|
||||||
}),
|
span,
|
||||||
Err(e) => Err((e, Some(tx))),
|
},
|
||||||
|
None,
|
||||||
|
)),
|
||||||
|
Err(e) => Err((e, tx)),
|
||||||
},
|
},
|
||||||
(AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await {
|
(AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await {
|
||||||
Ok(()) => Ok(AwaitingResponse {
|
Ok(()) => Ok((
|
||||||
handler: Handler::Ping(nonce),
|
AwaitingResponse {
|
||||||
tx,
|
handler: Handler::Ping(nonce),
|
||||||
span,
|
tx,
|
||||||
}),
|
span,
|
||||||
Err(e) => Err((e, Some(tx))),
|
},
|
||||||
|
None,
|
||||||
|
)),
|
||||||
|
Err(e) => Err((e, tx)),
|
||||||
},
|
},
|
||||||
(AwaitingRequest, BlocksByHash(hashes)) => {
|
(AwaitingRequest, BlocksByHash(hashes)) => {
|
||||||
match self
|
match self
|
||||||
|
@ -572,15 +578,18 @@ where
|
||||||
))
|
))
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(()) => Ok(AwaitingResponse {
|
Ok(()) => Ok((
|
||||||
handler: Handler::BlocksByHash {
|
AwaitingResponse {
|
||||||
blocks: Vec::with_capacity(hashes.len()),
|
handler: Handler::BlocksByHash {
|
||||||
hashes,
|
blocks: Vec::with_capacity(hashes.len()),
|
||||||
|
hashes,
|
||||||
|
},
|
||||||
|
tx,
|
||||||
|
span,
|
||||||
},
|
},
|
||||||
tx,
|
None,
|
||||||
span,
|
)),
|
||||||
}),
|
Err(e) => Err((e, tx)),
|
||||||
Err(e) => Err((e, Some(tx))),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(AwaitingRequest, TransactionsByHash(hashes)) => {
|
(AwaitingRequest, TransactionsByHash(hashes)) => {
|
||||||
|
@ -591,15 +600,18 @@ where
|
||||||
))
|
))
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(()) => Ok(AwaitingResponse {
|
Ok(()) => Ok((
|
||||||
handler: Handler::TransactionsByHash {
|
AwaitingResponse {
|
||||||
transactions: Vec::with_capacity(hashes.len()),
|
handler: Handler::TransactionsByHash {
|
||||||
hashes,
|
transactions: Vec::with_capacity(hashes.len()),
|
||||||
|
hashes,
|
||||||
|
},
|
||||||
|
tx,
|
||||||
|
span,
|
||||||
},
|
},
|
||||||
tx,
|
None,
|
||||||
span,
|
)),
|
||||||
}),
|
Err(e) => Err((e, tx)),
|
||||||
Err(e) => Err((e, Some(tx))),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(AwaitingRequest, FindBlocks { known_blocks, stop }) => {
|
(AwaitingRequest, FindBlocks { known_blocks, stop }) => {
|
||||||
|
@ -608,12 +620,15 @@ where
|
||||||
.send(Message::GetBlocks { known_blocks, stop })
|
.send(Message::GetBlocks { known_blocks, stop })
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(()) => Ok(AwaitingResponse {
|
Ok(()) => Ok((
|
||||||
handler: Handler::FindBlocks,
|
AwaitingResponse {
|
||||||
tx,
|
handler: Handler::FindBlocks,
|
||||||
span,
|
tx,
|
||||||
}),
|
span,
|
||||||
Err(e) => Err((e, Some(tx))),
|
},
|
||||||
|
None,
|
||||||
|
)),
|
||||||
|
Err(e) => Err((e, tx)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(AwaitingRequest, FindHeaders { known_blocks, stop }) => {
|
(AwaitingRequest, FindHeaders { known_blocks, stop }) => {
|
||||||
|
@ -622,64 +637,83 @@ where
|
||||||
.send(Message::GetHeaders { known_blocks, stop })
|
.send(Message::GetHeaders { known_blocks, stop })
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(()) => Ok(AwaitingResponse {
|
Ok(()) => Ok((
|
||||||
handler: Handler::FindHeaders,
|
AwaitingResponse {
|
||||||
tx,
|
handler: Handler::FindHeaders,
|
||||||
span,
|
tx,
|
||||||
}),
|
span,
|
||||||
Err(e) => Err((e, Some(tx))),
|
},
|
||||||
|
None,
|
||||||
|
)),
|
||||||
|
Err(e) => Err((e, tx)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(AwaitingRequest, MempoolTransactions) => {
|
(AwaitingRequest, MempoolTransactions) => {
|
||||||
match self.peer_tx.send(Message::Mempool).await {
|
match self.peer_tx.send(Message::Mempool).await {
|
||||||
Ok(()) => Ok(AwaitingResponse {
|
Ok(()) => Ok((
|
||||||
handler: Handler::MempoolTransactions,
|
AwaitingResponse {
|
||||||
tx,
|
handler: Handler::MempoolTransactions,
|
||||||
span,
|
tx,
|
||||||
}),
|
span,
|
||||||
Err(e) => Err((e, Some(tx))),
|
},
|
||||||
|
None,
|
||||||
|
)),
|
||||||
|
Err(e) => Err((e, tx)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(AwaitingRequest, PushTransaction(transaction)) => {
|
(AwaitingRequest, PushTransaction(transaction)) => {
|
||||||
// Since we're not waiting for further messages, we need to
|
|
||||||
// send a response before dropping tx.
|
|
||||||
let _ = tx.send(Ok(Response::Nil));
|
|
||||||
match self.peer_tx.send(Message::Tx(transaction)).await {
|
match self.peer_tx.send(Message::Tx(transaction)).await {
|
||||||
Ok(()) => Ok(AwaitingRequest),
|
Ok(()) => Ok((AwaitingRequest, Some(tx))),
|
||||||
Err(e) => Err((e, None)),
|
Err(e) => Err((e, tx)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(AwaitingRequest, AdvertiseTransactions(hashes)) => {
|
(AwaitingRequest, AdvertiseTransactions(hashes)) => {
|
||||||
let _ = tx.send(Ok(Response::Nil));
|
|
||||||
match self
|
match self
|
||||||
.peer_tx
|
.peer_tx
|
||||||
.send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect()))
|
.send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect()))
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(()) => Ok(AwaitingRequest),
|
Ok(()) => Ok((AwaitingRequest, Some(tx))),
|
||||||
Err(e) => Err((e, None)),
|
Err(e) => Err((e, tx)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(AwaitingRequest, AdvertiseBlock(hash)) => {
|
(AwaitingRequest, AdvertiseBlock(hash)) => {
|
||||||
let _ = tx.send(Ok(Response::Nil));
|
|
||||||
match self.peer_tx.send(Message::Inv(vec![hash.into()])).await {
|
match self.peer_tx.send(Message::Inv(vec![hash.into()])).await {
|
||||||
Ok(()) => Ok(AwaitingRequest),
|
Ok(()) => Ok((AwaitingRequest, Some(tx))),
|
||||||
Err(e) => Err((e, None)),
|
Err(e) => Err((e, tx)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
// Updates state or fails. Sends the error on the Sender if it is Some.
|
// Updates state or fails. Sends the error on the Sender if it is Some.
|
||||||
match new_state_result {
|
match new_state_result {
|
||||||
Ok(new_state) => {
|
Ok((AwaitingRequest, Some(tx))) => {
|
||||||
|
// Since we're not waiting for further messages, we need to
|
||||||
|
// send a response before dropping tx.
|
||||||
|
let _ = tx.send(Ok(Response::Nil));
|
||||||
|
self.state = AwaitingRequest;
|
||||||
|
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
|
||||||
|
}
|
||||||
|
Ok((new_state @ AwaitingResponse { .. }, None)) => {
|
||||||
self.state = new_state;
|
self.state = new_state;
|
||||||
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
|
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
|
||||||
}
|
}
|
||||||
Err((e, Some(tx))) => {
|
Err((e, tx)) => {
|
||||||
let e = SharedPeerError::from(e);
|
let e = SharedPeerError::from(e);
|
||||||
let _ = tx.send(Err(e.clone()));
|
let _ = tx.send(Err(e.clone()));
|
||||||
self.fail_with(e);
|
self.fail_with(e);
|
||||||
}
|
}
|
||||||
Err((e, None)) => self.fail_with(e),
|
// unreachable states
|
||||||
|
Ok((Failed, tx)) => unreachable!(
|
||||||
|
"failed client requests must use fail_with(error) to reach a Failed state. tx: {:?}",
|
||||||
|
tx
|
||||||
|
),
|
||||||
|
Ok((AwaitingRequest, None)) => unreachable!(
|
||||||
|
"successful AwaitingRequest states must send a response on tx, but tx is None",
|
||||||
|
),
|
||||||
|
Ok((AwaitingResponse { .. }, Some(tx))) => unreachable!(
|
||||||
|
"successful AwaitingResponse states must keep tx, but tx is Some: {:?}",
|
||||||
|
tx
|
||||||
|
),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue