From b663e2dc560d30c0823d24bb8b3d7e756e8c66c0 Mon Sep 17 00:00:00 2001 From: Leo Date: Fri, 21 Aug 2020 13:00:40 +0200 Subject: [PATCH] Generalize token lockup processor --- bridge/cmd/guardiand/main.go | 12 +- bridge/cmd/guardiand/p2p.go | 6 +- .../guardiand/{ethlockup.go => processor.go} | 145 ++++++++++-------- bridge/pkg/ethereum/watcher.go | 10 +- bridge/pkg/solana/watcher.go | 12 +- bridge/pkg/vaa/structs.go | 20 ++- ethereum/src/send-lockups.js | 2 +- proto/gossip/v1/gossip.proto | 10 +- 8 files changed, 129 insertions(+), 88 deletions(-) rename bridge/cmd/guardiand/{ethlockup.go => processor.go} (57%) diff --git a/bridge/cmd/guardiand/main.go b/bridge/cmd/guardiand/main.go index 3893d61be..8f2efbfb7 100644 --- a/bridge/cmd/guardiand/main.go +++ b/bridge/cmd/guardiand/main.go @@ -164,17 +164,17 @@ func main() { // Outbound gossip message queue sendC := make(chan []byte) - // Inbound ETH observations - ethObsvC := make(chan *gossipv1.EthLockupObservation, 50) // TODO: is this an acceptable mitigation for bursts? + // Inbound observations + obsvC := make(chan *gossipv1.LockupObservation) // VAAs to submit to Solana - vaaC := make(chan *vaa.VAA) + solanaVaaC := make(chan *vaa.VAA) // Run supervisor. supervisor.New(rootCtx, logger, func(ctx context.Context) error { // TODO: use a dependency injection framework like wire? - if err := supervisor.Run(ctx, "p2p", p2p(ethObsvC, sendC)); err != nil { + if err := supervisor.Run(ctx, "p2p", p2p(obsvC, sendC)); err != nil { return err } @@ -182,13 +182,13 @@ func main() { // TODO: on-demand fetching of guardian set to avoid restarting ethwatch? if err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{ "ethwatch": ethereum.NewEthBridgeWatcher(*ethRPC, ethContractAddr, *ethConfirmations, lockC, setC).Run, - "ethlockup": ethLockupProcessor(lockC, setC, gk, sendC, ethObsvC, vaaC), + "processor": vaaConsensusProcessor(lockC, setC, gk, sendC, obsvC, solanaVaaC), }); err != nil { return err } if err := supervisor.Run(ctx, "solana", - solana.NewSolanaBridgeWatcher(*agentRPC, lockC, vaaC).Run); err != nil { + solana.NewSolanaBridgeWatcher(*agentRPC, lockC, solanaVaaC).Run); err != nil { return err } diff --git a/bridge/cmd/guardiand/p2p.go b/bridge/cmd/guardiand/p2p.go index 82a032abd..bb91c4894 100644 --- a/bridge/cmd/guardiand/p2p.go +++ b/bridge/cmd/guardiand/p2p.go @@ -26,7 +26,7 @@ import ( "github.com/certusone/wormhole/bridge/pkg/supervisor" ) -func p2p(ethObsvC chan *gossipv1.EthLockupObservation, sendC chan []byte) func(ctx context.Context) error { +func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx context.Context) error { return func(ctx context.Context) (re error) { logger := supervisor.Logger(ctx) @@ -240,8 +240,8 @@ func p2p(ethObsvC chan *gossipv1.EthLockupObservation, sendC chan []byte) func(c logger.Info("heartbeat received", zap.Any("value", m.Heartbeat), zap.String("from", envl.GetFrom().String())) - case *gossipv1.GossipMessage_EthLockupObservation: - ethObsvC <- m.EthLockupObservation + case *gossipv1.GossipMessage_LockupObservation: + obsvC <- m.LockupObservation default: logger.Warn("received unknown message type (running outdated software?)", zap.Any("payload", msg.Message), diff --git a/bridge/cmd/guardiand/ethlockup.go b/bridge/cmd/guardiand/processor.go similarity index 57% rename from bridge/cmd/guardiand/ethlockup.go rename to bridge/cmd/guardiand/processor.go index b05cc0e3d..d861104dc 100644 --- a/bridge/cmd/guardiand/ethlockup.go +++ b/bridge/cmd/guardiand/processor.go @@ -3,7 +3,6 @@ package main import ( "context" "crypto/ecdsa" - "crypto/sha256" "encoding/hex" "fmt" "math" @@ -23,32 +22,35 @@ import ( // aggregationState represents a single node's aggregation of guardian signatures. type ( - lockupState struct { + vaaState struct { firstObserved time.Time ourVAA *vaa.VAA signatures map[ethcommon.Address][]byte } - lockupMap map[string]*lockupState + vaaMap map[string]*vaaState aggregationState struct { - lockupSignatures lockupMap + vaaSignatures vaaMap } ) -func ethLockupProcessor(lockC chan *common.ChainLock, setC chan *common.GuardianSet, gk *ecdsa.PrivateKey, sendC chan []byte, obsvC chan *gossipv1.EthLockupObservation, vaaC chan *vaa.VAA) func(ctx context.Context) error { +func vaaConsensusProcessor(lockC chan *common.ChainLock, setC chan *common.GuardianSet, gk *ecdsa.PrivateKey, sendC chan []byte, obsvC chan *gossipv1.LockupObservation, vaaC chan *vaa.VAA) func(ctx context.Context) error { return func(ctx context.Context) error { logger := supervisor.Logger(ctx) our_addr := crypto.PubkeyToAddress(gk.PublicKey) - state := &aggregationState{lockupMap{}} + state := &aggregationState{vaaMap{}} + + // Get initial validator set from Ethereum. We could also fetch it from Solana, + // because both sets are synchronized, we simply made an arbitrary decision to use Ethereum. - // Get initial validator set logger.Info("waiting for initial validator set to be fetched from Ethereum") gs := <-setC logger.Info("current guardian set received", zap.Strings("set", gs.KeysAsHexStrings()), zap.Uint32("index", gs.Index)) + // In debug mode, node 0 submits a VAA that configures the desired number of guardians to both chains. if *unsafeDevMode { idx, err := devnet.GetDevnetIndex() if err != nil { @@ -56,19 +58,20 @@ func ethLockupProcessor(lockC chan *common.ChainLock, setC chan *common.Guardian } if idx == 0 && (uint(len(gs.Keys)) != *devNumGuardians) { - vaa := devnet.DevnetGuardianSetVSS(*devNumGuardians) + v := devnet.DevnetGuardianSetVSS(*devNumGuardians) logger.Info(fmt.Sprintf("guardian set has %d members, expecting %d - submitting VAA", len(gs.Keys), *devNumGuardians), - zap.Any("vaa", vaa)) + zap.Any("v", v)) timeout, _ := context.WithTimeout(ctx, 15*time.Second) - tx, err := devnet.SubmitVAA(timeout, *ethRPC, vaa) + tx, err := devnet.SubmitVAA(timeout, *ethRPC, v) if err != nil { logger.Error("failed to submit devnet guardian set change", zap.Error(err)) } + logger.Info("devnet guardian set change submitted to Ethereum", zap.Any("tx", tx)) - logger.Info("devnet guardian set change submitted", zap.Any("tx", tx)) + // TODO: submit to Solana } } @@ -82,10 +85,14 @@ func ethLockupProcessor(lockC chan *common.ChainLock, setC chan *common.Guardian zap.Uint32("index", gs.Index)) case k := <-lockC: supervisor.Logger(ctx).Info("lockup confirmed", - zap.String("source", hex.EncodeToString(k.SourceAddress[:])), - zap.String("target", hex.EncodeToString(k.TargetAddress[:])), - zap.String("amount", k.Amount.String()), - zap.String("txhash", k.TxHash.String()), + zap.Stringer("source_chain", k.SourceChain), + zap.Stringer("target_chain", k.TargetChain), + zap.Stringer("source_addr", k.SourceAddress), + zap.Stringer("target_addr", k.TargetAddress), + zap.Stringer("token_chain", k.TokenChain), + zap.Stringer("token_addr", k.TokenAddress), + zap.Stringer("amount", k.Amount), + zap.Stringer("txhash", k.TxHash), zap.Time("timestamp", k.Timestamp), ) @@ -93,12 +100,12 @@ func ethLockupProcessor(lockC chan *common.ChainLock, setC chan *common.Guardian if !ok { logger.Error("we're not in the guardian set - refusing to sign", zap.Uint32("index", gs.Index), - zap.String("our_addr", our_addr.Hex()), + zap.Stringer("our_addr", our_addr), zap.Any("set", gs.KeysAsHexStrings())) break } - // All nodes will create the exact same VAA and sign its SHA256 digest. + // All nodes will create the exact same VAA and sign its digest. // Consensus is established on this digest. v := &vaa.VAA{ @@ -107,49 +114,46 @@ func ethLockupProcessor(lockC chan *common.ChainLock, setC chan *common.Guardian Signatures: nil, Timestamp: k.Timestamp, Payload: &vaa.BodyTransfer{ - Nonce: 0, // TODO - SourceChain: vaa.ChainIDEthereum, - TargetChain: vaa.ChainIDSolana, + Nonce: k.Nonce, + SourceChain: k.SourceChain, + TargetChain: k.TargetChain, SourceAddress: k.SourceAddress, TargetAddress: k.TargetAddress, Asset: &vaa.AssetMeta{ - Chain: vaa.ChainIDEthereum, + Chain: k.TokenChain, Address: k.TokenAddress, }, Amount: k.Amount, }, } - b, err := v.Marshal() + // Generate digest of the unsigned VAA. + digest, err := v.SigningMsg() if err != nil { panic(err) } - h := sha256.Sum256(b) // TODO: use SigningMsg? - - signData, err := v.SigningMsg() - if err != nil { - panic(err) - } - s, err := crypto.Sign(signData.Bytes(), gk) + // Sign the digest using our node's guardian key. + s, err := crypto.Sign(digest.Bytes(), gk) if err != nil { panic(err) } - - logger.Info("observed and signed confirmed lockup on Ethereum", - zap.String("txhash", k.TxHash.String()), - zap.String("vaahash", hex.EncodeToString(h[:])), + logger.Info("observed and signed confirmed lockup", + zap.Stringer("source_chain", k.SourceChain), + zap.Stringer("target_chain", k.TargetChain), + zap.Stringer("txhash", k.TxHash), + zap.String("digest", hex.EncodeToString(digest.Bytes())), zap.String("signature", hex.EncodeToString(s)), - zap.Int("us", us)) + zap.Int("our_index", us)) - obsv := gossipv1.EthLockupObservation{ + obsv := gossipv1.LockupObservation{ Addr: crypto.PubkeyToAddress(gk.PublicKey).Bytes(), - Hash: h[:], + Hash: digest.Bytes(), Signature: s, } - w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_EthLockupObservation{EthLockupObservation: &obsv}} + w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_LockupObservation{LockupObservation: &obsv}} msg, err := proto.Marshal(&w) if err != nil { @@ -159,27 +163,26 @@ func ethLockupProcessor(lockC chan *common.ChainLock, setC chan *common.Guardian sendC <- msg // Store our VAA in case we're going to submit it to Solana - hash := hex.EncodeToString(h[:]) + hash := hex.EncodeToString(digest.Bytes()) - if state.lockupSignatures[hash] == nil { - state.lockupSignatures[hash] = &lockupState{ + if state.vaaSignatures[hash] == nil { + state.vaaSignatures[hash] = &vaaState{ firstObserved: time.Now(), signatures: map[ethcommon.Address][]byte{}, } - // TODO: do we receive and add our own signature below? } - state.lockupSignatures[hash].ourVAA = v + state.vaaSignatures[hash].ourVAA = v case m := <-obsvC: - logger.Info("received eth lockup observation", - zap.String("hash", hex.EncodeToString(m.Hash)), - zap.Binary("signature", m.Signature), - zap.Binary("addr", m.Addr)) + logger.Info("received lockup observation", + zap.String("digest", hex.EncodeToString(m.Hash)), + zap.String("signature", hex.EncodeToString(m.Signature)), + zap.String("addr", hex.EncodeToString(m.Addr))) their_addr := ethcommon.BytesToAddress(m.Addr) _, ok := gs.KeyIndex(their_addr) if !ok { - logger.Warn("received eth observation by unknown guardian - is our guardian set outdated?", + logger.Warn("received observation by unknown guardian - is our guardian set outdated?", zap.String("their_addr", their_addr.Hex()), zap.Any("current_set", gs.KeysAsHexStrings()), ) @@ -187,25 +190,26 @@ func ethLockupProcessor(lockC chan *common.ChainLock, setC chan *common.Guardian } // TODO: timeout/garbage collection for lockup state + // TODO: rebroadcast signatures for VAAs that fail to reach consensus // []byte isn't hashable in a map. Paying a small extra cost to for encoding for easier debugging. hash := hex.EncodeToString(m.Hash) - if state.lockupSignatures[hash] == nil { - state.lockupSignatures[hash] = &lockupState{ + if state.vaaSignatures[hash] == nil { + state.vaaSignatures[hash] = &vaaState{ firstObserved: time.Now(), signatures: map[ethcommon.Address][]byte{}, } } - state.lockupSignatures[hash].signatures[their_addr] = m.Signature + state.vaaSignatures[hash].signatures[their_addr] = m.Signature // Enumerate guardian set and check for signatures agg := make([]bool, len(gs.Keys)) var sigs []*vaa.Signature for i, a := range gs.Keys { // TODO: verify signature - s, ok := state.lockupSignatures[hash].signatures[a] + s, ok := state.vaaSignatures[hash].signatures[a] if ok { var bs [65]byte @@ -222,10 +226,10 @@ func ethLockupProcessor(lockC chan *common.ChainLock, setC chan *common.Guardian agg[i] = ok } - if state.lockupSignatures[hash].ourVAA != nil { + if state.vaaSignatures[hash].ourVAA != nil { // We have seen it on chain! // Deep copy the VAA and add signatures - v := state.lockupSignatures[hash].ourVAA + v := state.vaaSignatures[hash].ourVAA signed := &vaa.VAA{ Version: v.Version, GuardianSetIndex: v.GuardianSetIndex, @@ -237,14 +241,14 @@ func ethLockupProcessor(lockC chan *common.ChainLock, setC chan *common.Guardian // 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA. quorum := int(math.Ceil((float64(len(gs.Keys)) / 3) * 2)) - logger.Info("aggregation state for eth lockup", - zap.String("vaahash", hash), + logger.Info("aggregation state for VAA", + zap.String("digest", hash), zap.Any("set", gs.KeysAsHexStrings()), zap.Uint32("index", gs.Index), zap.Bools("aggregation", agg), zap.Int("required_sigs", quorum), zap.Int("have_sigs", len(sigs)), - ) + ) if *unsafeDevMode && len(sigs) >= quorum { idx, err := devnet.GetDevnetIndex() @@ -257,20 +261,31 @@ func ethLockupProcessor(lockC chan *common.ChainLock, setC chan *common.Guardian panic(err) } - logger.Info("submitting signed VAA to Solana", - zap.String("vaahash", hash), - zap.Any("vaa", signed), - zap.Binary("bytes", vaaBytes)) + if t, ok := v.Payload.(*vaa.BodyTransfer); ok { + if t.TargetChain == vaa.ChainIDSolana { + logger.Info("submitting signed VAA to Solana", + zap.String("digest", hash), + zap.Any("vaa", signed), + zap.String("bytes", hex.EncodeToString(vaaBytes))) - // TODO: actually submit to Solana once the agent works and has a reasonable key - if idx == 0 { - vaaC <- state.lockupSignatures[hash].ourVAA + if idx == 0 { + vaaC <- state.vaaSignatures[hash].ourVAA + } + } else { + logger.Error("we don't know how to submit this VAA", + zap.String("digest", hash), + zap.Any("vaa", signed), + zap.String("bytes", hex.EncodeToString(vaaBytes)), + zap.Stringer("target_chain", t.TargetChain)) + } + } else { + panic(fmt.Sprintf("unknown VAA payload type: %+v", v)) } } else if !*unsafeDevMode { - panic("not implemented") // TODO + panic("not implemented") // TODO } else { logger.Info("quorum not met, doing nothing", - zap.String("vaahash", hash)) + zap.String("digest", hash)) } } } diff --git a/bridge/pkg/ethereum/watcher.go b/bridge/pkg/ethereum/watcher.go index c5df913d5..1480014e5 100644 --- a/bridge/pkg/ethereum/watcher.go +++ b/bridge/pkg/ethereum/watcher.go @@ -115,7 +115,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { } logger.Info("found new lockup transaction", zap.Stringer("tx", ev.Raw.TxHash), - zap.Uint64("number", ev.Raw.BlockNumber)) + zap.Uint64("block", ev.Raw.BlockNumber)) e.pendingLocksGuard.Lock() e.pendingLocks[ev.Raw.TxHash] = &pendingLock{ lock: lock, @@ -159,7 +159,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { return case ev := <-headSink: start := time.Now() - logger.Info("processing new header", zap.Stringer("number", ev.Number)) + logger.Info("processing new header", zap.Stringer("block", ev.Number)) e.pendingLocksGuard.Lock() blockNumberU := ev.Number.Uint64() @@ -168,7 +168,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { // Transaction was dropped and never picked up again if pLock.height+4*e.minConfirmations <= blockNumberU { logger.Debug("lockup timed out", zap.Stringer("tx", pLock.lock.TxHash), - zap.Stringer("number", ev.Number)) + zap.Stringer("block", ev.Number)) delete(e.pendingLocks, hash) continue } @@ -176,14 +176,14 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { // Transaction is now ready if pLock.height+e.minConfirmations <= ev.Number.Uint64() { logger.Debug("lockup confirmed", zap.Stringer("tx", pLock.lock.TxHash), - zap.Stringer("number", ev.Number)) + zap.Stringer("block", ev.Number)) delete(e.pendingLocks, hash) e.lockChan <- pLock.lock } } e.pendingLocksGuard.Unlock() - logger.Info("processed new header", zap.Stringer("number", ev.Number), + logger.Info("processed new header", zap.Stringer("block", ev.Number), zap.Duration("took", time.Since(start))) } } diff --git a/bridge/pkg/solana/watcher.go b/bridge/pkg/solana/watcher.go index aedef9b0d..575693825 100644 --- a/bridge/pkg/solana/watcher.go +++ b/bridge/pkg/solana/watcher.go @@ -2,6 +2,7 @@ package ethereum import ( "context" + "encoding/hex" "fmt" "time" @@ -90,15 +91,22 @@ func (e *SolanaBridgeWatcher) Run(ctx context.Context) error { panic(err) } + // Calculate digest so we can log it (TODO: refactor to vaa method? we do this in different places) + m, err := v.SigningMsg() + if err != nil { + panic(err) + } + h := hex.EncodeToString(m.Bytes()) + timeout, _ := context.WithTimeout(ctx, 15*time.Second) res, err := c.SubmitVAA(timeout, &agentv1.SubmitVAARequest{Vaa: vaaBytes}) if err != nil { - logger.Error("failed to submit VAA", zap.Error(err)) + logger.Error("failed to submit VAA", zap.Error(err), zap.String("digest", h)) break } logger.Info("submitted VAA", - zap.String("signature", res.Signature)) + zap.String("signature", res.Signature), zap.String("digest", h)) } } }() diff --git a/bridge/pkg/vaa/structs.go b/bridge/pkg/vaa/structs.go index d1efbdd11..097b22483 100644 --- a/bridge/pkg/vaa/structs.go +++ b/bridge/pkg/vaa/structs.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/ecdsa" "encoding/binary" + "encoding/hex" "fmt" "io" "math/big" @@ -43,7 +44,7 @@ type ( // Index of the validator Index uint8 // Signature data - Signature [65]byte + Signature [65]byte // TODO: hex marshaller } // AssetMeta describes an asset within the Wormhole protocol @@ -84,6 +85,21 @@ type ( } ) +func (a Address) String() string { + return hex.EncodeToString(a[:]) +} + +func (c ChainID) String() string { + switch c { + case ChainIDSolana: + return "solana" + case ChainIDEthereum: + return "ethereum" + default: + return fmt.Sprintf("unknown chain ID: %d", c) + } +} + const ( ActionGuardianSetUpdate Action = 0x01 ActionTransfer Action = 0x10 @@ -97,6 +113,8 @@ const ( SupportedVAAVersion = 0x01 ) + + // Unmarshal deserializes the binary representation of a VAA func Unmarshal(data []byte) (*VAA, error) { if len(data) < minVAALength { diff --git a/ethereum/src/send-lockups.js b/ethereum/src/send-lockups.js index 5ca87e51f..6ca79286d 100644 --- a/ethereum/src/send-lockups.js +++ b/ethereum/src/send-lockups.js @@ -32,7 +32,7 @@ module.exports = function(callback) { await token.approve(bridge.address, "1000000000000000000"); while (true) { - let ev = await bridge.lockAssets(token.address, "1000", "0x1230000000000000000000000000000000000000000000000000000000000000", 3, 0); + let ev = await bridge.lockAssets(token.address, "1000", "0x1230000000000000000000000000000000000000000000000000000000000000", 1 /* Solana */, 0); let block = await web3.eth.getBlock('latest'); console.log("block", block.number, "with txs", block.transactions, "and time", block.timestamp); await advanceBlock(); diff --git a/proto/gossip/v1/gossip.proto b/proto/gossip/v1/gossip.proto index 317b5c700..c43484327 100644 --- a/proto/gossip/v1/gossip.proto +++ b/proto/gossip/v1/gossip.proto @@ -7,7 +7,7 @@ option go_package = "proto/gossip/v1;gossipv1"; message GossipMessage { oneof message { Heartbeat heartbeat = 1; - EthLockupObservation eth_lockup_observation = 2; + LockupObservation lockup_observation = 2; } } @@ -31,8 +31,8 @@ message Heartbeat { // TODO: software version/release } -// An EthLockupObservation is a signed statement by a given guardian node -// that they observed a finalized lockup on Ethereum. +// A LockupObservation is a signed statement by a given guardian node +// that they observed a finalized lockup on a chain. // // The lockup is uniquely identified by its hashed (tx_hash, nonce, values...) tuple. // @@ -40,10 +40,10 @@ message Heartbeat { // guardians submitting valid signatures for a given hash, they can be assembled into a VAA. // // Messages without valid signature are dropped unceremoniously. -message EthLockupObservation { +message LockupObservation { // Guardian pubkey as truncated eth address. bytes addr = 1; - // The lockup's deterministic, unique hash. See pkg/common/chainlock.go. + // The lockup's deterministic, unique hash. bytes hash = 2; // ECSDA signature of the hash using the node's guardian key. bytes signature = 3;