radiance/pkg/gossip/client.go

104 lines
2.1 KiB
Go

package gossip
import (
"context"
"net"
"net/netip"
"sync/atomic"
)
// Driver implements the network main loop.
//
// Note: This uses Go 1.19 standard library networking, which processes packets one-by-one. (slow!)
// Looks like Go 1.20 will add batch packet receive: https://github.com/golang/go/issues/45886
type Driver struct {
handler *Handler
so *net.UDPConn
}
func NewDriver(handler *Handler, so *net.UDPConn) *Driver {
return &Driver{
handler: handler,
so: so,
}
}
// Run processes packets until the context is cancelled.
//
// Destroys all handlers and closes the socket after returning.
// Returns any network error or nil if the context closed.
func (c *Driver) Run(ctx context.Context) error {
defer c.handler.Close()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var graceful atomic.Bool
// Monitor close signal
go func() {
defer c.so.Close()
defer graceful.Store(true)
<-ctx.Done()
}()
var buf [PacketSize]byte
for {
n, _, _, addr, err := c.so.ReadMsgUDPAddrPort(buf[:], nil)
if n > 0 {
c.handler.HandlePacket(buf[:n], addr)
}
if err != nil {
if graceful.Load() {
return nil
}
return err
}
}
}
// Handler is a network-agnostic multiplexer for incoming gossip messages.
type Handler struct {
*PullClient
*PingClient
*PingServer
numInvalidMsgs uint64
numIgnoredMsgs uint64
}
// HandlePacket is the entrypoint of the RX side.
func (h *Handler) HandlePacket(packet []byte, from netip.AddrPort) {
msg, err := BincodeDeserializeMessage(packet)
if err != nil {
atomic.AddUint64(&h.numInvalidMsgs, 1)
return
}
switch x := msg.(type) {
case *Message__PullResponse:
if h.PullClient != nil {
h.PullClient.HandlePullResponse(x, from)
return
}
case *Message__Ping:
if h.PingServer != nil {
h.PingServer.HandlePing(x, from)
return
}
case *Message__Pong:
if h.PingClient != nil {
h.PingClient.HandlePong(x, from)
return
}
}
atomic.AddUint64(&h.numIgnoredMsgs, 1)
}
// Close destroys all handlers.
func (h *Handler) Close() {
h.PingClient.Close()
}
type udpSender interface {
WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error)
}