cmd: add initial gossip pull

This commit is contained in:
Richard Patel 2022-09-08 19:40:32 +02:00
parent b4c108cc5c
commit cddcfc155d
5 changed files with 179 additions and 1 deletions

97
cmd/gossip/pull/pull.go Normal file
View File

@ -0,0 +1,97 @@
// Pull downloads CRDS from a peer.
package main
import (
"context"
"crypto/ed25519"
"crypto/rand"
"flag"
"net"
"net/netip"
"os"
"os/signal"
"time"
"github.com/certusone/radiance/pkg/gossip"
"golang.org/x/sync/errgroup"
"k8s.io/klog/v2"
)
var (
flagAddr = flag.String("addr", "", "Address to ping (<host>:<port>)")
)
var target netip.AddrPort
func init() {
klog.InitFlags(nil)
flag.Parse()
}
func main() {
if *flagAddr == "" {
klog.Exit("No address specified")
}
udpAddr, err := net.ResolveUDPAddr("udp", *flagAddr)
if err != nil {
klog.Exitf("invalid target address: %s", err)
}
target = udpAddr.AddrPort()
ctx := context.Background()
_, identity, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
panic(err)
}
conn, err := net.ListenUDP("udp", nil)
if err != nil {
klog.Exit(err)
}
pingClient := gossip.NewPingClient(identity, conn)
pingServer := gossip.NewPingServer(identity, conn)
pullClient := gossip.NewPullClient(identity, conn)
handler := &gossip.Handler{
PingClient: pingClient,
PingServer: pingServer,
PullClient: pullClient,
}
client := gossip.NewDriver(handler, conn)
ctx, cancel := signal.NotifyContext(ctx, os.Interrupt)
defer cancel()
group, ctx := errgroup.WithContext(ctx)
group.Go(func() error {
return client.Run(ctx)
})
group.Go(func() error {
_, _, err := pingClient.Ping(ctx, target)
if err != nil {
klog.Exitf("ping ignored: %s", err)
}
klog.Info("Pinged")
err = pullClient.Pull(target)
if err != nil {
klog.Exitf("failed to send pull request: %s", err)
}
err = pullClient.Pull(target)
if err != nil {
klog.Exitf("failed to send pull request: %s", err)
}
time.Sleep(2 * time.Second)
err = pullClient.Pull(target)
if err != nil {
klog.Exitf("failed to send pull request: %s", err)
}
time.Sleep(3 * time.Second)
return context.Canceled // done
})
_ = group.Wait()
_ = conn.Close()
}

2
go.mod
View File

@ -18,6 +18,7 @@ require (
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
github.com/stretchr/testify v1.8.0
github.com/twmb/franz-go v1.6.0
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c
google.golang.org/protobuf v1.28.1
k8s.io/klog/v2 v2.60.1
@ -95,7 +96,6 @@ require (
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 // indirect
golang.org/x/oauth2 v0.0.0-20220608161450-d0670ef3b1eb // indirect
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
golang.org/x/term v0.0.0-20220526004731-065cf7ba2467 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.10 // indirect

View File

@ -14,6 +14,8 @@ import (
"math/rand"
)
const MaxBloomSize = 928
func NewBloom(numBits uint64, keys []uint64) *Bloom {
bits := make([]uint64, (numBits+63)/64)
ret := &Bloom{

64
pkg/gossip/pull.go Normal file
View File

@ -0,0 +1,64 @@
package gossip
import (
"crypto/ed25519"
"encoding/json"
"fmt"
"net/netip"
"time"
)
const PacketSize = 1232
// PullClient implements the stateful client (initiator) side of the gossip pull protocol.
type PullClient struct {
identity ed25519.PrivateKey
so udpSender
}
func NewPullClient(identity ed25519.PrivateKey, so udpSender) *PullClient {
return &PullClient{
identity: identity,
so: so,
}
}
func (p *PullClient) Pull(target netip.AddrPort) error {
filters := NewCrdsFilterSet(65536, MaxBloomSize)
for _, filter := range filters {
if err := p.sendPullRequest(target, filter); err != nil {
return err
}
}
return nil
}
func (p *PullClient) sendPullRequest(target netip.AddrPort, filter CrdsFilter) error {
msg := &Message__PullRequest{
Filter: filter,
Value: CrdsValue{
Data: &CrdsData__ContactInfo{
Value: ContactInfo{
Wallclock: uint64(time.Now().UnixMilli()),
},
},
},
}
err := msg.Value.Sign(p.identity)
if err != nil {
panic("failed to sign pull request: " + err.Error())
}
packet, err := msg.BincodeSerialize()
if err != nil {
panic("failed to serialize packet: " + err.Error())
}
_, err = p.so.WriteToUDPAddrPort(packet, target)
return err
}
func (p *PullClient) HandlePullResponse(msg *Message__PullResponse, _ netip.AddrPort) {
jsonBuf, _ := json.MarshalIndent(msg, "", "\t")
fmt.Println(string(jsonBuf))
}

15
pkg/gossip/types.go Normal file
View File

@ -0,0 +1,15 @@
package gossip
import "github.com/gagliardetto/solana-go"
func (p Pubkey) MarshalText() ([]byte, error) {
return solana.PublicKey(p).MarshalText()
}
func (h Hash) MarshalText() ([]byte, error) {
return []byte(solana.Hash(h).String()), nil
}
func (s Signature) MarshalText() ([]byte, error) {
return []byte(solana.Signature(s).String()), nil
}