diff --git a/cmd/gossip/ping/ping.go b/cmd/gossip/ping/ping.go new file mode 100644 index 0000000..c3030c0 --- /dev/null +++ b/cmd/gossip/ping/ping.go @@ -0,0 +1,195 @@ +// Ping sends gossip pings to a Solana node. +package main + +import ( + "context" + "crypto/ed25519" + "crypto/rand" + "errors" + "flag" + "net" + "net/netip" + "os" + "os/signal" + "sync" + "sync/atomic" + "time" + + "github.com/certusone/radiance/pkg/gossip" + "golang.org/x/sync/errgroup" + "k8s.io/klog/v2" +) + +var ( + flagCount = flag.Int("c", -1, "Number of pings to send, -1 for infinite") + flagDelay = flag.Duration("i", 1*time.Second, "Delay between pings") + flagTimeout = flag.Duration("timeout", 3*time.Second, "Ping timeout") + flagAddr = flag.String("addr", "", "Address to ping (:)") +) + +func init() { + klog.InitFlags(nil) + flag.Parse() +} + +func main() { + if *flagAddr == "" { + klog.Exit("No address to ping specified") + } + + ctx := context.Background() + + _, privkey, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + panic(err) + } + + conn, err := net.Dial("udp", *flagAddr) + if err != nil { + klog.Exit(err) + } + udpConn := conn.(*net.UDPConn) + + klog.Infof("GOSSIP PING %s (%s)", *flagAddr, conn.RemoteAddr()) + + s := Session{ + privkey: privkey, + udpConn: udpConn, + reqs: make(map[[32]byte]pending), + } + + ctx, cancel := signal.NotifyContext(ctx, os.Interrupt) + defer cancel() + group, ctx := errgroup.WithContext(ctx) + group.Go(func() error { + return s.send(ctx) + }) + group.Go(func() error { + return s.receive(ctx) + }) + _ = group.Wait() + _ = conn.Close() + + klog.Infof("--- %s gossip ping statistics ---", udpConn.RemoteAddr()) + + numSuccess := atomic.LoadUint64(&s.numSuccess) + numTimeout := atomic.LoadUint64(&s.numTimeout) + klog.Infof("%d packets transmitted, %d packets received, %.1f%% packet loss", + numSuccess+numTimeout, numSuccess, (1-(float64(numSuccess)/float64(numSuccess+numTimeout)))*100) +} + +type pending struct { + c int + t time.Time + ping gossip.Ping + cancel context.CancelFunc +} + +type Session struct { + privkey ed25519.PrivateKey + udpConn *net.UDPConn + lock sync.Mutex + reqs map[[32]byte]pending + + numSuccess uint64 + numTimeout uint64 + numSendFail uint64 +} + +func (s *Session) send(ctx context.Context) error { + defer s.udpConn.Close() + ticker := time.NewTicker(*flagDelay) + for c := 0; c < *flagCount || *flagCount == -1; c++ { + s.sendPing(ctx, c) + select { + case <-ticker.C: + case <-ctx.Done(): + return ctx.Err() + } + } + return errors.New("done") +} + +func (s *Session) sendPing(ctx context.Context, c int) { + t := time.Now() + + var token [32]byte + if _, err := rand.Read(token[:]); err != nil { + panic(err) + } + + ping := gossip.NewPing(token, s.privkey) + pingMsg := gossip.Message__Ping{Value: ping} + frame, err := pingMsg.BincodeSerialize() + if err != nil { + panic(err) + } + + _, _, err = s.udpConn.WriteMsgUDP(frame, nil, nil) + if err != nil { + klog.Warningf("Send ping: %s", err) + atomic.AddUint64(&s.numSendFail, 1) + return + } + + pongToken := gossip.HashPingToken(ping.Token) + pingCtx, pingCancel := context.WithTimeout(ctx, *flagTimeout) + go func() { + <-pingCtx.Done() + if err := pingCtx.Err(); err == context.DeadlineExceeded { + s.lock.Lock() + defer s.lock.Unlock() + delete(s.reqs, pongToken) + klog.V(3).Infof("Request timeout for seq %d", c) + atomic.AddUint64(&s.numTimeout, 1) + } + }() + + s.lock.Lock() + s.reqs[pongToken] = pending{ + c: c, + t: t, + ping: ping, + cancel: pingCancel, + } + s.lock.Unlock() +} + +func (s *Session) receive(ctx context.Context) error { + for ctx.Err() == nil { + var packet [132]byte + n, remote, err := s.udpConn.ReadFromUDPAddrPort(packet[:]) + klog.V(7).Infof("Packet from %s", remote) + if n >= len(packet) { + s.handlePong(packet[:], remote) + } + if err != nil { + return err + } + } + return nil +} + +func (s *Session) handlePong(packet []byte, remote netip.AddrPort) { + msg, err := gossip.BincodeDeserializeMessage(packet) + if err != nil { + return + } + pongMsg, ok := msg.(*gossip.Message__Pong) + if !ok { + return + } + + s.lock.Lock() + defer s.lock.Unlock() + req, ok := s.reqs[pongMsg.Value.Token] + if !ok { + return + } + delete(s.reqs, pongMsg.Value.Token) + + req.cancel() + + klog.V(3).Infof("Pong from %s seq=%d time=%v", remote, req.c, time.Since(req.t)) + atomic.AddUint64(&s.numSuccess, 1) +} diff --git a/generate.sh b/generate.sh index 9788c25..1e920ef 100755 --- a/generate.sh +++ b/generate.sh @@ -7,3 +7,13 @@ set -e ) third_party/tools/bin/buf generate + +# cargo install serde-generate +if command -v serdegen &> /dev/null +then + serdegen ./pkg/gossip/schema.yaml \ + --language=Go \ + --with-runtimes=Bincode \ + --serde-package-name=gossip \ + > ./pkg/gossip/schema.go +fi diff --git a/go.mod b/go.mod index 5d11205..db6e9f2 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/klauspost/compress v1.15.9 github.com/lucas-clemente/quic-go v0.27.2 github.com/mr-tron/base58 v1.2.0 + github.com/novifinancial/serde-reflection/serde-generate/runtime/golang v0.0.0-20220519162058-e5cd3c3b3f3a github.com/prometheus/client_golang v1.13.0 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 github.com/stretchr/testify v1.8.0 diff --git a/go.sum b/go.sum index 81613ab..4054e63 100644 --- a/go.sum +++ b/go.sum @@ -445,6 +445,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E= +github.com/novifinancial/serde-reflection/serde-generate/runtime/golang v0.0.0-20220519162058-e5cd3c3b3f3a h1:oMG8C4E7DFkat7WQicw4JNa/dYUaqO7RvLPbkFdADIA= +github.com/novifinancial/serde-reflection/serde-generate/runtime/golang v0.0.0-20220519162058-e5cd3c3b3f3a/go.mod h1:NrRYJCFtaewjIRr4B9V2AyWsAEMW0Zqdjs8Bm+bACbM= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/pkg/gossip/ping.go b/pkg/gossip/ping.go new file mode 100644 index 0000000..3aa3866 --- /dev/null +++ b/pkg/gossip/ping.go @@ -0,0 +1,30 @@ +package gossip + +import ( + "crypto/ed25519" + "crypto/sha256" +) + +// PingSize is the size of a serialized ping message. +const PingSize = 128 + +// NewPing creates and signs a new ping message. +// +// Panics if the provided private key is invalid. +func NewPing(token [32]byte, key ed25519.PrivateKey) (p Ping) { + sig := ed25519.Sign(key, token[:]) + copy(p.From[:], key.Public().(ed25519.PublicKey)) + copy(p.Token[:], token[:]) + copy(p.Signature[:], sig[:]) + return p +} + +// Verify checks the Ping's signature. +func (p *Ping) Verify() bool { + return ed25519.Verify(p.From[:], p.Token[:], p.Signature[:]) +} + +// HashPingToken returns the pong token given a ping token. +func HashPingToken(token [32]byte) [32]byte { + return sha256.Sum256(token[:]) +} diff --git a/pkg/gossip/schema.go b/pkg/gossip/schema.go new file mode 100644 index 0000000..2a7317f --- /dev/null +++ b/pkg/gossip/schema.go @@ -0,0 +1,252 @@ +// Code generated by serde-generate; DO NOT EDIT. + +package gossip + +import ( + "fmt" + "github.com/novifinancial/serde-reflection/serde-generate/runtime/golang/bincode" + "github.com/novifinancial/serde-reflection/serde-generate/runtime/golang/serde" +) + +type Message interface { + isMessage() + Serialize(serializer serde.Serializer) error + BincodeSerialize() ([]byte, error) +} + +func DeserializeMessage(deserializer serde.Deserializer) (Message, error) { + index, err := deserializer.DeserializeVariantIndex() + if err != nil { + return nil, err + } + + switch index { + case 4: + if val, err := load_Message__Ping(deserializer); err == nil { + return &val, nil + } else { + return nil, err + } + + case 5: + if val, err := load_Message__Pong(deserializer); err == nil { + return &val, nil + } else { + return nil, err + } + + default: + return nil, fmt.Errorf("Unknown variant index for Message: %d", index) + } +} + +func BincodeDeserializeMessage(input []byte) (Message, error) { + if input == nil { + var obj Message + return obj, fmt.Errorf("Cannot deserialize null array") + } + deserializer := bincode.NewDeserializer(input) + obj, err := DeserializeMessage(deserializer) + if err == nil && deserializer.GetBufferOffset() < uint64(len(input)) { + return obj, fmt.Errorf("Some input bytes were not read") + } + return obj, err +} + +type Message__Ping struct { + Value Ping +} + +func (*Message__Ping) isMessage() {} + +func (obj *Message__Ping) Serialize(serializer serde.Serializer) error { + if err := serializer.IncreaseContainerDepth(); err != nil { + return err + } + serializer.SerializeVariantIndex(4) + if err := obj.Value.Serialize(serializer); err != nil { + return err + } + serializer.DecreaseContainerDepth() + return nil +} + +func (obj *Message__Ping) BincodeSerialize() ([]byte, error) { + if obj == nil { + return nil, fmt.Errorf("Cannot serialize null object") + } + serializer := bincode.NewSerializer() + if err := obj.Serialize(serializer); err != nil { + return nil, err + } + return serializer.GetBytes(), nil +} + +func load_Message__Ping(deserializer serde.Deserializer) (Message__Ping, error) { + var obj Message__Ping + if err := deserializer.IncreaseContainerDepth(); err != nil { + return obj, err + } + if val, err := DeserializePing(deserializer); err == nil { + obj.Value = val + } else { + return obj, err + } + deserializer.DecreaseContainerDepth() + return obj, nil +} + +type Message__Pong struct { + Value Ping +} + +func (*Message__Pong) isMessage() {} + +func (obj *Message__Pong) Serialize(serializer serde.Serializer) error { + if err := serializer.IncreaseContainerDepth(); err != nil { + return err + } + serializer.SerializeVariantIndex(5) + if err := obj.Value.Serialize(serializer); err != nil { + return err + } + serializer.DecreaseContainerDepth() + return nil +} + +func (obj *Message__Pong) BincodeSerialize() ([]byte, error) { + if obj == nil { + return nil, fmt.Errorf("Cannot serialize null object") + } + serializer := bincode.NewSerializer() + if err := obj.Serialize(serializer); err != nil { + return nil, err + } + return serializer.GetBytes(), nil +} + +func load_Message__Pong(deserializer serde.Deserializer) (Message__Pong, error) { + var obj Message__Pong + if err := deserializer.IncreaseContainerDepth(); err != nil { + return obj, err + } + if val, err := DeserializePing(deserializer); err == nil { + obj.Value = val + } else { + return obj, err + } + deserializer.DecreaseContainerDepth() + return obj, nil +} + +type Ping struct { + From [32]uint8 + Token [32]uint8 + Signature [64]uint8 +} + +func (obj *Ping) Serialize(serializer serde.Serializer) error { + if err := serializer.IncreaseContainerDepth(); err != nil { + return err + } + if err := serialize_array32_u8_array(obj.From, serializer); err != nil { + return err + } + if err := serialize_array32_u8_array(obj.Token, serializer); err != nil { + return err + } + if err := serialize_array64_u8_array(obj.Signature, serializer); err != nil { + return err + } + serializer.DecreaseContainerDepth() + return nil +} + +func (obj *Ping) BincodeSerialize() ([]byte, error) { + if obj == nil { + return nil, fmt.Errorf("Cannot serialize null object") + } + serializer := bincode.NewSerializer() + if err := obj.Serialize(serializer); err != nil { + return nil, err + } + return serializer.GetBytes(), nil +} + +func DeserializePing(deserializer serde.Deserializer) (Ping, error) { + var obj Ping + if err := deserializer.IncreaseContainerDepth(); err != nil { + return obj, err + } + if val, err := deserialize_array32_u8_array(deserializer); err == nil { + obj.From = val + } else { + return obj, err + } + if val, err := deserialize_array32_u8_array(deserializer); err == nil { + obj.Token = val + } else { + return obj, err + } + if val, err := deserialize_array64_u8_array(deserializer); err == nil { + obj.Signature = val + } else { + return obj, err + } + deserializer.DecreaseContainerDepth() + return obj, nil +} + +func BincodeDeserializePing(input []byte) (Ping, error) { + if input == nil { + var obj Ping + return obj, fmt.Errorf("Cannot deserialize null array") + } + deserializer := bincode.NewDeserializer(input) + obj, err := DeserializePing(deserializer) + if err == nil && deserializer.GetBufferOffset() < uint64(len(input)) { + return obj, fmt.Errorf("Some input bytes were not read") + } + return obj, err +} +func serialize_array32_u8_array(value [32]uint8, serializer serde.Serializer) error { + for _, item := range value { + if err := serializer.SerializeU8(item); err != nil { + return err + } + } + return nil +} + +func deserialize_array32_u8_array(deserializer serde.Deserializer) ([32]uint8, error) { + var obj [32]uint8 + for i := range obj { + if val, err := deserializer.DeserializeU8(); err == nil { + obj[i] = val + } else { + return obj, err + } + } + return obj, nil +} + +func serialize_array64_u8_array(value [64]uint8, serializer serde.Serializer) error { + for _, item := range value { + if err := serializer.SerializeU8(item); err != nil { + return err + } + } + return nil +} + +func deserialize_array64_u8_array(deserializer serde.Deserializer) ([64]uint8, error) { + var obj [64]uint8 + for i := range obj { + if val, err := deserializer.DeserializeU8(); err == nil { + obj[i] = val + } else { + return obj, err + } + } + return obj, nil +} diff --git a/pkg/gossip/schema.yaml b/pkg/gossip/schema.yaml new file mode 100644 index 0000000..2aad913 --- /dev/null +++ b/pkg/gossip/schema.yaml @@ -0,0 +1,25 @@ +--- +Message: + ENUM: + 4: + Ping: + NEWTYPE: + TYPENAME: Ping + 5: + Pong: + NEWTYPE: + TYPENAME: Ping +Ping: + STRUCT: + - from: + TUPLEARRAY: + CONTENT: U8 + SIZE: 32 + - token: + TUPLEARRAY: + CONTENT: U8 + SIZE: 32 + - signature: + TUPLEARRAY: + CONTENT: U8 + SIZE: 64