Merge pull request #250 from blockworks-foundation/fixing_network_issues

Resubscribing grpc block subsciption on failure, get vote accounts ev…
This commit is contained in:
galactus 2023-12-01 11:22:45 +01:00 committed by GitHub
commit 8760021e7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 111 additions and 104 deletions

View File

@ -253,63 +253,67 @@ pub fn create_block_processing_task(
block_sx: Sender<ProducedBlock>,
commitment_level: CommitmentLevel,
) -> AnyhowJoinHandle {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
let commitment_config = match commitment_level {
CommitmentLevel::Confirmed => CommitmentConfig::confirmed(),
CommitmentLevel::Finalized => CommitmentConfig::finalized(),
CommitmentLevel::Processed => CommitmentConfig::processed(),
};
tokio::spawn(async move {
// connect to grpc
let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token, None)?;
let mut stream = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
Some(commitment_level),
Default::default(),
None,
)
.await?;
loop {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"client".to_string(),
SubscribeRequestFilterBlocks {
account_include: Default::default(),
include_transactions: Some(true),
include_accounts: Some(false),
include_entries: Some(false),
},
);
while let Some(message) = stream.next().await {
let message = message?;
let Some(update) = message.update_oneof else {
continue;
let commitment_config = match commitment_level {
CommitmentLevel::Confirmed => CommitmentConfig::confirmed(),
CommitmentLevel::Finalized => CommitmentConfig::finalized(),
CommitmentLevel::Processed => CommitmentConfig::processed(),
};
match update {
UpdateOneof::Block(block) => {
let block = process_block(block, commitment_config);
block_sx
.send(block)
.context("Grpc failed to send a block")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
u => {
bail!("Unexpected update: {u:?}");
}
};
// connect to grpc
let mut client =
GeyserGrpcClient::connect(grpc_addr.clone(), grpc_x_token.clone(), None)?;
let mut stream = client
.subscribe_once(
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
blocks_subs,
Default::default(),
Some(commitment_level),
Default::default(),
None,
)
.await?;
while let Some(message) = stream.next().await {
let message = message?;
let Some(update) = message.update_oneof else {
continue;
};
match update {
UpdateOneof::Block(block) => {
let block = process_block(block, commitment_config);
block_sx
.send(block)
.context("Grpc failed to send a block")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
_ => {
log::trace!("unknown GRPC notification");
}
};
}
log::error!("Grpc block subscription broken (resubscribing)");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
bail!("geyser slot stream ended");
})
}
@ -324,68 +328,71 @@ pub fn create_grpc_subscription(
let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10);
let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10);
let mut slots = HashMap::new();
slots.insert(
"client".to_string(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(true),
},
);
let slot_task: AnyhowJoinHandle = {
let grpc_x_token = grpc_x_token.clone();
let grpc_addr = grpc_addr.clone();
tokio::spawn(async move {
// connect to grpc
let mut client = GeyserGrpcClient::connect(grpc_addr, grpc_x_token.clone(), None)?;
let version = client.get_version().await?.version;
if version != expected_grpc_version {
log::warn!(
"Expected grpc version {:?}, got {:?}, continue",
expected_grpc_version,
version
loop {
let mut slots = HashMap::new();
slots.insert(
"client".to_string(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(true),
},
);
}
let mut stream = client
.subscribe_once(
slots,
Default::default(),
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
Some(CommitmentLevel::Processed),
Default::default(),
None,
)
.await?;
// connect to grpc
let mut client =
GeyserGrpcClient::connect(grpc_addr.clone(), grpc_x_token.clone(), None)?;
while let Some(message) = stream.next().await {
let message = message?;
let version = client.get_version().await?.version;
if version != expected_grpc_version {
log::warn!(
"Expected grpc version {:?}, got {:?}, continue",
expected_grpc_version,
version
);
}
let mut stream = client
.subscribe_once(
slots,
Default::default(),
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
Some(CommitmentLevel::Processed),
Default::default(),
None,
)
.await?;
let Some(update) = message.update_oneof else {
while let Some(message) = stream.next().await {
let message = message?;
let Some(update) = message.update_oneof else {
continue;
};
match update {
UpdateOneof::Slot(slot) => {
slot_sx
.send(SlotNotification {
estimated_processed_slot: slot.slot,
processed_slot: slot.slot,
})
.context("Error sending slot notification")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
k => {
bail!("Unexpected update: {k:?}");
}
};
match update {
UpdateOneof::Slot(slot) => {
slot_sx
.send(SlotNotification {
estimated_processed_slot: slot.slot,
processed_slot: slot.slot,
})
.context("Error sending slot notification")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
k => {
bail!("Unexpected update: {k:?}");
}
};
}
log::error!("Grpc slot subscription broken (resubscribing)");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
bail!("geyser slot stream ended");
})
};

View File

@ -22,7 +22,7 @@ pub fn poll_vote_accounts_and_cluster_info(
.send(vote_accounts)
.context("Should be able to send vote accounts")?;
}
tokio::time::sleep(Duration::from_secs(5)).await;
tokio::time::sleep(Duration::from_secs(600)).await;
}
})
}