remove unused bigtable data

Change-Id: I2b4a798b99f11be2ce9869d15dc633e1568974e1
This commit is contained in:
justinschuldt 2021-10-29 05:01:44 -05:00 committed by Justin Schuldt
parent 5f1e5dd211
commit 8fed2d0659
6 changed files with 1 additions and 126 deletions

View File

@ -11,7 +11,7 @@ import (
const tableName = "v2Events" const tableName = "v2Events"
// These column family names match the guardian code that does the inserting. // These column family names match the guardian code that does the inserting.
var columnFamilies = []string{"MessagePublication", "Signatures", "VAAState", "QuorumState"} var columnFamilies = []string{"MessagePublication", "QuorumState"}
func main() { func main() {
project := flag.String("project", "", "The Google Cloud Platform project ID. Required.") project := flag.String("project", "", "The Google Cloud Platform project ID. Required.")

View File

@ -23,12 +23,6 @@ func printRow(row bigtable.Row) {
if _, ok := row[columnFamilies[0]]; ok { if _, ok := row[columnFamilies[0]]; ok {
printItems(row[columnFamilies[0]]) printItems(row[columnFamilies[0]])
} }
if _, ok := row[columnFamilies[1]]; ok {
printItems(row[columnFamilies[1]])
}
if _, ok := row[columnFamilies[2]]; ok {
printSignatures(row[columnFamilies[2]])
}
if _, ok := row[columnFamilies[3]]; ok { if _, ok := row[columnFamilies[3]]; ok {
printItems(row[columnFamilies[3]]) printItems(row[columnFamilies[3]])
} }

View File

@ -14,10 +14,6 @@ The column families listed below represent data unique to a phase of the attesta
- `MessagePublication` holds data about a user's interaction with a Wormhole contract. Contains data from the Guardian's VAA struct. - `MessagePublication` holds data about a user's interaction with a Wormhole contract. Contains data from the Guardian's VAA struct.
- `Signatures` holds observed signatures from Guardians within a GuardianSet. Holds signatures independent of index within GuardianSet. This column family will provide an account of "which Guardians observed X transaction, and when?".
- `VAAState` records incremental updates toward Guardian consensus. The VAAState column family holds the progression of signatures of a GuardianSet. Each update to the Signatures list of the VAA struct is recorded. This column family will provide an account of "Which Guardians contributed to reaching quorum".
- `QuorumState` stores the signed VAA once quorum is reached. - `QuorumState` stores the signed VAA once quorum is reached.
### Column Qualifiers ### Column Qualifiers
@ -34,8 +30,4 @@ Each column qualifier below is prefixed with its column family.
- `MessagePublication:InitiatingTxID` The transaction identifier of the user's interaction with the contract. - `MessagePublication:InitiatingTxID` The transaction identifier of the user's interaction with the contract.
- `MessagePublication:Payload` The payload of the user's message. - `MessagePublication:Payload` The payload of the user's message.
- `Signatures:{GuardianAddress}` This column qualifier will be the address of the Guardian, and the data stored here will be the signature broadcast by the Guardian. There will be a column in this family for each Guardian address that appears in a Guardian set. The column qualifier is a part of the data that is recorded here. See the [BigTable design docs](https://cloud.google.com/bigtable/docs/schema-design#columns) for the thought process behind this approach.
- `VAAState:Signatures:{GuardianSetIndex}` a list of objects containing Guardian signatures and the index of the Guardian within the GuardianSet. This is the Signatures list from the Guardian's VAA struct. Note that a BigTable column can store many values (aka "cells") for a single row, unique by timestamp. This column will hold cells containing a list of signatures, with each cell list containing one more signature than the previous cell. This will show the order that signatures accumulate.
- `QuorumState:SignedVAA` the VAA with the signatures that contributed to quorum. - `QuorumState:SignedVAA` the VAA with the signatures that contributed to quorum.

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
node_common "github.com/certusone/wormhole/node/pkg/common" node_common "github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db" "github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/reporter"
"github.com/mr-tron/base58" "github.com/mr-tron/base58"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
@ -205,19 +204,6 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
ConsistencyLevel: v.ConsistencyLevel, ConsistencyLevel: v.ConsistencyLevel,
} }
// report the individual signature
signatureReport := &reporter.VerifiedPeerSignature{
GuardianAddress: their_addr,
Signature: m.Signature,
EmitterChain: v.EmitterChain,
EmitterAddress: v.EmitterAddress,
Sequence: v.Sequence,
}
p.attestationEvents.ReportVerifiedPeerSignature(signatureReport)
// report the current VAAState
p.attestationEvents.ReportVAAStateUpdate(signed)
// 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA. // 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA.
quorum := CalculateQuorum(len(gs.Keys)) quorum := CalculateQuorum(len(gs.Keys))

View File

@ -18,28 +18,11 @@ type (
// The native transaction identifier from the EmitterAddress interaction. // The native transaction identifier from the EmitterAddress interaction.
InitiatingTxID common.Hash InitiatingTxID common.Hash
} }
// VerifiedPeerSignature is a message observation from a Guardian that has been verified
// to be authentic and authorized to contribute to VAA quorum (ie. within the Guardian set).
VerifiedPeerSignature struct {
// The chain the transaction took place on
EmitterChain vaa.ChainID
// EmitterAddress of the contract that emitted the Message
EmitterAddress vaa.Address
// Sequence of the VAA
Sequence uint64
// The address of the Guardian that observed and signed the message
GuardianAddress common.Address
// Transaction Identifier of the initiating event
Signature []byte
}
) )
type lifecycleEventChannels struct { type lifecycleEventChannels struct {
// channel for each event // channel for each event
MessagePublicationC chan *MessagePublication MessagePublicationC chan *MessagePublication
VAAStateUpdateC chan *vaa.VAA
VerifiedSignatureC chan *VerifiedPeerSignature
VAAQuorumC chan *vaa.VAA VAAQuorumC chan *vaa.VAA
} }
@ -81,8 +64,6 @@ func (re *AttestationEventReporter) Subscribe() *activeSubscription {
re.logger.Debug("Subscribe for client", zap.Int("clientId", clientId)) re.logger.Debug("Subscribe for client", zap.Int("clientId", clientId))
channels := &lifecycleEventChannels{ channels := &lifecycleEventChannels{
MessagePublicationC: make(chan *MessagePublication, 50), MessagePublicationC: make(chan *MessagePublication, 50),
VAAStateUpdateC: make(chan *vaa.VAA, 50),
VerifiedSignatureC: make(chan *VerifiedPeerSignature, 50),
VAAQuorumC: make(chan *vaa.VAA, 50), VAAQuorumC: make(chan *vaa.VAA, 50),
} }
re.subs[clientId] = channels re.subs[clientId] = channels
@ -113,36 +94,6 @@ func (re *AttestationEventReporter) ReportMessagePublication(msg *MessagePublica
} }
} }
// ReportVerifiedPeerSignature is invoked after a SignedObservation is verified.
func (re *AttestationEventReporter) ReportVerifiedPeerSignature(msg *VerifiedPeerSignature) {
re.mu.RLock()
defer re.mu.RUnlock()
for client, sub := range re.subs {
select {
case sub.VerifiedSignatureC <- msg:
re.logger.Debug("published VerifiedPeerSignature to client", zap.Int("client", client))
default:
re.logger.Debug("buffer overrun when attempting to publish message", zap.Int("client", client))
}
}
}
// ReportVAAStateUpdate is invoked each time the local VAAState is updated.
func (re *AttestationEventReporter) ReportVAAStateUpdate(msg *vaa.VAA) {
re.mu.RLock()
defer re.mu.RUnlock()
for client, sub := range re.subs {
select {
case sub.VAAStateUpdateC <- msg:
re.logger.Debug("published VAAStateUpdate to client", zap.Int("client", client))
default:
re.logger.Debug("buffer overrun when attempting to publish message", zap.Int("client", client))
}
}
}
// ReportVAAQuorum is invoked when quorum is reached. // ReportVAAQuorum is invoked when quorum is reached.
func (re *AttestationEventReporter) ReportVAAQuorum(msg *vaa.VAA) { func (re *AttestationEventReporter) ReportVAAQuorum(msg *vaa.VAA) {
re.mu.RLock() re.mu.RLock()

View File

@ -1,9 +1,7 @@
package reporter package reporter
import ( import (
"bytes"
"context" "context"
"encoding/binary"
"fmt" "fmt"
"os" "os"
@ -96,52 +94,6 @@ func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTable
zap.Error(err)) zap.Error(err))
errC <- err errC <- err
} }
case msg := <-sub.Channels.VerifiedSignatureC:
colFam := "Signatures"
mutation := bigtable.NewMutation()
ts := bigtable.Now()
addrHex := msg.GuardianAddress.Hex()
mutation.Set(colFam, addrHex, ts, msg.Signature)
// filter to see if this row already has this signature.
filter := bigtable.ChainFilters(
bigtable.FamilyFilter(colFam),
bigtable.ColumnFilter(addrHex))
conditionalMutation := bigtable.NewCondMutation(filter, nil, mutation)
rowKey := makeRowKey(msg.EmitterChain, msg.EmitterAddress, msg.Sequence)
err := tbl.Apply(ctx, rowKey, conditionalMutation)
if err != nil {
logger.Warn("Failed to write signature to BigTable",
zap.String("rowKey", rowKey),
zap.String("columnFamily", colFam),
zap.Error(err))
errC <- err
}
case msg := <-sub.Channels.VAAStateUpdateC:
colFam := "VAAState"
mutation := bigtable.NewMutation()
ts := bigtable.Now()
buf := new(bytes.Buffer)
vaa.MustWrite(buf, binary.BigEndian, uint8(len(msg.Signatures)))
for _, sig := range msg.Signatures {
vaa.MustWrite(buf, binary.BigEndian, sig.Index)
buf.Write(sig.Signature[:])
}
mutation.Set(colFam, "Signatures", ts, buf.Bytes())
// TODO: conditional mutation that considers number of signatures in the VAA.
rowKey := makeRowKey(msg.EmitterChain, msg.EmitterAddress, msg.Sequence)
err := tbl.Apply(ctx, rowKey, mutation)
if err != nil {
logger.Warn("Failed to write VAA update to BigTable",
zap.String("rowKey", rowKey),
zap.String("columnFamily", colFam),
zap.Error(err))
errC <- err
}
case msg := <-sub.Channels.VAAQuorumC: case msg := <-sub.Channels.VAAQuorumC:
colFam := "QuorumState" colFam := "QuorumState"
mutation := bigtable.NewMutation() mutation := bigtable.NewMutation()