From cddcfc155d963290ca6d6cae20f314c7e0fe5a97 Mon Sep 17 00:00:00 2001 From: Richard Patel Date: Thu, 8 Sep 2022 19:40:32 +0200 Subject: [PATCH] cmd: add initial gossip pull --- cmd/gossip/pull/pull.go | 97 +++++++++++++++++++++++++++++++++++++++++ go.mod | 2 +- pkg/gossip/bloom.go | 2 + pkg/gossip/pull.go | 64 +++++++++++++++++++++++++++ pkg/gossip/types.go | 15 +++++++ 5 files changed, 179 insertions(+), 1 deletion(-) create mode 100644 cmd/gossip/pull/pull.go create mode 100644 pkg/gossip/pull.go create mode 100644 pkg/gossip/types.go diff --git a/cmd/gossip/pull/pull.go b/cmd/gossip/pull/pull.go new file mode 100644 index 0000000..3b987f4 --- /dev/null +++ b/cmd/gossip/pull/pull.go @@ -0,0 +1,97 @@ +// Pull downloads CRDS from a peer. +package main + +import ( + "context" + "crypto/ed25519" + "crypto/rand" + "flag" + "net" + "net/netip" + "os" + "os/signal" + "time" + + "github.com/certusone/radiance/pkg/gossip" + "golang.org/x/sync/errgroup" + "k8s.io/klog/v2" +) + +var ( + flagAddr = flag.String("addr", "", "Address to ping (:)") +) + +var target netip.AddrPort + +func init() { + klog.InitFlags(nil) + flag.Parse() +} + +func main() { + if *flagAddr == "" { + klog.Exit("No address specified") + } + + udpAddr, err := net.ResolveUDPAddr("udp", *flagAddr) + if err != nil { + klog.Exitf("invalid target address: %s", err) + } + target = udpAddr.AddrPort() + + ctx := context.Background() + + _, identity, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + panic(err) + } + + conn, err := net.ListenUDP("udp", nil) + if err != nil { + klog.Exit(err) + } + + pingClient := gossip.NewPingClient(identity, conn) + pingServer := gossip.NewPingServer(identity, conn) + pullClient := gossip.NewPullClient(identity, conn) + handler := &gossip.Handler{ + PingClient: pingClient, + PingServer: pingServer, + PullClient: pullClient, + } + client := gossip.NewDriver(handler, conn) + + ctx, cancel := signal.NotifyContext(ctx, os.Interrupt) + defer cancel() + group, ctx := errgroup.WithContext(ctx) + group.Go(func() error { + return client.Run(ctx) + }) + group.Go(func() error { + _, _, err := pingClient.Ping(ctx, target) + if err != nil { + klog.Exitf("ping ignored: %s", err) + } + klog.Info("Pinged") + err = pullClient.Pull(target) + if err != nil { + klog.Exitf("failed to send pull request: %s", err) + } + err = pullClient.Pull(target) + if err != nil { + klog.Exitf("failed to send pull request: %s", err) + } + + time.Sleep(2 * time.Second) + + err = pullClient.Pull(target) + if err != nil { + klog.Exitf("failed to send pull request: %s", err) + } + + time.Sleep(3 * time.Second) + return context.Canceled // done + }) + _ = group.Wait() + _ = conn.Close() +} diff --git a/go.mod b/go.mod index db6e9f2..e7339f3 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 github.com/stretchr/testify v1.8.0 github.com/twmb/franz-go v1.6.0 + golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c google.golang.org/protobuf v1.28.1 k8s.io/klog/v2 v2.60.1 @@ -95,7 +96,6 @@ require ( golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 // indirect golang.org/x/oauth2 v0.0.0-20220608161450-d0670ef3b1eb // indirect - golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect golang.org/x/term v0.0.0-20220526004731-065cf7ba2467 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.10 // indirect diff --git a/pkg/gossip/bloom.go b/pkg/gossip/bloom.go index 2e295ef..f7529af 100644 --- a/pkg/gossip/bloom.go +++ b/pkg/gossip/bloom.go @@ -14,6 +14,8 @@ import ( "math/rand" ) +const MaxBloomSize = 928 + func NewBloom(numBits uint64, keys []uint64) *Bloom { bits := make([]uint64, (numBits+63)/64) ret := &Bloom{ diff --git a/pkg/gossip/pull.go b/pkg/gossip/pull.go new file mode 100644 index 0000000..e78f4cb --- /dev/null +++ b/pkg/gossip/pull.go @@ -0,0 +1,64 @@ +package gossip + +import ( + "crypto/ed25519" + "encoding/json" + "fmt" + "net/netip" + "time" +) + +const PacketSize = 1232 + +// PullClient implements the stateful client (initiator) side of the gossip pull protocol. +type PullClient struct { + identity ed25519.PrivateKey + so udpSender +} + +func NewPullClient(identity ed25519.PrivateKey, so udpSender) *PullClient { + return &PullClient{ + identity: identity, + so: so, + } +} + +func (p *PullClient) Pull(target netip.AddrPort) error { + filters := NewCrdsFilterSet(65536, MaxBloomSize) + for _, filter := range filters { + if err := p.sendPullRequest(target, filter); err != nil { + return err + } + } + return nil +} + +func (p *PullClient) sendPullRequest(target netip.AddrPort, filter CrdsFilter) error { + msg := &Message__PullRequest{ + Filter: filter, + Value: CrdsValue{ + Data: &CrdsData__ContactInfo{ + Value: ContactInfo{ + Wallclock: uint64(time.Now().UnixMilli()), + }, + }, + }, + } + err := msg.Value.Sign(p.identity) + if err != nil { + panic("failed to sign pull request: " + err.Error()) + } + + packet, err := msg.BincodeSerialize() + if err != nil { + panic("failed to serialize packet: " + err.Error()) + } + + _, err = p.so.WriteToUDPAddrPort(packet, target) + return err +} + +func (p *PullClient) HandlePullResponse(msg *Message__PullResponse, _ netip.AddrPort) { + jsonBuf, _ := json.MarshalIndent(msg, "", "\t") + fmt.Println(string(jsonBuf)) +} diff --git a/pkg/gossip/types.go b/pkg/gossip/types.go new file mode 100644 index 0000000..75c9144 --- /dev/null +++ b/pkg/gossip/types.go @@ -0,0 +1,15 @@ +package gossip + +import "github.com/gagliardetto/solana-go" + +func (p Pubkey) MarshalText() ([]byte, error) { + return solana.PublicKey(p).MarshalText() +} + +func (h Hash) MarshalText() ([]byte, error) { + return []byte(solana.Hash(h).String()), nil +} + +func (s Signature) MarshalText() ([]byte, error) { + return []byte(solana.Signature(s).String()), nil +}