wormhole-explorer/fly/processor/vaa_gossip_consumer_splitte...

124 lines
2.7 KiB
Go

package processor
import (
"context"
"sync"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
// VAAGossipConsumerSplitterOption represents a consumer splitter option function.
type VAAGossipConsumerSplitterOption func(*VAAGossipConsumerSplitter)
// VAAGossipConsumerSplitter represents a vaa message splitter.
type VAAGossipConsumerSplitter struct {
push VAAPushFunc
pythCh chan *sppliterMessage
nonPythCh chan *sppliterMessage
logger *zap.Logger
workerSize int
wgBlock sync.WaitGroup
size int
}
type sppliterMessage struct {
value *vaa.VAA
data []byte
}
// NewVAAGossipSplitterConsumer creates a splitter instance.
func NewVAAGossipSplitterConsumer(
publish VAAPushFunc,
workerSize int,
logger *zap.Logger,
opts ...VAAGossipConsumerSplitterOption) *VAAGossipConsumerSplitter {
v := &VAAGossipConsumerSplitter{
push: publish,
logger: logger,
workerSize: workerSize,
size: 50,
}
for _, opt := range opts {
opt(v)
}
v.pythCh = make(chan *sppliterMessage, v.size)
v.nonPythCh = make(chan *sppliterMessage, v.size)
return v
}
// WithSize allows to specify channel size when setting a value.
func WithSize(v int) VAAGossipConsumerSplitterOption {
return func(i *VAAGossipConsumerSplitter) {
i.size = v
}
}
// Push splits vaa message on different channels depending on whether it is a pyth or non pyth.
func (p *VAAGossipConsumerSplitter) Push(ctx context.Context, v *vaa.VAA, serializedVaa []byte) error {
msg := &sppliterMessage{
value: v,
data: serializedVaa,
}
if vaa.ChainIDPythNet == v.EmitterChain {
//if the pyth channel is full, deletes the oldest message and sends the new message
select {
case p.pythCh <- msg:
default:
select {
case <-p.pythCh:
default:
}
p.pythCh <- msg
}
} else {
p.nonPythCh <- msg
}
return nil
}
// Start runs two go routine to process messages for both channels.
func (p *VAAGossipConsumerSplitter) Start(ctx context.Context) {
for i := 0; i < p.workerSize; i++ {
p.wgBlock.Add(1)
go p.executePyth(ctx)
}
go p.executeNonPyth(ctx)
}
// Close closes all consumer resources.
func (p *VAAGossipConsumerSplitter) Close() {
close(p.nonPythCh)
close(p.pythCh)
p.wgBlock.Wait()
}
func (p *VAAGossipConsumerSplitter) executePyth(ctx context.Context) {
defer p.wgBlock.Done()
for {
select {
case <-ctx.Done():
return
case m, opened := <-p.pythCh:
if !opened {
return
}
_ = p.push(ctx, m.value, m.data)
}
}
}
func (p *VAAGossipConsumerSplitter) executeNonPyth(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case m, opened := <-p.nonPythCh:
if !opened {
return
}
_ = p.push(ctx, m.value, m.data)
}
}
}