Don't close streams (#85)

* Fix resubscriptions in golang

* Remove disconnection of stream to allow updates

* Add resub parameter

* Added a resub parameter
This commit is contained in:
Linus Kendall 2023-03-13 17:21:28 +05:30 committed by GitHub
parent 1783283ae4
commit 2c5d620ef5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 38 additions and 8 deletions

View File

@ -28,6 +28,7 @@ var (
blocks = flag.Bool("blocks", false, "Subscribe to block update")
block_meta = flag.Bool("blocks-meta", false, "Subscribe to block metadata update")
signature = flag.String("signature", "", "Subscribe to a specific transaction signature")
resub = flag.Uint("resub", 0, "Resubscribe to only slots after x updates, 0 disables this")
accounts = flag.Bool("accounts", false, "Subscribe to accounts")
@ -232,9 +233,18 @@ func grpc_subscribe(conn *grpc.ClientConn) {
if err != nil {
log.Fatalf("%v", err)
}
stream.CloseSend()
var i uint = 0
for {
i += 1
// Example of how to resubscribe/update request
if i == *resub {
subscription = pb.SubscribeRequest{}
subscription.Slots = make(map[string]*pb.SubscribeRequestFilterSlots)
subscription.Slots["slots"] = &pb.SubscribeRequestFilterSlots{}
stream.Send(&subscription)
}
resp, err := stream.Recv()
timestamp := time.Now().UnixNano()

View File

@ -76,6 +76,10 @@ struct Args {
/// Subscribe on block meta updates (without transactions)
#[clap(long)]
blocks_meta: bool,
// Resubscribe (only to slots) after
#[clap(long)]
resub: Option<u16>,
}
const XTOKEN_LENGTH: usize = 28;
@ -169,6 +173,7 @@ impl RetryChannel {
transactions: &TransactionsFilterMap,
blocks: &BlocksFilterMap,
blocks_meta: &BlocksMetaFilterMap,
resub: &u16,
) -> anyhow::Result<()> {
// The default exponential backoff strategy intervals:
// [500ms, 750ms, 1.125s, 1.6875s, 2.53125s, 3.796875s, 5.6953125s,
@ -177,7 +182,7 @@ impl RetryChannel {
info!("Retry to connect to the server");
let mut client = self.client();
client
.subscribe(slots, accounts, transactions, blocks, blocks_meta)
.subscribe(slots, accounts, transactions, blocks, blocks_meta, resub)
.await?;
Ok(())
})
@ -205,6 +210,7 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> RetryClient<F> {
transactions: &TransactionsFilterMap,
blocks: &BlocksFilterMap,
blocks_meta: &BlocksMetaFilterMap,
resub: &u16,
) -> anyhow::Result<()> {
let request = SubscribeRequest {
slots: slots.clone(),
@ -230,17 +236,20 @@ impl<F: FnMut(Request<()>) -> InterceptedRequestResult> RetryClient<F> {
Err(error) => error!("error: {:?}", error),
}
// Example to illustrate how to resubscribe/update the subscription
counter += 1;
if counter == 10 {
#[allow(unused_variables)]
if counter == *resub {
let mut new_slots: SlotsFilterMap = HashMap::new();
new_slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});
let request = SubscribeRequest {
slots: slots.clone(),
slots: new_slots.clone(),
accounts: HashMap::default(),
transactions: HashMap::default(),
blocks: HashMap::default(),
blocks_meta: HashMap::default(),
};
// subscribe_tx.unbounded_send(request)?;
subscribe_tx.unbounded_send(request)?;
}
}
info!("stream closed");
@ -295,6 +304,11 @@ async fn main() -> Result<(), Error> {
blocks_meta.insert("client".to_owned(), SubscribeRequestFilterBlocksMeta {});
}
let mut resub: u16 = 0;
if let Some(resub_arg) = args.resub {
resub = resub_arg;
}
// Client with retry policy
let channel = match RetryChannel::new(args.endpoint, args.x_token) {
Ok(channel) => channel,
@ -305,7 +319,14 @@ async fn main() -> Result<(), Error> {
};
channel
.subscribe_retry(&slots, &accounts, &transactions, &blocks, &blocks_meta)
.subscribe_retry(
&slots,
&accounts,
&transactions,
&blocks,
&blocks_meta,
&resub,
)
.await
.map_err(|error| {
error!("Error: {}", error);

View File

@ -154,7 +154,6 @@ async function main() {
// Send subscribe request
await new Promise<void>((resolve, reject) => {
stream.write(request, (err) => {
stream.end();
if (err === null || err === undefined) {
resolve();
} else {