gossip: refactor into network-agnostic state machine

- Adds network-agnostic gossip message RX multiplexer
- Adds PingServer and PingClient with blocking-style API
- simplify cmd/gossip/ping
This commit is contained in:
Richard Patel 2022-09-08 13:37:45 +02:00
parent 5a9f2ba9ab
commit 27a52d3407
4 changed files with 402 additions and 116 deletions

View File

@ -12,7 +12,6 @@ import (
"os"
"os/signal"
"sync"
"sync/atomic"
"time"
"github.com/certusone/radiance/pkg/gossip"
@ -27,6 +26,8 @@ var (
flagAddr = flag.String("addr", "", "Address to ping (<host>:<port>)")
)
var target netip.AddrPort
func init() {
klog.InitFlags(nil)
flag.Parse()
@ -37,6 +38,12 @@ func main() {
klog.Exit("No address to ping specified")
}
udpAddr, err := net.ResolveUDPAddr("udp", *flagAddr)
if err != nil {
klog.Exitf("invalid target address: %s", err)
}
target = udpAddr.AddrPort()
ctx := context.Background()
_, privkey, err := ed25519.GenerateKey(rand.Reader)
@ -44,152 +51,72 @@ func main() {
panic(err)
}
conn, err := net.Dial("udp", *flagAddr)
conn, err := net.ListenUDP("udp", nil)
if err != nil {
klog.Exit(err)
}
udpConn := conn.(*net.UDPConn)
klog.Infof("GOSSIP PING %s (%s)", *flagAddr, conn.RemoteAddr())
s := Session{
privkey: privkey,
udpConn: udpConn,
reqs: make(map[[32]byte]pending),
pingClient := gossip.NewPingClient(privkey, conn)
handler := &gossip.Handler{
PingClient: pingClient,
}
client := gossip.NewClient(handler, conn)
klog.Infof("GOSSIP PING %s (%s)", *flagAddr, target.String())
ctx, cancel := signal.NotifyContext(ctx, os.Interrupt)
defer cancel()
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
return s.send(ctx)
return client.Run(ctx)
})
group.Go(func() error {
return s.receive(ctx)
pingLoop(ctx, pingClient)
return context.Canceled // done
})
_ = group.Wait()
_ = conn.Close()
klog.Infof("--- %s gossip ping statistics ---", udpConn.RemoteAddr())
klog.Infof("--- %s gossip ping statistics ---", target.String())
numSuccess := atomic.LoadUint64(&s.numSuccess)
numTimeout := atomic.LoadUint64(&s.numTimeout)
numSuccess := pingClient.NumOK.Load()
numTimeout := pingClient.NumTimeout.Load()
klog.Infof("%d packets transmitted, %d packets received, %.1f%% packet loss",
numSuccess+numTimeout, numSuccess, (1-(float64(numSuccess)/float64(numSuccess+numTimeout)))*100)
}
type pending struct {
c int
t time.Time
ping gossip.Ping
cancel context.CancelFunc
}
func pingLoop(ctx context.Context, pinger *gossip.PingClient) {
var wg sync.WaitGroup
defer wg.Wait()
type Session struct {
privkey ed25519.PrivateKey
udpConn *net.UDPConn
lock sync.Mutex
reqs map[[32]byte]pending
numSuccess uint64
numTimeout uint64
numSendFail uint64
}
func (s *Session) send(ctx context.Context) error {
defer s.udpConn.Close()
ticker := time.NewTicker(*flagDelay)
for c := 0; c < *flagCount || *flagCount == -1; c++ {
s.sendPing(ctx, c)
count := *flagCount
for seq := 0; seq < count || count == -1; seq++ {
select {
case <-ticker.C:
case <-ctx.Done():
return ctx.Err()
return
case <-ticker.C:
wg.Add(1)
go sendPing(ctx, &wg, pinger, seq)
}
}
return errors.New("done")
}
func (s *Session) sendPing(ctx context.Context, c int) {
t := time.Now()
func sendPing(ctx context.Context, wg *sync.WaitGroup, pinger *gossip.PingClient, seq int) {
defer wg.Done()
var token [32]byte
if _, err := rand.Read(token[:]); err != nil {
panic(err)
}
ctx, cancel := context.WithTimeout(ctx, *flagTimeout)
defer cancel()
ping := gossip.NewPing(token, s.privkey)
pingMsg := gossip.Message__Ping{Value: ping}
frame, err := pingMsg.BincodeSerialize()
if err != nil {
panic(err)
}
_, _, err = s.udpConn.WriteMsgUDP(frame, nil, nil)
if err != nil {
klog.Warningf("Send ping: %s", err)
atomic.AddUint64(&s.numSendFail, 1)
start := time.Now()
_, responder, err := pinger.Ping(ctx, target)
if err == nil {
klog.Infof("Pong from %s seq=%d time=%v", responder, seq, time.Since(start))
} else if errors.Is(err, context.Canceled) {
return
} else if errors.Is(err, context.DeadlineExceeded) {
klog.Infof("Request timeout for seq %d", seq)
} else {
klog.Warning(err)
}
pongToken := gossip.HashPingToken(ping.Token)
pingCtx, pingCancel := context.WithTimeout(ctx, *flagTimeout)
go func() {
<-pingCtx.Done()
if err := pingCtx.Err(); err == context.DeadlineExceeded {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.reqs, pongToken)
klog.V(3).Infof("Request timeout for seq %d", c)
atomic.AddUint64(&s.numTimeout, 1)
}
}()
s.lock.Lock()
s.reqs[pongToken] = pending{
c: c,
t: t,
ping: ping,
cancel: pingCancel,
}
s.lock.Unlock()
}
func (s *Session) receive(ctx context.Context) error {
for ctx.Err() == nil {
var packet [4 + gossip.PingSize]byte
n, remote, err := s.udpConn.ReadFromUDPAddrPort(packet[:])
klog.V(7).Infof("Packet from %s", remote)
if n >= len(packet) {
s.handlePong(packet[:], remote)
}
if err != nil {
return err
}
}
return nil
}
func (s *Session) handlePong(packet []byte, remote netip.AddrPort) {
msg, err := gossip.BincodeDeserializeMessage(packet)
if err != nil {
return
}
pongMsg, ok := msg.(*gossip.Message__Pong)
if !ok {
return
}
s.lock.Lock()
defer s.lock.Unlock()
req, ok := s.reqs[pongMsg.Value.Token]
if !ok {
return
}
delete(s.reqs, pongMsg.Value.Token)
req.cancel()
klog.V(3).Infof("Pong from %s seq=%d time=%v", remote, req.c, time.Since(req.t))
atomic.AddUint64(&s.numSuccess, 1)
}

94
pkg/gossip/client.go Normal file
View File

@ -0,0 +1,94 @@
package gossip
import (
"context"
"net"
"net/netip"
"sync/atomic"
)
// Client implements the network main loop.
type Client struct {
handler *Handler
so *net.UDPConn
}
func NewClient(handler *Handler, so *net.UDPConn) *Client {
return &Client{
handler: handler,
so: so,
}
}
// Run processes packets until the context is cancelled.
//
// Destroys all handlers and closes the socket after returning.
// Returns any network error or nil if the context closed.
func (c *Client) Run(ctx context.Context) error {
defer c.handler.Close()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var graceful atomic.Bool
// Monitor close signal
go func() {
defer c.so.Close()
defer graceful.Store(true)
<-ctx.Done()
}()
var buf [1280]byte
for {
n, _, _, addr, err := c.so.ReadMsgUDPAddrPort(buf[:], nil)
if n > 0 {
c.handler.HandlePacket(buf[:n], addr)
}
if err != nil {
if graceful.Load() {
return nil
}
return err
}
}
}
// Handler is a network-agnostic multiplexer for incoming gossip messages.
type Handler struct {
*PingClient
*PingServer
numInvalidMsgs uint64
numIgnoredMsgs uint64
}
// HandlePacket is the entrypoint of the RX side.
func (h *Handler) HandlePacket(packet []byte, from netip.AddrPort) {
msg, err := BincodeDeserializeMessage(packet)
if err != nil {
atomic.AddUint64(&h.numInvalidMsgs, 1)
return
}
switch x := msg.(type) {
case *Message__Ping:
if h.PingServer != nil {
h.PingServer.HandlePing(x, from)
return
}
case *Message__Pong:
if h.PingClient != nil {
h.PingClient.HandlePong(x, from)
return
}
}
atomic.AddUint64(&h.numIgnoredMsgs, 1)
}
// Close destroys all handlers.
func (h *Handler) Close() {
h.PingClient.Close()
}
type udpSender interface {
WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error)
}

View File

@ -1,8 +1,15 @@
package gossip
import (
"context"
"crypto/ed25519"
"crypto/sha256"
"math/rand"
"net/netip"
"sync"
"sync/atomic"
"k8s.io/klog/v2"
)
// PingSize is the size of a serialized ping message.
@ -19,6 +26,12 @@ func NewPing(token [32]byte, key ed25519.PrivateKey) (p Ping) {
return p
}
func NewPingRandom(key ed25519.PrivateKey) Ping {
var token [32]byte
rand.Read(token[:])
return NewPing(token, key)
}
// Verify checks the Ping's signature.
func (p *Ping) Verify() bool {
return ed25519.Verify(p.From[:], p.Token[:], p.Signature[:])
@ -28,3 +41,202 @@ func (p *Ping) Verify() bool {
func HashPingToken(token [32]byte) [32]byte {
return sha256.Sum256(token[:])
}
// PingClient implements the stateful client (initiator) side of the gossip ping protocol.
//
// It tracks every pending request to match it with solicited pong frames.
type PingClient struct {
identity ed25519.PrivateKey
so udpSender
lock sync.RWMutex
reqs map[Hash]*pingSession // pong token => session
NumSent atomic.Uint64 // ping messages sent
NumOK atomic.Uint64 // successful ping transaction
NumInvalid atomic.Uint64 // invalid sig in pong
NumTimeout atomic.Uint64 // context errored before pong arrived
NumSendFail atomic.Uint64 // socket refused to send (tx buffer full)
NumMartian atomic.Uint64 // unsolicited pong
}
func NewPingClient(identity ed25519.PrivateKey, so udpSender) *PingClient {
return &PingClient{
identity: identity,
so: so,
reqs: make(map[Hash]*pingSession),
}
}
type pingSession struct {
out chan<- pongResponse
done atomic.Bool
}
type pongResponse struct {
from netip.AddrPort
pong Ping
}
// Ping sends a gossip ping packet.
// Blocks until a valid matching pong packet arrives or the context is cancelled.
//
// Note that this mechanism is unrelated to ICMP pings.
func (p *PingClient) Ping(ctx context.Context, target netip.AddrPort) (pong Ping, responder netip.AddrPort, err error) {
// Allocate session for lifetime of scope
ping, pongToken, resp := p.createSession()
defer p.destroySession(pongToken)
// Send ping to "server"
pingMsg := &Message__Ping{ping}
packet, err := pingMsg.BincodeSerialize()
if err != nil {
klog.Errorf("Failed to serialize ping: %s", err)
return
}
if _, err = p.so.WriteToUDPAddrPort(packet, target); err != nil {
p.NumSendFail.Add(1)
return
}
p.NumSent.Add(1)
// Block until something happens
select {
case <-ctx.Done():
err = ctx.Err()
p.NumTimeout.Add(1)
return
case resp, ok := <-resp:
if !ok {
// sanity check: cancellation can only happen before or after select
panic("race condition")
}
pong = resp.pong
responder = resp.from
p.NumOK.Add(1)
return
}
}
// HandlePong processes incoming gossip pong messages.
func (p *PingClient) HandlePong(msg *Message__Pong, from netip.AddrPort) {
pong := msg.Value
// map lookup is cheaper than Ed25519 verify, so do that first
sess := p.getSession(pong.Token)
if sess == nil {
p.NumMartian.Add(1)
return
}
// We might receive two valid pongs before the initiating goroutine cleans up.
// Bail here because we are only allowed to send one pong back to the channel.
if sess.done.Swap(true) {
p.NumMartian.Add(1)
return
}
if !pong.Verify() {
p.NumInvalid.Add(1)
return
}
// Upgrade to write lock to prevent initiating goroutine
// from closing the channel we're about to send on.
// TODO: this is probably very slow
p.lock.Lock()
defer p.lock.Unlock()
sess = p.reqs[pong.Token]
if sess == nil {
// session was cancelled while we were verifying the pong
p.NumMartian.Add(1)
return
}
sess.out <- pongResponse{
from: from,
pong: pong,
}
}
func (p *PingClient) createSession() (ping Ping, pongToken Hash, resp <-chan pongResponse) {
ping = NewPingRandom(p.identity)
pongToken = HashPingToken(ping.Token)
respBi := make(chan pongResponse, 1)
resp = respBi // recv-only
p.lock.Lock()
defer p.lock.Unlock()
p.reqs[pongToken] = &pingSession{
out: respBi, // send-only
}
return
}
func (p *PingClient) getSession(pongToken Hash) *pingSession {
p.lock.RLock()
defer p.lock.RUnlock()
return p.reqs[pongToken]
}
func (p *PingClient) destroySession(pongToken Hash) {
p.lock.Lock()
defer p.lock.Unlock()
session, ok := p.reqs[pongToken]
if !ok {
return
}
close(session.out)
delete(p.reqs, pongToken)
}
func (p *PingClient) Close() {
p.lock.Lock()
defer p.lock.Unlock()
}
// PingServer implements the stateless server (reactor) side of the gossip ping protocol.
//
// It implements no rate-limits and is thus vulnerable to packet floods.
type PingServer struct {
identity ed25519.PrivateKey
so udpSender
NumOK atomic.Uint64 // handled pings, though uncertain whether pong arrived
NumInvalid atomic.Uint64 // invalid sig in ping
NumSendFail atomic.Uint64 // socket refused to send (tx buffer full)
}
func NewPingServer(identity ed25519.PrivateKey, so udpSender) *PingServer {
return &PingServer{
identity: identity,
so: so,
}
}
// HandlePing processes incoming gossip ping messages.
func (p *PingServer) HandlePing(ping *Message__Ping, from netip.AddrPort) {
// Verify signature of ping.
if !ping.Value.Verify() {
p.NumInvalid.Add(1)
return
}
// SHA-256 hash token and sign with identity.
// Note: Possible signature forging attack vector.
pong := NewPing(HashPingToken(ping.Value.Token), p.identity)
pongMsg := &Message__Pong{pong}
// Respond to sender.
packet, err := pongMsg.BincodeSerialize()
if err != nil {
klog.Errorf("Failed to serialize pong: %s", err)
return
}
if _, err = p.so.WriteToUDPAddrPort(packet, from); err != nil {
p.NumSendFail.Add(1)
return
}
p.NumOK.Add(1)
}

53
pkg/gossip/ping_test.go Normal file
View File

@ -0,0 +1,53 @@
package gossip
import (
"context"
"crypto/ed25519"
"crypto/rand"
"net"
"net/netip"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestPingServer(t *testing.T) {
conn, err := net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort("[::1]:0")))
require.NoError(t, err)
defer conn.Close()
lo := conn.LocalAddr().(*net.UDPAddr).AddrPort()
_, identity, err := ed25519.GenerateKey(rand.Reader)
require.NoError(t, err)
handler := &Handler{
PingClient: NewPingClient(identity, conn),
PingServer: NewPingServer(identity, conn),
}
client := NewClient(handler, conn)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
return client.Run(ctx)
})
group.Go(func() error {
defer cancel()
pinger := handler.PingClient
for i := 0; i < 100; i++ {
pong, responder, err := pinger.Ping(ctx, lo)
require.NoError(t, err)
assert.Equal(t, lo, responder)
assert.True(t, pong.Verify())
}
return nil
})
err = group.Wait()
assert.NoError(t, err)
}