From c795f21c2017ede7cf32091e3cdd504b596dcf1c Mon Sep 17 00:00:00 2001 From: Leopold Schabel Date: Thu, 16 Jun 2022 02:13:17 +0200 Subject: [PATCH] cmd/rpc/slots: add kafka publishing --- cmd/rpc/slots/slot.go | 94 ++++++++++++++++++++++++++++-- go.mod | 3 + go.sum | 9 +++ pkg/envfile/env_test.go | 11 ++++ pkg/envfile/testdata/env.prototxt | 13 +++++ pkg/kafka/kafka.go | 37 ++++++++++++ proto/env/v1/env.proto | 34 +++++++++++ proto/network/v1/slot_status.proto | 55 +++++++++++++++++ schema/slot_status.sql | 73 +++++++++++++++++++++++ 9 files changed, 323 insertions(+), 6 deletions(-) create mode 100644 pkg/kafka/kafka.go create mode 100644 proto/network/v1/slot_status.proto create mode 100644 schema/slot_status.sql diff --git a/cmd/rpc/slots/slot.go b/cmd/rpc/slots/slot.go index 0f9680c..f11a069 100644 --- a/cmd/rpc/slots/slot.go +++ b/cmd/rpc/slots/slot.go @@ -11,16 +11,24 @@ import ( "time" "github.com/certusone/radiance/pkg/envfile" + "github.com/certusone/radiance/pkg/kafka" "github.com/certusone/radiance/pkg/leaderschedule" envv1 "github.com/certusone/radiance/proto/env/v1" + networkv1 "github.com/certusone/radiance/proto/network/v1" "github.com/gagliardetto/solana-go/rpc/ws" + "github.com/golang/protobuf/proto" + "github.com/twmb/franz-go/pkg/kgo" "k8s.io/klog/v2" ) var ( flagEnv = flag.String("env", ".env.prototxt", "Env file (.prototxt)") flagOnly = flag.String("only", "", "Only watch specified nodes (comma-separated)") - flagType = flag.String("type", "", "Only print specific types") + + flagType = flag.String("type", "", "Only print specific types to log") + + flagKafka = flag.Bool("kafka", false, "Enable Kafka publishing") + flagKafkaTopic = flag.String("kafkaTopic", "slot_status", "Kafka topic suffix to publish to") flagDebugAddr = flag.String("debugAddr", "localhost:6060", "pprof/metrics listen address") ) @@ -93,11 +101,25 @@ func main() { go sched.Run(ctx, env.Nodes) + var kcl *kgo.Client + var topic string + if *flagKafka { + if *flagKafkaTopic == "" { + klog.Exitf("Kafka enabled but no topic specified") + } + topic = strings.Join([]string{env.Kafka.TopicPrefix, *flagKafkaTopic}, ".") + klog.Infof("Publishing to topic %s", topic) + kcl, err = kafka.NewClientFromEnv(env.Kafka) + if err != nil { + klog.Exitf("Failed to create kafka client: %v", err) + } + } + for _, node := range nodes { node := node go func() { for { - if err := watchSlotUpdates(ctx, node, highest, sched); err != nil { + if err := watchSlotUpdates(ctx, node, highest, sched, kcl, topic); err != nil { klog.Errorf("watchSlotUpdates on node %s, reconnecting: %v", node.Name, err) } time.Sleep(time.Second * 5) @@ -119,7 +141,7 @@ func main() { select {} } -func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Map, sched *leaderschedule.Tracker) error { +func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Map, sched *leaderschedule.Tracker, kcl *kgo.Client, topic string) error { timeout, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() @@ -170,8 +192,68 @@ func watchSlotUpdates(ctx context.Context, node *envv1.RPCNode, highest *sync.Ma continue } - klog.Infof("%s: slot=%d type=%s delay=%dms prop=%dms parent=%d stats=%v leader=%s", - node.Name, m.Slot, m.Type, delay.Milliseconds(), prop, m.Parent, m.Stats, sched.Get(m.Slot)) + leader := sched.Get(m.Slot) + + if kcl != nil { + var stats *networkv1.TxStats + if m.Type == ws.SlotsUpdatesFrozen { + stats = &networkv1.TxStats{ + NumTransactionEntries: m.Stats.NumTransactionEntries, + NumSuccessfulTransactions: m.Stats.NumSuccessfulTransactions, + NumFailedTransactions: m.Stats.NumFailedTransactions, + MaxTransactionsPerEntry: m.Stats.MaxTransactionsPerEntry, + } + } + + st := &networkv1.SlotStatus{ + Slot: m.Slot, + Timestamp: uint64(ts.UnixMilli()), + Delay: uint64(delay.Milliseconds()), + Type: convertUpdateType(m.Type), + Parent: m.Parent, + Stats: stats, + Err: "", // TODO + Leader: leader.String(), + Source: node.Name, + } + + // Fixed-length proto encoding + buf := proto.NewBuffer([]byte{}) + if err := buf.EncodeMessage(st); err != nil { + panic(err) + } + + r := &kgo.Record{Topic: topic, Value: buf.Bytes()} + kcl.Produce(ctx, r, func(_ *kgo.Record, err error) { + if err != nil { + klog.Warningf("failed to publish message to %s: %v", topic, err) + } + }) + } + + klog.V(1).Infof("%s: slot=%d type=%s delay=%dms prop=%dms parent=%d stats=%v leader=%s", + node.Name, m.Slot, m.Type, delay.Milliseconds(), prop, m.Parent, m.Stats, leader) + } +} + +func convertUpdateType(t ws.SlotsUpdatesType) networkv1.SlotStatus_UpdateType { + switch t { + case ws.SlotsUpdatesFirstShredReceived: + return networkv1.SlotStatus_UPDATE_TYPE_FIRST_SHRED_RECEIVED + case ws.SlotsUpdatesCompleted: + return networkv1.SlotStatus_UPDATE_TYPE_COMPLETED + case ws.SlotsUpdatesCreatedBank: + return networkv1.SlotStatus_UPDATE_TYPE_CREATED_BANK + case ws.SlotsUpdatesFrozen: + return networkv1.SlotStatus_UPDATE_TYPE_FROZEN + case ws.SlotsUpdatesDead: + return networkv1.SlotStatus_UPDATE_TYPE_DEAD + case ws.SlotsUpdatesOptimisticConfirmation: + return networkv1.SlotStatus_UPDATE_TYPE_OPTIMISTIC_CONFIRMATION + case ws.SlotsUpdatesRoot: + return networkv1.SlotStatus_UPDATE_TYPE_ROOT + default: + panic("unknown slot update type " + t) } } @@ -195,7 +277,7 @@ func watchSlots(ctx context.Context, node *envv1.RPCNode) error { return fmt.Errorf("recv: %w", err) } - klog.Infof("%s: slot=%d root=%d parent=%d", + klog.V(1).Infof("%s: slot=%d root=%d parent=%d", node.Name, m.Slot, m.Root, m.Parent) } } diff --git a/go.mod b/go.mod index 7b7f0df..5077d2b 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/google/nftables v0.0.0-20220611213346-a346d51f53b3 github.com/klauspost/compress v1.15.6 github.com/prometheus/client_golang v1.12.2 + github.com/twmb/franz-go v1.6.0 golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d google.golang.org/protobuf v1.28.0 k8s.io/klog/v2 v2.60.1 @@ -59,6 +60,7 @@ require ( github.com/mostynb/zstdpool-freelist v0.0.0-20201229113212-927304c0c3b1 // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/onsi/gomega v1.15.0 // indirect + github.com/pierrec/lz4/v4 v4.1.14 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.34.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect @@ -67,6 +69,7 @@ require ( github.com/tidwall/gjson v1.14.1 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect + github.com/twmb/franz-go/pkg/kmsg v1.1.0 // indirect github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.9.0 // indirect diff --git a/go.sum b/go.sum index 9e8bf2d..424d9ac 100644 --- a/go.sum +++ b/go.sum @@ -335,6 +335,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.11.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.4/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.15.6 h1:6D9PcO8QWu0JyaQ2zUMmu16T1T+zjjEpP91guRsvDfY= github.com/klauspost/compress v1.15.6/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -406,6 +407,8 @@ github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU= github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE= +github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -493,6 +496,10 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/twmb/franz-go v1.6.0 h1:yri7YsVBe/k1LKcoZSLILgUI3U14e82qtD9i4VOcs9c= +github.com/twmb/franz-go v1.6.0/go.mod h1:xdMwpUIQL/JDKKwerc5qJQG8TU1SNIddfjKJJyqRJIg= +github.com/twmb/franz-go/pkg/kmsg v1.1.0 h1:csckTxG48q7Tem7ZwMxe2jAb0ehDNglxZccGnpqe4RU= +github.com/twmb/franz-go/pkg/kmsg v1.1.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f h1:p4VB7kIXpOQvVn1ZaTIVp+3vuYAXFe3OJEvjbUYJLaA= @@ -546,6 +553,7 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= +golang.org/x/crypto v0.0.0-20220518034528-6f7dac969898/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -632,6 +640,7 @@ golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= diff --git a/pkg/envfile/env_test.go b/pkg/envfile/env_test.go index 595d009..8f7ef30 100644 --- a/pkg/envfile/env_test.go +++ b/pkg/envfile/env_test.go @@ -2,6 +2,8 @@ package envfile import ( "testing" + + envv1 "github.com/certusone/radiance/proto/env/v1" ) func TestLoadEnvFile(t *testing.T) { @@ -17,4 +19,13 @@ func TestLoadEnvFile(t *testing.T) { if len(env.Nodes) != 2 { t.Errorf("Expected 2 node, got %d", len(env.Nodes)) } + + if len(env.Kafka.Brokers) != 2 { + t.Errorf("Expected 2 broker, got %d", len(env.Kafka.Brokers)) + } + + if _, ok := env.Kafka.Encryption.(*envv1.Kafka_TlsEncryption); !ok { + t.Errorf("Expected TLS encryption, got %T", env.Kafka.Encryption) + } + } diff --git a/pkg/envfile/testdata/env.prototxt b/pkg/envfile/testdata/env.prototxt index 9eaac73..d87b4cd 100644 --- a/pkg/envfile/testdata/env.prototxt +++ b/pkg/envfile/testdata/env.prototxt @@ -10,3 +10,16 @@ nodes { ws: "ws://localhost:8900" tag: "local" } + +kafka { + brokers: "localhost:9092" + brokers: "localhost:9092" + topic_prefix: "radiance.dev" + + sasl_auth: { + username: "radiance" + password: "radiance" + } + + tls_encryption: {} +} diff --git a/pkg/kafka/kafka.go b/pkg/kafka/kafka.go new file mode 100644 index 0000000..f33109d --- /dev/null +++ b/pkg/kafka/kafka.go @@ -0,0 +1,37 @@ +package kafka + +import ( + "crypto/tls" + "net" + "time" + + envv1 "github.com/certusone/radiance/proto/env/v1" + "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/sasl/scram" +) + +func NewClientFromEnv(env *envv1.Kafka, opts ...kgo.Opt) (*kgo.Client, error) { + opts = append(opts, + kgo.SeedBrokers(env.Brokers...), + ) + + if s, ok := env.Auth.(*envv1.Kafka_SaslAuth); ok { + m := scram.Auth{ + User: s.SaslAuth.Username, + Pass: s.SaslAuth.Password}.AsSha256Mechanism() + + opts = append(opts, kgo.SASL(m)) + } + + if _, ok := env.Encryption.(*envv1.Kafka_TlsEncryption); ok { + tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}} + opts = append(opts, kgo.Dialer(tlsDialer.DialContext)) + } + + cl, err := kgo.NewClient(opts...) + if err != nil { + return nil, err + } + + return cl, nil +} diff --git a/proto/env/v1/env.proto b/proto/env/v1/env.proto index a81b932..950bed7 100644 --- a/proto/env/v1/env.proto +++ b/proto/env/v1/env.proto @@ -8,6 +8,7 @@ option go_package = "github.com/certusone/radiance/proto/envv1;envv1"; message Env { repeated RPCNode nodes = 1; + Kafka kafka = 2; } message RPCNode { @@ -16,3 +17,36 @@ message RPCNode { string ws = 3; repeated string tag = 4; } + +message Kafka { + // List of seed brokers. + repeated string brokers = 1; + // Topic prefix (dot notation). The topic name is the prefix + application-specific name. + string topic_prefix = 2; + + // SASL SCRAM-SHA-256 authentication. + message KafkaSASLAuth { + string username = 1; + string password = 2; + } + + message KafkaNoAuth{} + + // Authentication method. + oneof auth { + // No authentication. + KafkaNoAuth no_auth = 3; + // SASL authentication. + KafkaSASLAuth sasl_auth = 4; + } + + // Encryption + oneof encryption { + // TLS encryption. + KafkaTLSEncryption tls_encryption = 6; + } + + // TLS encryption. + message KafkaTLSEncryption {} +} + diff --git a/proto/network/v1/slot_status.proto b/proto/network/v1/slot_status.proto new file mode 100644 index 0000000..be40e75 --- /dev/null +++ b/proto/network/v1/slot_status.proto @@ -0,0 +1,55 @@ +syntax = "proto3"; + +package proto.network.v1; + +option go_package = "github.com/certusone/radiance/proto/network;networkv1"; + +// See client/src/rpc_response.rs. +message SlotStatus { + // Slot number + uint64 slot = 1; + + // Millisecond UNIX timestamp of the observation on the Solana node. + // Depends on accurate local clocks. + uint64 timestamp = 2; + + // One-way delay between the Solana node and the client. + uint64 delay = 3; + + // Update type + enum UpdateType { + UPDATE_TYPE_UNSPECIFIED = 0; + UPDATE_TYPE_FIRST_SHRED_RECEIVED = 1; + UPDATE_TYPE_COMPLETED = 2; + UPDATE_TYPE_CREATED_BANK = 3; + UPDATE_TYPE_FROZEN = 4; + UPDATE_TYPE_DEAD = 5; + UPDATE_TYPE_OPTIMISTIC_CONFIRMATION = 6; + UPDATE_TYPE_ROOT = 7; + } + + UpdateType type = 4; + + // For type == CREATED_BANK, the parent slot number is included. + uint64 parent = 5; + + // For type == FROZEN, extra transaction stats are included. + TxStats stats = 6; + + // For type == DEAD, an error is included. + // TODO: solana-go doesn't currently expose this + string err = 7; + + // Slot's leader as base58 string (looked up by the ingester) + string leader = 8; + + // Source node identifier + string source = 9; +} + +message TxStats { + uint64 num_transaction_entries = 1; + uint64 num_successful_transactions = 2; + uint64 num_failed_transactions = 3; + uint64 max_transactions_per_entry = 4; +} diff --git a/schema/slot_status.sql b/schema/slot_status.sql new file mode 100644 index 0000000..8ad70d5 --- /dev/null +++ b/schema/slot_status.sql @@ -0,0 +1,73 @@ +CREATE TABLE IF NOT EXISTS slot_status_queue +( + slot UInt64, + timestamp UInt64, + delay UInt64, + type Enum8( + 'unspecified' = 0, + 'firstShredReceived' = 1, + 'completed' = 2, + 'createdBank' = 3, + 'frozen' = 4, + 'dead' = 5, + 'optimisticConfirmation' = 6, + 'root' = 7 + ), + + source String, + leader String, + + parent UInt64, + + "stats.num_transaction_entries" Nullable(UInt64), + "stats.num_successful_transactions" Nullable(UInt64), + "stats.num_failed_transactions" Nullable(UInt64), + "stats.max_transactions_per_entry" Nullable(UInt64), + + err String +) ENGINE = Kafka() + SETTINGS + kafka_broker_list = ':30036', + kafka_topic_list = 'certus.radiance.slot_status', + kafka_group_name = 'heimdall-chdev1', + kafka_format = 'Protobuf', + kafka_max_block_size = 32, + format_schema = 'network.proto:SlotStatus'; + +CREATE TABLE IF NOT EXISTS slot_status +( + slot UInt64, + timestamp DateTime64(3), + delay UInt64, + type Enum8( + 'unspecified' = 0, + 'firstShredReceived' = 1, + 'completed' = 2, + 'createdBank' = 3, + 'frozen' = 4, + 'dead' = 5, + 'optimisticConfirmation' = 6, + 'root' = 7 + ), + + + source String, + leader String, + + parent UInt64, + + "stats.num_transaction_entries" Nullable(UInt64), + "stats.num_successful_transactions" Nullable(UInt64), + "stats.num_failed_transactions" Nullable(UInt64), + "stats.max_transactions_per_entry" Nullable(UInt64), + + err String +) ENGINE = MergeTree() + PARTITION BY toDate(timestamp) + ORDER BY (slot, type, leader); + +CREATE MATERIALIZED VIEW IF NOT EXISTS slot_status_view TO slot_status +AS +SELECT fromUnixTimestamp64Milli(timestamp) AS timestamp, + * EXCEPT ( timestamp ) +FROM slot_status_queue;