node/watchers/polygon: implement unsubscribe (TOB-WORMGUWA-7)

This commit is contained in:
tbjump 2023-05-04 00:11:40 +00:00 committed by tbjump
parent 100a01b4b1
commit 5d137b6d88
2 changed files with 9 additions and 3 deletions

View File

@ -37,9 +37,9 @@ type Connector interface {
type PollSubscription struct { type PollSubscription struct {
errOnce sync.Once errOnce sync.Once
err chan error err chan error // subscription consumer reads, subscription fulfiller writes. used to propagate errors.
quit chan error quit chan error // subscription consumer writes, subscription fulfiller reads. used to signal that consumer wants to cancel the subscription.
unsubDone chan struct{} unsubDone chan struct{} // subscription consumer reads, subscription fulfiller writes. used to signal that the subscription was successfully canceled
} }
func NewPollSubscription() *PollSubscription { func NewPollSubscription() *PollSubscription {

View File

@ -119,6 +119,8 @@ func (c *PolygonConnector) SubscribeForBlocks(ctx context.Context, errC chan err
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
messageSub.Unsubscribe()
sub.unsubDone <- struct{}{}
return nil return nil
case err := <-messageSub.Err(): case err := <-messageSub.Err():
sub.err <- err sub.err <- err
@ -126,6 +128,10 @@ func (c *PolygonConnector) SubscribeForBlocks(ctx context.Context, errC chan err
if err := c.processCheckpoint(ctx, sink, checkpoint); err != nil { if err := c.processCheckpoint(ctx, sink, checkpoint); err != nil {
sub.err <- fmt.Errorf("failed to process checkpoint: %w", err) sub.err <- fmt.Errorf("failed to process checkpoint: %w", err)
} }
case <-sub.quit:
messageSub.Unsubscribe()
sub.unsubDone <- struct{}{}
return nil
} }
} }
}) })