diff --git a/bridge/pkg/processor/lockup.go b/bridge/pkg/processor/lockup.go new file mode 100644 index 000000000..ae7993337 --- /dev/null +++ b/bridge/pkg/processor/lockup.go @@ -0,0 +1,114 @@ +package processor + +import ( + "context" + "encoding/hex" + "time" + + common2 "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + + "github.com/certusone/wormhole/bridge/pkg/common" + "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" + "github.com/certusone/wormhole/bridge/pkg/supervisor" + "github.com/certusone/wormhole/bridge/pkg/vaa" +) + +// handleLockup processes a lockup received from a chain and instantiates our deterministic copy of the VAA +func (p *Processor) handleLockup(ctx context.Context, k *common.ChainLock) { + supervisor.Logger(ctx).Info("lockup confirmed", + zap.Stringer("source_chain", k.SourceChain), + zap.Stringer("target_chain", k.TargetChain), + zap.Stringer("source_addr", k.SourceAddress), + zap.Stringer("target_addr", k.TargetAddress), + zap.Stringer("token_chain", k.TokenChain), + zap.Stringer("token_addr", k.TokenAddress), + zap.Stringer("amount", k.Amount), + zap.Stringer("txhash", k.TxHash), + zap.Time("timestamp", k.Timestamp), + ) + + us, ok := p.gs.KeyIndex(p.ourAddr) + if !ok { + p.logger.Error("we're not in the guardian set - refusing to sign", + zap.Uint32("index", p.gs.Index), + zap.Stringer("our_addr", p.ourAddr), + zap.Any("set", p.gs.KeysAsHexStrings())) + return + } + + // All nodes will create the exact same VAA and sign its digest. + // Consensus is established on this digest. + + v := &vaa.VAA{ + Version: vaa.SupportedVAAVersion, + GuardianSetIndex: p.gs.Index, + Signatures: nil, + Timestamp: k.Timestamp, + Payload: &vaa.BodyTransfer{ + Nonce: k.Nonce, + SourceChain: k.SourceChain, + TargetChain: k.TargetChain, + SourceAddress: k.SourceAddress, + TargetAddress: k.TargetAddress, + Asset: &vaa.AssetMeta{ + Chain: k.TokenChain, + Address: k.TokenAddress, + Decimals: k.TokenDecimals, + }, + Amount: k.Amount, + }, + } + + // Generate digest of the unsigned VAA. + digest, err := v.SigningMsg() + if err != nil { + panic(err) + } + + // Sign the digest using our node's guardian key. + s, err := crypto.Sign(digest.Bytes(), p.gk) + if err != nil { + panic(err) + } + + p.logger.Info("observed and signed confirmed lockup", + zap.Stringer("source_chain", k.SourceChain), + zap.Stringer("target_chain", k.TargetChain), + zap.Stringer("txhash", k.TxHash), + zap.String("digest", hex.EncodeToString(digest.Bytes())), + zap.String("signature", hex.EncodeToString(s)), + zap.Int("our_index", us)) + + obsv := gossipv1.LockupObservation{ + Addr: crypto.PubkeyToAddress(p.gk.PublicKey).Bytes(), + Hash: digest.Bytes(), + Signature: s, + } + + w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_LockupObservation{LockupObservation: &obsv}} + + msg, err := proto.Marshal(&w) + if err != nil { + panic(err) + } + + p.sendC <- msg + + // Store our VAA in case we're going to submit it to Solana + hash := hex.EncodeToString(digest.Bytes()) + + if p.state.vaaSignatures[hash] == nil { + p.state.vaaSignatures[hash] = &vaaState{ + firstObserved: time.Now(), + signatures: map[common2.Address][]byte{}, + } + } + + p.state.vaaSignatures[hash].ourVAA = v + + // Fast path for our own signature + go func() { p.obsvC <- &obsv }() +} diff --git a/bridge/pkg/processor/observation.go b/bridge/pkg/processor/observation.go new file mode 100644 index 000000000..00f8d7fd6 --- /dev/null +++ b/bridge/pkg/processor/observation.go @@ -0,0 +1,185 @@ +package processor + +import ( + "context" + "encoding/hex" + "fmt" + "strings" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "go.uber.org/zap" + + "github.com/certusone/wormhole/bridge/pkg/devnet" + "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" + "github.com/certusone/wormhole/bridge/pkg/vaa" +) + +func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.LockupObservation) { + // SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!) + // + // Note that observations are never tied to the (verified) p2p identity key - the p2p network + // identity is completely decoupled from the guardian identity, p2p is just transport. + + p.logger.Info("received lockup observation", + zap.String("digest", hex.EncodeToString(m.Hash)), + zap.String("signature", hex.EncodeToString(m.Signature)), + zap.String("addr", hex.EncodeToString(m.Addr))) + + // Verify the Guardian's signature. This verifies that m.Signature matches m.Hash and recovers + // the public key that was used to sign the payload. + pk, err := crypto.Ecrecover(m.Hash, m.Signature) + if err != nil { + p.logger.Warn("failed to verify signature on lockup observation", + zap.String("digest", hex.EncodeToString(m.Hash)), + zap.String("signature", hex.EncodeToString(m.Signature)), + zap.String("addr", hex.EncodeToString(m.Addr)), + zap.Error(err)) + return + } + + // Verify that m.Addr matches the public key that signed m.Hash. + their_addr := common.BytesToAddress(m.Addr) + signer_pk := common.BytesToAddress(crypto.Keccak256(pk[1:])[12:]) + + if their_addr != signer_pk { + p.logger.Info("invalid lockup observation - address does not match pubkey", + zap.String("digest", hex.EncodeToString(m.Hash)), + zap.String("signature", hex.EncodeToString(m.Signature)), + zap.String("addr", hex.EncodeToString(m.Addr)), + zap.String("pk", signer_pk.Hex())) + return + } + + // Verify that m.Addr is included in the current guardian set. + _, ok := p.gs.KeyIndex(their_addr) + if !ok { + p.logger.Warn("received observation by unknown guardian - is our guardian set outdated?", + zap.String("their_addr", their_addr.Hex()), + zap.Any("current_set", p.gs.KeysAsHexStrings()), + ) + return + } + + // Hooray! Now, we have verified all fields on LockupObservation and know that it includes + // a valid signature by an active guardian. We still don't fully trust them, as they may be + // byzantine, but now we know who we're dealing with. + + // []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging. + hash := hex.EncodeToString(m.Hash) + + if p.state.vaaSignatures[hash] == nil { + // We haven't yet seen this lockup ourselves, and therefore do not know what the VAA looks like. + // However, we have established that a valid guardian has signed it, and therefore we can + // already start aggregating signatures for it. + // + // TODO: a malicious guardian can DoS this by creating fake lockups + p.state.vaaSignatures[hash] = &vaaState{ + firstObserved: time.Now(), + signatures: map[common.Address][]byte{}, + } + } + + p.state.vaaSignatures[hash].signatures[their_addr] = m.Signature + + // Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA. + agg := make([]bool, len(p.gs.Keys)) + var sigs []*vaa.Signature + for i, a := range p.gs.Keys { + s, ok := p.state.vaaSignatures[hash].signatures[a] + + if ok { + var bs [65]byte + if n := copy(bs[:], s); n != 65 { + panic(fmt.Sprintf("invalid sig len: %d", n)) + } + + sigs = append(sigs, &vaa.Signature{ + Index: uint8(i), + Signature: bs, + }) + } + + agg[i] = ok + } + + if p.state.vaaSignatures[hash].ourVAA != nil { + // We have seen it on chain! + // Deep copy the VAA and add signatures + v := p.state.vaaSignatures[hash].ourVAA + signed := &vaa.VAA{ + Version: v.Version, + GuardianSetIndex: v.GuardianSetIndex, + Signatures: sigs, + Timestamp: v.Timestamp, + Payload: v.Payload, + } + + // 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA. + quorum := CalculateQuorum(len(p.gs.Keys)) + + p.logger.Info("aggregation state for VAA", + zap.String("digest", hash), + zap.Any("set", p.gs.KeysAsHexStrings()), + zap.Uint32("index", p.gs.Index), + zap.Bools("aggregation", agg), + zap.Int("required_sigs", quorum), + zap.Int("have_sigs", len(sigs)), + ) + + if len(sigs) >= quorum && !p.state.vaaSignatures[hash].submitted { + vaaBytes, err := signed.Marshal() + if err != nil { + panic(err) + } + + if t, ok := v.Payload.(*vaa.BodyTransfer); ok { + switch { + case t.TargetChain == vaa.ChainIDEthereum: + // In dev mode, submit VAA to Ethereum. For production, the bridge won't + // have an Ethereum account and the user retrieves the VAA and submits the transactions themselves. + if p.devnetMode { + timeout, cancel := context.WithTimeout(ctx, 15*time.Second) + tx, err := devnet.SubmitVAA(timeout, p.devnetEthRPC, signed) + cancel() + if err != nil { + if strings.Contains(err.Error(), "VAA was already executed") { + p.logger.Info("lockup already submitted to Ethereum by another node, ignoring", + zap.Error(err), zap.String("digest", hash)) + } else { + p.logger.Error("failed to submit lockup to Ethereum", + zap.Error(err), zap.String("digest", hash)) + } + break + } + p.logger.Info("lockup submitted to Ethereum", zap.Any("tx", tx), zap.String("digest", hash)) + } + + // Cross-submit to Solana for data availability + fallthrough + case t.TargetChain == vaa.ChainIDSolana: + p.logger.Info("submitting signed VAA to Solana", + zap.String("digest", hash), + zap.Any("vaa", signed), + zap.String("bytes", hex.EncodeToString(vaaBytes))) + + p.vaaC <- signed + default: + p.logger.Error("we don't know how to submit this VAA", + zap.String("digest", hash), + zap.Any("vaa", signed), + zap.String("bytes", hex.EncodeToString(vaaBytes)), + zap.Stringer("target_chain", t.TargetChain)) + } + + p.state.vaaSignatures[hash].submitted = true + } else { + panic(fmt.Sprintf("unknown VAA payload type: %+v", v)) + } + } else { + p.logger.Info("quorum not met, doing nothing", + zap.String("digest", hash)) + } + } +} diff --git a/bridge/pkg/processor/processor.go b/bridge/pkg/processor/processor.go index 01c270961..7ed4e18f7 100644 --- a/bridge/pkg/processor/processor.go +++ b/bridge/pkg/processor/processor.go @@ -3,15 +3,12 @@ package processor import ( "context" "crypto/ecdsa" - "encoding/hex" "fmt" - "strings" "time" ethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "go.uber.org/zap" - "google.golang.org/protobuf/proto" "github.com/certusone/wormhole/bridge/pkg/common" "github.com/certusone/wormhole/bridge/pkg/devnet" @@ -67,6 +64,8 @@ type Processor struct { gs *common.GuardianSet // state is the current runtime VAA view state *aggregationState + // gk pk as eth address + ourAddr ethcommon.Address } func NewProcessor( @@ -92,14 +91,13 @@ func NewProcessor( devnetNumGuardians: devnetNumGuardians, devnetEthRPC: devnetEthRPC, - logger: supervisor.Logger(ctx), - state: &aggregationState{vaaMap{}}, + logger: supervisor.Logger(ctx), + state: &aggregationState{vaaMap{}}, + ourAddr: crypto.PubkeyToAddress(gk.PublicKey), } } func (p *Processor) Run(ctx context.Context) error { - ourAddr := crypto.PubkeyToAddress(p.gk.PublicKey) - for { select { case <-ctx.Done(): @@ -115,265 +113,9 @@ func (p *Processor) Run(ctx context.Context) error { return err } case k := <-p.lockC: - supervisor.Logger(ctx).Info("lockup confirmed", - zap.Stringer("source_chain", k.SourceChain), - zap.Stringer("target_chain", k.TargetChain), - zap.Stringer("source_addr", k.SourceAddress), - zap.Stringer("target_addr", k.TargetAddress), - zap.Stringer("token_chain", k.TokenChain), - zap.Stringer("token_addr", k.TokenAddress), - zap.Stringer("amount", k.Amount), - zap.Stringer("txhash", k.TxHash), - zap.Time("timestamp", k.Timestamp), - ) - - us, ok := p.gs.KeyIndex(ourAddr) - if !ok { - p.logger.Error("we're not in the guardian set - refusing to sign", - zap.Uint32("index", p.gs.Index), - zap.Stringer("our_addr", ourAddr), - zap.Any("set", p.gs.KeysAsHexStrings())) - break - } - - // All nodes will create the exact same VAA and sign its digest. - // Consensus is established on this digest. - - v := &vaa.VAA{ - Version: vaa.SupportedVAAVersion, - GuardianSetIndex: p.gs.Index, - Signatures: nil, - Timestamp: k.Timestamp, - Payload: &vaa.BodyTransfer{ - Nonce: k.Nonce, - SourceChain: k.SourceChain, - TargetChain: k.TargetChain, - SourceAddress: k.SourceAddress, - TargetAddress: k.TargetAddress, - Asset: &vaa.AssetMeta{ - Chain: k.TokenChain, - Address: k.TokenAddress, - Decimals: k.TokenDecimals, - }, - Amount: k.Amount, - }, - } - - // Generate digest of the unsigned VAA. - digest, err := v.SigningMsg() - if err != nil { - panic(err) - } - - // Sign the digest using our node's guardian key. - s, err := crypto.Sign(digest.Bytes(), p.gk) - if err != nil { - panic(err) - } - - p.logger.Info("observed and signed confirmed lockup", - zap.Stringer("source_chain", k.SourceChain), - zap.Stringer("target_chain", k.TargetChain), - zap.Stringer("txhash", k.TxHash), - zap.String("digest", hex.EncodeToString(digest.Bytes())), - zap.String("signature", hex.EncodeToString(s)), - zap.Int("our_index", us)) - - obsv := gossipv1.LockupObservation{ - Addr: crypto.PubkeyToAddress(p.gk.PublicKey).Bytes(), - Hash: digest.Bytes(), - Signature: s, - } - - w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_LockupObservation{LockupObservation: &obsv}} - - msg, err := proto.Marshal(&w) - if err != nil { - panic(err) - } - - p.sendC <- msg - - // Store our VAA in case we're going to submit it to Solana - hash := hex.EncodeToString(digest.Bytes()) - - if p.state.vaaSignatures[hash] == nil { - p.state.vaaSignatures[hash] = &vaaState{ - firstObserved: time.Now(), - signatures: map[ethcommon.Address][]byte{}, - } - } - - p.state.vaaSignatures[hash].ourVAA = v - - // Fast path for our own signature - go func() { p.obsvC <- &obsv }() + p.handleLockup(ctx, k) case m := <-p.obsvC: - // SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!) - // - // Note that observations are never tied to the (verified) p2p identity key - the p2p network - // identity is completely decoupled from the guardian identity, p2p is just transport. - - p.logger.Info("received lockup observation", - zap.String("digest", hex.EncodeToString(m.Hash)), - zap.String("signature", hex.EncodeToString(m.Signature)), - zap.String("addr", hex.EncodeToString(m.Addr))) - - // Verify the Guardian's signature. This verifies that m.Signature matches m.Hash and recovers - // the public key that was used to sign the payload. - pk, err := crypto.Ecrecover(m.Hash, m.Signature) - if err != nil { - p.logger.Warn("failed to verify signature on lockup observation", - zap.String("digest", hex.EncodeToString(m.Hash)), - zap.String("signature", hex.EncodeToString(m.Signature)), - zap.String("addr", hex.EncodeToString(m.Addr)), - zap.Error(err)) - break - } - - // Verify that m.Addr matches the public key that signed m.Hash. - their_addr := ethcommon.BytesToAddress(m.Addr) - signer_pk := ethcommon.BytesToAddress(crypto.Keccak256(pk[1:])[12:]) - - if their_addr != signer_pk { - p.logger.Info("invalid lockup observation - address does not match pubkey", - zap.String("digest", hex.EncodeToString(m.Hash)), - zap.String("signature", hex.EncodeToString(m.Signature)), - zap.String("addr", hex.EncodeToString(m.Addr)), - zap.String("pk", signer_pk.Hex())) - break - } - - // Verify that m.Addr is included in the current guardian set. - _, ok := p.gs.KeyIndex(their_addr) - if !ok { - p.logger.Warn("received observation by unknown guardian - is our guardian set outdated?", - zap.String("their_addr", their_addr.Hex()), - zap.Any("current_set", p.gs.KeysAsHexStrings()), - ) - break - } - - // Hooray! Now, we have verified all fields on LockupObservation and know that it includes - // a valid signature by an active guardian. We still don't fully trust them, as they may be - // byzantine, but now we know who we're dealing with. - - // []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging. - hash := hex.EncodeToString(m.Hash) - - if p.state.vaaSignatures[hash] == nil { - // We haven't yet seen this lockup ourselves, and therefore do not know what the VAA looks like. - // However, we have established that a valid guardian has signed it, and therefore we can - // already start aggregating signatures for it. - // - // TODO: a malicious guardian can DoS this by creating fake lockups - p.state.vaaSignatures[hash] = &vaaState{ - firstObserved: time.Now(), - signatures: map[ethcommon.Address][]byte{}, - } - } - - p.state.vaaSignatures[hash].signatures[their_addr] = m.Signature - - // Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA. - agg := make([]bool, len(p.gs.Keys)) - var sigs []*vaa.Signature - for i, a := range p.gs.Keys { - s, ok := p.state.vaaSignatures[hash].signatures[a] - - if ok { - var bs [65]byte - if n := copy(bs[:], s); n != 65 { - panic(fmt.Sprintf("invalid sig len: %d", n)) - } - - sigs = append(sigs, &vaa.Signature{ - Index: uint8(i), - Signature: bs, - }) - } - - agg[i] = ok - } - - if p.state.vaaSignatures[hash].ourVAA != nil { - // We have seen it on chain! - // Deep copy the VAA and add signatures - v := p.state.vaaSignatures[hash].ourVAA - signed := &vaa.VAA{ - Version: v.Version, - GuardianSetIndex: v.GuardianSetIndex, - Signatures: sigs, - Timestamp: v.Timestamp, - Payload: v.Payload, - } - - // 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA. - quorum := CalculateQuorum(len(p.gs.Keys)) - - p.logger.Info("aggregation state for VAA", - zap.String("digest", hash), - zap.Any("set", p.gs.KeysAsHexStrings()), - zap.Uint32("index", p.gs.Index), - zap.Bools("aggregation", agg), - zap.Int("required_sigs", quorum), - zap.Int("have_sigs", len(sigs)), - ) - - if len(sigs) >= quorum && !p.state.vaaSignatures[hash].submitted { - vaaBytes, err := signed.Marshal() - if err != nil { - panic(err) - } - - if t, ok := v.Payload.(*vaa.BodyTransfer); ok { - switch { - case t.TargetChain == vaa.ChainIDEthereum: - // In dev mode, submit VAA to Ethereum. For production, the bridge won't - // have an Ethereum account and the user retrieves the VAA and submits the transactions themselves. - if p.devnetMode { - timeout, cancel := context.WithTimeout(ctx, 15*time.Second) - tx, err := devnet.SubmitVAA(timeout, p.devnetEthRPC, signed) - cancel() - if err != nil { - if strings.Contains(err.Error(), "VAA was already executed") { - p.logger.Info("lockup already submitted to Ethereum by another node, ignoring", - zap.Error(err), zap.String("digest", hash)) - } else { - p.logger.Error("failed to submit lockup to Ethereum", - zap.Error(err), zap.String("digest", hash)) - } - break - } - p.logger.Info("lockup submitted to Ethereum", zap.Any("tx", tx), zap.String("digest", hash)) - } - - // Cross-submit to Solana for data availability - fallthrough - case t.TargetChain == vaa.ChainIDSolana: - p.logger.Info("submitting signed VAA to Solana", - zap.String("digest", hash), - zap.Any("vaa", signed), - zap.String("bytes", hex.EncodeToString(vaaBytes))) - - p.vaaC <- signed - default: - p.logger.Error("we don't know how to submit this VAA", - zap.String("digest", hash), - zap.Any("vaa", signed), - zap.String("bytes", hex.EncodeToString(vaaBytes)), - zap.Stringer("target_chain", t.TargetChain)) - } - - p.state.vaaSignatures[hash].submitted = true - } else { - panic(fmt.Sprintf("unknown VAA payload type: %+v", v)) - } - } else { - p.logger.Info("quorum not met, doing nothing", - zap.String("digest", hash)) - } - } + p.handleObservation(ctx, m) } } }