diff --git a/cmd/gossip/ping/ping.go b/cmd/gossip/ping/ping.go
index 9574148..4563d74 100644
--- a/cmd/gossip/ping/ping.go
+++ b/cmd/gossip/ping/ping.go
@@ -12,7 +12,6 @@ import (
 	"os"
 	"os/signal"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/certusone/radiance/pkg/gossip"
@@ -27,6 +26,9 @@ var (
 	flagAddr    = flag.String("addr", "", "Address to ping (:)")
 )
 
+// target is the resolved address that every ping is sent to; main sets it once at startup.
+var target netip.AddrPort
+
 func init() {
 	klog.InitFlags(nil)
 	flag.Parse()
@@ -37,6 +39,12 @@ func main() {
 		klog.Exit("No address to ping specified")
 	}
 
+	udpAddr, err := net.ResolveUDPAddr("udp", *flagAddr)
+	if err != nil {
+		klog.Exitf("invalid target address: %s", err)
+	}
+	target = udpAddr.AddrPort()
+
 	ctx := context.Background()
 
 	_, privkey, err := ed25519.GenerateKey(rand.Reader)
@@ -44,152 +52,81 @@ func main() {
 		panic(err)
 	}
 
-	conn, err := net.Dial("udp", *flagAddr)
+	conn, err := net.ListenUDP("udp", nil)
 	if err != nil {
 		klog.Exit(err)
 	}
-	udpConn := conn.(*net.UDPConn)
-	klog.Infof("GOSSIP PING %s (%s)", *flagAddr, conn.RemoteAddr())
-
-	s := Session{
-		privkey: privkey,
-		udpConn: udpConn,
-		reqs:    make(map[[32]byte]pending),
+	pingClient := gossip.NewPingClient(privkey, conn)
+	handler := &gossip.Handler{
+		PingClient: pingClient,
 	}
+	client := gossip.NewClient(handler, conn)
+
+	klog.Infof("GOSSIP PING %s (%s)", *flagAddr, target.String())
 
 	ctx, cancel := signal.NotifyContext(ctx, os.Interrupt)
 	defer cancel()
 
 	group, ctx := errgroup.WithContext(ctx)
 	group.Go(func() error {
-		return s.send(ctx)
+		return client.Run(ctx)
 	})
 	group.Go(func() error {
-		return s.receive(ctx)
+		pingLoop(ctx, pingClient)
+		return context.Canceled // done
 	})
 
 	_ = group.Wait()
 	_ = conn.Close()
 
-	klog.Infof("--- %s gossip ping statistics ---", udpConn.RemoteAddr())
+	klog.Infof("--- %s gossip ping statistics ---", target.String())
 
-	numSuccess := atomic.LoadUint64(&s.numSuccess)
-	numTimeout := atomic.LoadUint64(&s.numTimeout)
+	numSuccess := pingClient.NumOK.Load()
+	numTimeout := pingClient.NumTimeout.Load()
+	if numSuccess+numTimeout == 0 {
+		// Nothing completed: skip the loss ratio below, which would be 0/0 and print NaN.
+		klog.Info("0 packets transmitted, 0 packets received")
+		return
+	}
 	klog.Infof("%d packets transmitted, %d packets received, %.1f%% packet loss", numSuccess+numTimeout, numSuccess, (1-(float64(numSuccess)/float64(numSuccess+numTimeout)))*100)
 }
 
-type pending struct {
-	c      int
-	t      time.Time
-	ping   gossip.Ping
-	cancel context.CancelFunc
-}
+// pingLoop starts one ping per ticker interval until the configured count is reached
+// or ctx is done, then waits for all in-flight pings to finish before returning.
+func pingLoop(ctx context.Context, pinger *gossip.PingClient) {
+	var wg sync.WaitGroup
+	defer wg.Wait()
 
-type Session struct {
-	privkey ed25519.PrivateKey
-	udpConn *net.UDPConn
-	lock    sync.Mutex
-	reqs    map[[32]byte]pending
-
-	numSuccess  uint64
-	numTimeout  uint64
-	numSendFail uint64
-}
-
-func (s *Session) send(ctx context.Context) error {
-	defer s.udpConn.Close()
 	ticker := time.NewTicker(*flagDelay)
-	for c := 0; c < *flagCount || *flagCount == -1; c++ {
-		s.sendPing(ctx, c)
+	defer ticker.Stop()
+	count := *flagCount
+	for seq := 0; seq < count || count == -1; seq++ {
 		select {
-		case <-ticker.C:
 		case <-ctx.Done():
-			return ctx.Err()
+			return
+		case <-ticker.C:
+			wg.Add(1)
+			go sendPing(ctx, &wg, pinger, seq)
 		}
 	}
-	return errors.New("done")
 }
 
-func (s *Session) sendPing(ctx context.Context, c int) {
-	t := time.Now()
+// sendPing performs a single ping bounded by the timeout flag and logs the outcome.
+func sendPing(ctx context.Context, wg *sync.WaitGroup, pinger *gossip.PingClient, seq int) {
+	defer wg.Done()
 
-	var token [32]byte
-	if _, err := rand.Read(token[:]); err != nil {
-		panic(err)
-	}
+	ctx, cancel := context.WithTimeout(ctx, *flagTimeout)
+	defer cancel()
 
-	ping := gossip.NewPing(token, s.privkey)
-	pingMsg := gossip.Message__Ping{Value: ping}
-	frame, err := pingMsg.BincodeSerialize()
-	if err != nil {
-		panic(err)
-	}
-
-	_, _, err = s.udpConn.WriteMsgUDP(frame, nil, nil)
-	if err != nil {
-		klog.Warningf("Send ping: %s", err)
-		atomic.AddUint64(&s.numSendFail, 1)
+	start := time.Now()
+	_, responder, err := pinger.Ping(ctx, target)
+	if err == nil {
+		klog.Infof("Pong from %s seq=%d time=%v", responder, seq, time.Since(start))
+	} else if errors.Is(err, context.Canceled) {
 		return
+	} else if errors.Is(err, context.DeadlineExceeded) {
+		klog.Infof("Request timeout for seq %d", seq)
+	} else {
+		klog.Warning(err)
 	}
-
-	pongToken := gossip.HashPingToken(ping.Token)
-	pingCtx, pingCancel := context.WithTimeout(ctx, *flagTimeout)
-	go func() {
-		<-pingCtx.Done()
-		if err := pingCtx.Err(); err == context.DeadlineExceeded {
-			s.lock.Lock()
-			defer s.lock.Unlock()
-			delete(s.reqs, pongToken)
-			klog.V(3).Infof("Request timeout for seq %d", c)
-			atomic.AddUint64(&s.numTimeout, 1)
-		}
-	}()
-
-	s.lock.Lock()
-	s.reqs[pongToken] = pending{
-		c:      c,
-		t:      t,
-		ping:   ping,
-		cancel: pingCancel,
-	}
-	s.lock.Unlock()
-}
-
-func (s *Session) receive(ctx context.Context) error {
-	for ctx.Err() == nil {
-		var packet [4 + gossip.PingSize]byte
-		n, remote, err := s.udpConn.ReadFromUDPAddrPort(packet[:])
-		klog.V(7).Infof("Packet from %s", remote)
-		if n >= len(packet) {
-			s.handlePong(packet[:], remote)
-		}
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func (s *Session) handlePong(packet []byte, remote netip.AddrPort) {
-	msg, err := gossip.BincodeDeserializeMessage(packet)
-	if err != nil {
-		return
-	}
-	pongMsg, ok := msg.(*gossip.Message__Pong)
-	if !ok {
-		return
-	}
-
-	s.lock.Lock()
-	defer s.lock.Unlock()
-	req, ok := s.reqs[pongMsg.Value.Token]
-	if !ok {
-		return
-	}
-	delete(s.reqs, pongMsg.Value.Token)
-
-	req.cancel()
-
-	klog.V(3).Infof("Pong from %s seq=%d time=%v", remote, req.c, time.Since(req.t))
-	atomic.AddUint64(&s.numSuccess, 1)
 }
diff --git a/pkg/gossip/client.go b/pkg/gossip/client.go
new file mode 100644
index 0000000..4ae4114
--- /dev/null
+++ b/pkg/gossip/client.go
@@ -0,0 +1,97 @@
+package gossip
+
+import (
+	"context"
+	"net"
+	"net/netip"
+	"sync/atomic"
+)
+
+// Client implements the network main loop.
+type Client struct {
+	handler *Handler
+	so      *net.UDPConn
+}
+
+// NewClient returns a Client that reads packets from so and dispatches them to handler.
+func NewClient(handler *Handler, so *net.UDPConn) *Client {
+	return &Client{
+		handler: handler,
+		so:      so,
+	}
+}
+
+// Run processes packets until the context is cancelled.
+//
+// Destroys all handlers and closes the socket after returning.
+// Returns any network error or nil if the context closed.
+func (c *Client) Run(ctx context.Context) error {
+	defer c.handler.Close()
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	var graceful atomic.Bool
+	// Monitor close signal
+	go func() {
+		defer c.so.Close()
+		defer graceful.Store(true)
+		<-ctx.Done()
+	}()
+
+	var buf [1280]byte
+	for {
+		n, _, _, addr, err := c.so.ReadMsgUDPAddrPort(buf[:], nil)
+		if n > 0 {
+			c.handler.HandlePacket(buf[:n], addr)
+		}
+		if err != nil {
+			if graceful.Load() {
+				return nil
+			}
+			return err
+		}
+	}
+}
+
+// Handler is a network-agnostic multiplexer for incoming gossip messages.
+type Handler struct {
+	*PingClient
+	*PingServer
+
+	numInvalidMsgs uint64
+	numIgnoredMsgs uint64
+}
+
+// HandlePacket is the entrypoint of the RX side.
+func (h *Handler) HandlePacket(packet []byte, from netip.AddrPort) {
+	msg, err := BincodeDeserializeMessage(packet)
+	if err != nil {
+		atomic.AddUint64(&h.numInvalidMsgs, 1)
+		return
+	}
+	switch x := msg.(type) {
+	case *Message__Ping:
+		if h.PingServer != nil {
+			h.PingServer.HandlePing(x, from)
+			return
+		}
+	case *Message__Pong:
+		if h.PingClient != nil {
+			h.PingClient.HandlePong(x, from)
+			return
+		}
+	}
+	atomic.AddUint64(&h.numIgnoredMsgs, 1)
+}
+
+// Close destroys all handlers.
+func (h *Handler) Close() {
+	if h.PingClient != nil {
+		h.PingClient.Close()
+	}
+}
+
+type udpSender interface {
+	WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error)
+}
diff --git a/pkg/gossip/ping.go b/pkg/gossip/ping.go
index 3aa3866..34e8ebc 100644
--- a/pkg/gossip/ping.go
+++ b/pkg/gossip/ping.go
@@ -1,8 +1,15 @@
 package gossip
 
 import (
+	"context"
 	"crypto/ed25519"
+	"crypto/rand"
 	"crypto/sha256"
+	"net/netip"
+	"sync"
+	"sync/atomic"
+
+	"k8s.io/klog/v2"
 )
 
 // PingSize is the size of a serialized ping message.
@@ -19,6 +26,17 @@ func NewPing(token [32]byte, key ed25519.PrivateKey) (p Ping) {
 	return p
 }
 
+// NewPingRandom builds a Ping via NewPing using a newly generated random 32-byte token.
+//
+// Panics if the random source fails to fill the token.
+func NewPingRandom(key ed25519.PrivateKey) Ping {
+	var token [32]byte
+	if _, err := rand.Read(token[:]); err != nil {
+		panic(err)
+	}
+	return NewPing(token, key)
+}
+
 // Verify checks the Ping's signature.
 func (p *Ping) Verify() bool {
 	return ed25519.Verify(p.From[:], p.Token[:], p.Signature[:])
@@ -28,3 +46,210 @@ func (p *Ping) Verify() bool {
 func HashPingToken(token [32]byte) [32]byte {
 	return sha256.Sum256(token[:])
 }
+
+// PingClient implements the stateful client (initiator) side of the gossip ping protocol.
+//
+// It tracks every pending request to match it with solicited pong frames.
+type PingClient struct {
+	identity ed25519.PrivateKey
+	so       udpSender
+
+	lock sync.RWMutex
+	reqs map[Hash]*pingSession // pong token => session
+
+	NumSent     atomic.Uint64 // ping messages sent
+	NumOK       atomic.Uint64 // successful ping transaction
+	NumInvalid  atomic.Uint64 // invalid sig in pong
+	NumTimeout  atomic.Uint64 // context errored before pong arrived
+	NumSendFail atomic.Uint64 // socket refused to send (tx buffer full)
+	NumMartian  atomic.Uint64 // unsolicited pong
+}
+
+// NewPingClient creates a PingClient that builds pings with identity and sends them through so.
+func NewPingClient(identity ed25519.PrivateKey, so udpSender) *PingClient {
+	return &PingClient{
+		identity: identity,
+		so:       so,
+		reqs:     make(map[Hash]*pingSession),
+	}
+}
+
+type pingSession struct {
+	out  chan<- pongResponse
+	done atomic.Bool
+}
+
+type pongResponse struct {
+	from netip.AddrPort
+	pong Ping
+}
+
+// Ping sends a gossip ping packet.
+// Blocks until a valid matching pong packet arrives or the context is cancelled.
+//
+// Note that this mechanism is unrelated to ICMP pings.
+func (p *PingClient) Ping(ctx context.Context, target netip.AddrPort) (pong Ping, responder netip.AddrPort, err error) {
+	// Allocate session for lifetime of scope
+	ping, pongToken, resp := p.createSession()
+	defer p.destroySession(pongToken)
+
+	// Send ping to "server"
+	pingMsg := &Message__Ping{ping}
+	packet, err := pingMsg.BincodeSerialize()
+	if err != nil {
+		klog.Errorf("Failed to serialize ping: %s", err)
+		return
+	}
+	if _, err = p.so.WriteToUDPAddrPort(packet, target); err != nil {
+		p.NumSendFail.Add(1)
+		return
+	}
+	p.NumSent.Add(1)
+
+	// Block until something happens
+	select {
+	case <-ctx.Done():
+		err = ctx.Err()
+		p.NumTimeout.Add(1)
+		return
+	case resp, ok := <-resp:
+		if !ok {
+			// sanity check: cancellation can only happen before or after select
+			panic("race condition")
+		}
+		pong = resp.pong
+		responder = resp.from
+		p.NumOK.Add(1)
+		return
+	}
+}
+
+// HandlePong processes incoming gossip pong messages.
+func (p *PingClient) HandlePong(msg *Message__Pong, from netip.AddrPort) {
+	pong := msg.Value
+
+	// map lookup is cheaper than Ed25519 verify, so do that first
+	sess := p.getSession(pong.Token)
+	if sess == nil {
+		p.NumMartian.Add(1)
+		return
+	}
+
+	// Verify before claiming the session: a pong with a bad signature must not
+	// mark the session as done, or the genuine pong arriving later would be dropped.
+	if !pong.Verify() {
+		p.NumInvalid.Add(1)
+		return
+	}
+
+	// We might receive two valid pongs before the initiating goroutine cleans up.
+	// Bail here because we are only allowed to send one pong back to the channel.
+	if sess.done.Swap(true) {
+		p.NumMartian.Add(1)
+		return
+	}
+
+	// Upgrade to write lock to prevent initiating goroutine
+	// from closing the channel we're about to send on.
+	// TODO: this is probably very slow
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	sess = p.reqs[pong.Token]
+	if sess == nil {
+		// session was cancelled while we were verifying the pong
+		p.NumMartian.Add(1)
+		return
+	}
+	sess.out <- pongResponse{
+		from: from,
+		pong: pong,
+	}
+}
+
+// createSession generates a new ping and registers a pending session keyed by its pong token.
+func (p *PingClient) createSession() (ping Ping, pongToken Hash, resp <-chan pongResponse) {
+	ping = NewPingRandom(p.identity)
+	pongToken = HashPingToken(ping.Token)
+
+	respBi := make(chan pongResponse, 1)
+	resp = respBi // recv-only
+
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	p.reqs[pongToken] = &pingSession{
+		out: respBi, // send-only
+	}
+	return
+}
+
+// getSession returns the pending session for pongToken, or nil if there is none.
+func (p *PingClient) getSession(pongToken Hash) *pingSession {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+	return p.reqs[pongToken]
+}
+
+// destroySession closes the session's response channel and removes it from the pending set.
+func (p *PingClient) destroySession(pongToken Hash) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	session, ok := p.reqs[pongToken]
+	if !ok {
+		return
+	}
+	close(session.out)
+	delete(p.reqs, pongToken)
+}
+
+// Close currently only acquires and releases the session lock; it does not cancel pending pings.
+func (p *PingClient) Close() {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+}
+
+// PingServer implements the stateless server (reactor) side of the gossip ping protocol.
+//
+// It implements no rate-limits and is thus vulnerable to packet floods.
+type PingServer struct {
+	identity ed25519.PrivateKey
+	so       udpSender
+
+	NumOK       atomic.Uint64 // handled pings, though uncertain whether pong arrived
+	NumInvalid  atomic.Uint64 // invalid sig in ping
+	NumSendFail atomic.Uint64 // socket refused to send (tx buffer full)
+}
+
+// NewPingServer creates a PingServer that builds pongs with identity and sends them through so.
+func NewPingServer(identity ed25519.PrivateKey, so udpSender) *PingServer {
+	return &PingServer{
+		identity: identity,
+		so:       so,
+	}
+}
+
+// HandlePing processes incoming gossip ping messages.
+func (p *PingServer) HandlePing(ping *Message__Ping, from netip.AddrPort) {
+	// Verify signature of ping.
+	if !ping.Value.Verify() {
+		p.NumInvalid.Add(1)
+		return
+	}
+
+	// SHA-256 hash token and sign with identity.
+	// Note: Possible signature forging attack vector.
+	pong := NewPing(HashPingToken(ping.Value.Token), p.identity)
+	pongMsg := &Message__Pong{pong}
+
+	// Respond to sender.
+	packet, err := pongMsg.BincodeSerialize()
+	if err != nil {
+		klog.Errorf("Failed to serialize pong: %s", err)
+		return
+	}
+	if _, err = p.so.WriteToUDPAddrPort(packet, from); err != nil {
+		p.NumSendFail.Add(1)
+		return
+	}
+	p.NumOK.Add(1)
+}
diff --git a/pkg/gossip/ping_test.go b/pkg/gossip/ping_test.go
new file mode 100644
index 0000000..a5d8319
--- /dev/null
+++ b/pkg/gossip/ping_test.go
@@ -0,0 +1,56 @@
+package gossip
+
+import (
+	"context"
+	"crypto/ed25519"
+	"crypto/rand"
+	"net"
+	"net/netip"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+)
+
+func TestPingServer(t *testing.T) {
+	conn, err := net.ListenUDP("udp", net.UDPAddrFromAddrPort(netip.MustParseAddrPort("[::1]:0")))
+	require.NoError(t, err)
+	defer conn.Close()
+
+	lo := conn.LocalAddr().(*net.UDPAddr).AddrPort()
+
+	_, identity, err := ed25519.GenerateKey(rand.Reader)
+	require.NoError(t, err)
+
+	handler := &Handler{
+		PingClient: NewPingClient(identity, conn),
+		PingServer: NewPingServer(identity, conn),
+	}
+	client := NewClient(handler, conn)
+
+	ctx := context.Background()
+	ctx, cancel := context.WithCancel(ctx)
+
+	group, ctx := errgroup.WithContext(ctx)
+	group.Go(func() error {
+		return client.Run(ctx)
+	})
+	group.Go(func() error {
+		defer cancel()
+		pinger := handler.PingClient
+
+		for i := 0; i < 100; i++ {
+			pong, responder, err := pinger.Ping(ctx, lo)
+			if err != nil {
+				// FailNow (used by require) must not be called outside the test goroutine.
+				return err
+			}
+			assert.Equal(t, lo, responder)
+			assert.True(t, pong.Verify())
+		}
+		return nil
+	})
+	err = group.Wait()
+	assert.NoError(t, err)
+}