cmd/rpc/slots: add kafka publishing

This commit is contained in:
Leopold Schabel 2022-06-16 02:13:17 +02:00
parent 4cfb389389
commit c795f21c20
9 changed files with 323 additions and 6 deletions

View File

@ -11,16 +11,24 @@ import (
"time"
"github.com/certusone/radiance/pkg/envfile"
"github.com/certusone/radiance/pkg/kafka"
"github.com/certusone/radiance/pkg/leaderschedule"
envv1 "github.com/certusone/radiance/proto/env/v1"
networkv1 "github.com/certusone/radiance/proto/network/v1"
"github.com/gagliardetto/solana-go/rpc/ws"
"github.com/golang/protobuf/proto"
"github.com/twmb/franz-go/pkg/kgo"
"k8s.io/klog/v2"
)
var (
flagEnv = flag.String("env", ".env.prototxt", "Env file (.prototxt)")
flagOnly = flag.String("only", "", "Only watch specified nodes (comma-separated)")
flagType = flag.String("type", "", "Only print specific types")
flagType = flag.String("type", "", "Only print specific types to log")
flagKafka = flag.Bool("kafka", false, "Enable Kafka publishing")
flagKafkaTopic = flag.String("kafkaTopic", "slot_status", "Kafka topic suffix to publish to")
flagDebugAddr = flag.String("debugAddr", "localhost:6060", "pprof/metrics listen address")
)
@ -93,11 +101,25 @@ func main() {
go sched.Run(ctx, env.Nodes)
var kcl *kgo.Client
var topic string
if *flagKafka {
if *flagKafkaTopic == "" {
klog.Exitf("Kafka enabled but no topic specified")
}
topic = strings.Join([]string{env.Kafka.TopicPrefix, *flagKafkaTopic}, ".")
klog.Infof("Publishing to topic %s", topic)
kcl, err = kafka.NewClientFromEnv(env.Kafka)
if err != nil {
klog.Exitf("Failed to create kafka client: %v", err)
}
}
for _, node := range nodes {
node := node
go func() {
for {
if err := watchSlotUpdates(ctx, node, highest, sched); err != nil {
if err := watchSlotUpdates(ctx, node, highest, sched, kcl, topic); err != nil {
klog.Errorf("watchSlotUpdates on node %s, reconnecting: %v", node.Name, err)
}
time.Sleep(time.Second * 5)
@ -119,7 +141,7 @@ func main() {
select {}
}
func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Map, sched *leaderschedule.Tracker) error {
func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Map, sched *leaderschedule.Tracker, kcl *kgo.Client, topic string) error {
timeout, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
@ -170,8 +192,68 @@ func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Ma
continue
}
klog.Infof("%s: slot=%d type=%s delay=%dms prop=%dms parent=%d stats=%v leader=%s",
node.Name, m.Slot, m.Type, delay.Milliseconds(), prop, m.Parent, m.Stats, sched.Get(m.Slot))
leader := sched.Get(m.Slot)
if kcl != nil {
var stats *networkv1.TxStats
if m.Type == ws.SlotsUpdatesFrozen {
stats = &networkv1.TxStats{
NumTransactionEntries: m.Stats.NumTransactionEntries,
NumSuccessfulTransactions: m.Stats.NumSuccessfulTransactions,
NumFailedTransactions: m.Stats.NumFailedTransactions,
MaxTransactionsPerEntry: m.Stats.MaxTransactionsPerEntry,
}
}
st := &networkv1.SlotStatus{
Slot: m.Slot,
Timestamp: uint64(ts.UnixMilli()),
Delay: uint64(delay.Milliseconds()),
Type: convertUpdateType(m.Type),
Parent: m.Parent,
Stats: stats,
Err: "", // TODO
Leader: leader.String(),
Source: node.Name,
}
// Length-delimited proto encoding (varint size prefix followed by the message)
buf := proto.NewBuffer([]byte{})
if err := buf.EncodeMessage(st); err != nil {
panic(err)
}
r := &kgo.Record{Topic: topic, Value: buf.Bytes()}
kcl.Produce(ctx, r, func(_ *kgo.Record, err error) {
if err != nil {
klog.Warningf("failed to publish message to %s: %v", topic, err)
}
})
}
klog.V(1).Infof("%s: slot=%d type=%s delay=%dms prop=%dms parent=%d stats=%v leader=%s",
node.Name, m.Slot, m.Type, delay.Milliseconds(), prop, m.Parent, m.Stats, leader)
}
}
// slotUpdateTypes maps every slot update type delivered by the solana-go
// websocket client to the matching networkv1.SlotStatus enum value.
var slotUpdateTypes = map[ws.SlotsUpdatesType]networkv1.SlotStatus_UpdateType{
	ws.SlotsUpdatesFirstShredReceived:     networkv1.SlotStatus_UPDATE_TYPE_FIRST_SHRED_RECEIVED,
	ws.SlotsUpdatesCompleted:              networkv1.SlotStatus_UPDATE_TYPE_COMPLETED,
	ws.SlotsUpdatesCreatedBank:            networkv1.SlotStatus_UPDATE_TYPE_CREATED_BANK,
	ws.SlotsUpdatesFrozen:                 networkv1.SlotStatus_UPDATE_TYPE_FROZEN,
	ws.SlotsUpdatesDead:                   networkv1.SlotStatus_UPDATE_TYPE_DEAD,
	ws.SlotsUpdatesOptimisticConfirmation: networkv1.SlotStatus_UPDATE_TYPE_OPTIMISTIC_CONFIRMATION,
	ws.SlotsUpdatesRoot:                   networkv1.SlotStatus_UPDATE_TYPE_ROOT,
}

// convertUpdateType translates a websocket slot update type into the enum
// used by the SlotStatus protobuf message. It panics when t has no known
// counterpart.
func convertUpdateType(t ws.SlotsUpdatesType) networkv1.SlotStatus_UpdateType {
	converted, known := slotUpdateTypes[t]
	if !known {
		panic("unknown slot update type " + t)
	}
	return converted
}
@ -195,7 +277,7 @@ func watchSlots(ctx context.Context, node *envv1.RPCNode) error {
return fmt.Errorf("recv: %w", err)
}
klog.Infof("%s: slot=%d root=%d parent=%d",
klog.V(1).Infof("%s: slot=%d root=%d parent=%d",
node.Name, m.Slot, m.Root, m.Parent)
}
}

3
go.mod
View File

@ -11,6 +11,7 @@ require (
github.com/google/nftables v0.0.0-20220611213346-a346d51f53b3
github.com/klauspost/compress v1.15.6
github.com/prometheus/client_golang v1.12.2
github.com/twmb/franz-go v1.6.0
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d
google.golang.org/protobuf v1.28.0
k8s.io/klog/v2 v2.60.1
@ -59,6 +60,7 @@ require (
github.com/mostynb/zstdpool-freelist v0.0.0-20201229113212-927304c0c3b1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/onsi/gomega v1.15.0 // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.34.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
@ -67,6 +69,7 @@ require (
github.com/tidwall/gjson v1.14.1 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.1.0 // indirect
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.9.0 // indirect

9
go.sum
View File

@ -335,6 +335,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.11.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.4/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.6 h1:6D9PcO8QWu0JyaQ2zUMmu16T1T+zjjEpP91guRsvDfY=
github.com/klauspost/compress v1.15.6/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@ -406,6 +407,8 @@ github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU=
github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE=
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@ -493,6 +496,10 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/twmb/franz-go v1.6.0 h1:yri7YsVBe/k1LKcoZSLILgUI3U14e82qtD9i4VOcs9c=
github.com/twmb/franz-go v1.6.0/go.mod h1:xdMwpUIQL/JDKKwerc5qJQG8TU1SNIddfjKJJyqRJIg=
github.com/twmb/franz-go/pkg/kmsg v1.1.0 h1:csckTxG48q7Tem7ZwMxe2jAb0ehDNglxZccGnpqe4RU=
github.com/twmb/franz-go/pkg/kmsg v1.1.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA=
@ -546,6 +553,7 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.0.0-20220518034528-6f7dac969898/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -632,6 +640,7 @@ golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=

View File

@ -2,6 +2,8 @@ package envfile
import (
"testing"
envv1 "github.com/certusone/radiance/proto/env/v1"
)
func TestLoadEnvFile(t *testing.T) {
@ -17,4 +19,13 @@ func TestLoadEnvFile(t *testing.T) {
if len(env.Nodes) != 2 {
t.Errorf("Expected 2 node, got %d", len(env.Nodes))
}
if len(env.Kafka.Brokers) != 2 {
t.Errorf("Expected 2 broker, got %d", len(env.Kafka.Brokers))
}
if _, ok := env.Kafka.Encryption.(*envv1.Kafka_TlsEncryption); !ok {
t.Errorf("Expected TLS encryption, got %T", env.Kafka.Encryption)
}
}

View File

@ -10,3 +10,16 @@ nodes {
ws: "ws://localhost:8900"
tag: "local"
}
# Kafka connection settings.
# NOTE(review): presumably the fixture read by TestLoadEnvFile, which expects
# two brokers and TLS encryption — confirm before changing.
kafka {
# Seed brokers; the same address is listed twice, giving two entries.
brokers: "localhost:9092"
brokers: "localhost:9092"
# Prefix that is joined with the application-specific topic name using ".".
topic_prefix: "radiance.dev"
# SASL SCRAM-SHA-256 credentials.
sasl_auth: {
username: "radiance"
password: "radiance"
}
# The presence of this (empty) message selects TLS encryption.
tls_encryption: {}
}

37
pkg/kafka/kafka.go Normal file
View File

@ -0,0 +1,37 @@
package kafka
import (
	"crypto/tls"
	"errors"
	"net"
	"time"

	envv1 "github.com/certusone/radiance/proto/env/v1"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/scram"
)
// NewClientFromEnv builds a franz-go Kafka client from the Kafka section of
// the environment configuration.
//
// The seed brokers are always taken from env. SASL SCRAM-SHA-256 credentials
// are added when env selects sasl_auth, and connections are dialed over TLS
// (with a 10 second connect timeout) when env selects tls_encryption.
// Options supplied by the caller are placed first; the options derived from
// env are appended after them.
//
// It returns an error when env is nil or when kgo.NewClient rejects the
// resulting options.
func NewClientFromEnv(env *envv1.Kafka, opts ...kgo.Opt) (*kgo.Client, error) {
	// A missing kafka section in the env file yields a nil message; report
	// it instead of panicking on the field accesses below.
	if env == nil {
		return nil, errors.New("kafka: no configuration in env")
	}

	// Assemble the options in a fresh slice. Appending to opts directly
	// could write into spare capacity of the caller's backing array.
	all := make([]kgo.Opt, 0, len(opts)+3)
	all = append(all, opts...)
	all = append(all, kgo.SeedBrokers(env.Brokers...))

	if s, ok := env.Auth.(*envv1.Kafka_SaslAuth); ok {
		m := scram.Auth{
			User: s.SaslAuth.Username,
			Pass: s.SaslAuth.Password,
		}.AsSha256Mechanism()
		all = append(all, kgo.SASL(m))
	}

	if _, ok := env.Encryption.(*envv1.Kafka_TlsEncryption); ok {
		tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
		all = append(all, kgo.Dialer(tlsDialer.DialContext))
	}

	cl, err := kgo.NewClient(all...)
	if err != nil {
		return nil, err
	}
	return cl, nil
}

View File

@ -8,6 +8,7 @@ option go_package = "github.com/certusone/radiance/proto/envv1;envv1";
// Env is the root message of an environment file.
message Env {
// RPC nodes belonging to this environment.
repeated RPCNode nodes = 1;
// Kafka cluster connection settings.
Kafka kafka = 2;
}
message RPCNode {
@ -16,3 +17,36 @@ message RPCNode {
string ws = 3;
repeated string tag = 4;
}
// Kafka describes how to connect to a Kafka cluster and how topics are named.
message Kafka {
// List of seed brokers.
repeated string brokers = 1;
// Topic prefix (dot notation). The topic name is the prefix + application-specific name.
string topic_prefix = 2;
// Credentials for SASL SCRAM-SHA-256 authentication.
message KafkaSASLAuth {
string username = 1;
string password = 2;
}
// Empty message used to select "no authentication" in the auth oneof.
message KafkaNoAuth{}
// Authentication method.
oneof auth {
// No authentication.
KafkaNoAuth no_auth = 3;
// SASL authentication.
KafkaSASLAuth sasl_auth = 4;
}
// Encryption method. Leaving the oneof unset selects none of the options below.
// NOTE(review): field number 5 is not used in this message — confirm whether
// it is intentionally left free.
oneof encryption {
// TLS encryption.
KafkaTLSEncryption tls_encryption = 6;
}
// Empty message used to select TLS encryption in the encryption oneof.
message KafkaTLSEncryption {}
}

View File

@ -0,0 +1,55 @@
syntax = "proto3";
package proto.network.v1;
option go_package = "github.com/certusone/radiance/proto/network;networkv1";
// SlotStatus is a single slot update observed on one Solana node.
// See client/src/rpc_response.rs.
// NOTE(review): that path presumably refers to the Solana source tree — confirm.
message SlotStatus {
// Slot number
uint64 slot = 1;
// Millisecond UNIX timestamp of the observation on the Solana node.
// Depends on accurate local clocks.
uint64 timestamp = 2;
// One-way delay between the Solana node and the client, in milliseconds.
uint64 delay = 3;
// Update type
enum UpdateType {
UPDATE_TYPE_UNSPECIFIED = 0;
UPDATE_TYPE_FIRST_SHRED_RECEIVED = 1;
UPDATE_TYPE_COMPLETED = 2;
UPDATE_TYPE_CREATED_BANK = 3;
UPDATE_TYPE_FROZEN = 4;
UPDATE_TYPE_DEAD = 5;
UPDATE_TYPE_OPTIMISTIC_CONFIRMATION = 6;
UPDATE_TYPE_ROOT = 7;
}
UpdateType type = 4;
// For type == CREATED_BANK, the parent slot number is included.
uint64 parent = 5;
// For type == FROZEN, extra transaction stats are included.
// Left unset for every other type.
TxStats stats = 6;
// For type == DEAD, an error is included.
// TODO: solana-go doesn't currently expose this
string err = 7;
// Slot's leader as base58 string (looked up by the ingester)
string leader = 8;
// Source node identifier
string source = 9;
}
// TxStats carries the transaction statistics attached to a FROZEN slot update.
// NOTE(review): field meanings are inferred from their names; confirm against
// the upstream slot update definition.
message TxStats {
// Number of transaction entries.
uint64 num_transaction_entries = 1;
// Number of successful transactions.
uint64 num_successful_transactions = 2;
// Number of failed transactions.
uint64 num_failed_transactions = 3;
// Maximum number of transactions in a single entry.
uint64 max_transactions_per_entry = 4;
}

73
schema/slot_status.sql Normal file
View File

@ -0,0 +1,73 @@
-- Kafka engine table that reads SlotStatus protobuf messages from the
-- slot_status topic. Rows are moved into slot_status by slot_status_view.
CREATE TABLE IF NOT EXISTS slot_status_queue
(
slot UInt64,
-- Milliseconds since the UNIX epoch, as published in the message.
timestamp UInt64,
delay UInt64,
-- Values mirror the SlotStatus.UpdateType enum numbers.
type Enum8(
'unspecified' = 0,
'firstShredReceived' = 1,
'completed' = 2,
'createdBank' = 3,
'frozen' = 4,
'dead' = 5,
'optimisticConfirmation' = 6,
'root' = 7
),
source String,
leader String,
parent UInt64,
-- Fields of the nested stats message; Nullable because stats is only set for frozen updates.
"stats.num_transaction_entries" Nullable(UInt64),
"stats.num_successful_transactions" Nullable(UInt64),
"stats.num_failed_transactions" Nullable(UInt64),
"stats.max_transactions_per_entry" Nullable(UInt64),
err String
) ENGINE = Kafka()
SETTINGS
-- The broker host has been redacted ('<snip>'); fill in the real address before use.
kafka_broker_list = '<snip>:30036',
kafka_topic_list = 'certus.radiance.slot_status',
kafka_group_name = 'heimdall-chdev1',
kafka_format = 'Protobuf',
kafka_max_block_size = 32,
-- Schema file and message type used to decode each record.
format_schema = 'network.proto:SlotStatus';
-- MergeTree table holding slot status rows. It has the same columns as
-- slot_status_queue, except that timestamp is stored as DateTime64(3).
CREATE TABLE IF NOT EXISTS slot_status
(
slot UInt64,
-- Millisecond-precision timestamp, converted from the queue's UInt64 value by slot_status_view.
timestamp DateTime64(3),
delay UInt64,
-- Values mirror the SlotStatus.UpdateType enum numbers.
type Enum8(
'unspecified' = 0,
'firstShredReceived' = 1,
'completed' = 2,
'createdBank' = 3,
'frozen' = 4,
'dead' = 5,
'optimisticConfirmation' = 6,
'root' = 7
),
source String,
leader String,
parent UInt64,
"stats.num_transaction_entries" Nullable(UInt64),
"stats.num_successful_transactions" Nullable(UInt64),
"stats.num_failed_transactions" Nullable(UInt64),
"stats.max_transactions_per_entry" Nullable(UInt64),
err String
) ENGINE = MergeTree()
-- One partition per calendar day of the observation timestamp.
PARTITION BY toDate(timestamp)
ORDER BY (slot, type, leader);
-- Materialized view that copies rows from slot_status_queue into slot_status.
-- It converts the millisecond epoch timestamp with fromUnixTimestamp64Milli
-- and passes every other column through unchanged.
CREATE MATERIALIZED VIEW IF NOT EXISTS slot_status_view TO slot_status
AS
SELECT fromUnixTimestamp64Milli(timestamp) AS timestamp,
* EXCEPT ( timestamp )
FROM slot_status_queue;