fixes for relay

This commit is contained in:
Ethan Buchman 2017-05-21 22:22:58 -04:00
parent 7b83dea0d6
commit 444b96dfe7
8 changed files with 119 additions and 64 deletions

View File

@ -10,6 +10,7 @@ import (
"github.com/tendermint/go-wire"
"github.com/tendermint/merkleeyes/iavl"
"github.com/tendermint/tendermint/rpc/client"
tmtypes "github.com/tendermint/tendermint/types"
)
@ -120,7 +121,8 @@ func accountCmd(cmd *cobra.Command, args []string) error {
return errors.Errorf("Account address (%v) is invalid hex: %v\n", addrHex, err)
}
acc, err := getAcc(nodeFlag, addr)
httpClient := client.NewHTTP(nodeFlag, "/websocket")
acc, err := getAccWithClient(httpClient, addr)
if err != nil {
return err
}

View File

@ -52,7 +52,7 @@ func init() {
}
func loop(addr1, addr2, id1, id2 string) {
latestSeq := -1
nextSeq := 0
// load the priv key
privKey, err := LoadKey(fromFlag)
@ -62,7 +62,11 @@ func loop(addr1, addr2, id1, id2 string) {
}
// relay from chain1 to chain2
thisRelayer := relayer{privKey, id2, addr2}
thisRelayer := newRelayer(privKey, id2, addr2)
logger.Info(fmt.Sprintf("Relaying from chain %v on %v to chain %v on %v", id1, addr1, id2, addr2))
httpClient := client.NewHTTP(addr1, "/websocket")
OUTER:
for {
@ -71,69 +75,83 @@ OUTER:
// get the latest ibc packet sequence number
key := fmt.Sprintf("ibc,egress,%v,%v", id1, id2)
query, err := Query(addr1, []byte(key))
query, err := queryWithClient(httpClient, []byte(key))
if err != nil {
logger.Error(err.Error())
logger.Error("Error querying for latest sequence", "key", key, "error", err.Error())
continue OUTER
}
if len(query.Value) == 0 {
// nothing yet
continue OUTER
}
seq, err := strconv.ParseUint(string(query.Value), 10, 64)
if err != nil {
logger.Error(err.Error())
logger.Error("Error parsing sequence number from query", "query.Value", query.Value, "error", err.Error())
continue OUTER
}
// if there's a new packet, relay the header and commit data
if latestSeq < int(seq) {
header, commit, err := getHeaderAndCommit(addr1, int(query.Height))
if err != nil {
logger.Error(err.Error())
continue OUTER
}
// update the chain state on the other chain
ibcTx := ibc.IBCUpdateChainTx{
Header: *header,
Commit: *commit,
}
if err := thisRelayer.appTx(ibcTx); err != nil {
logger.Error(err.Error())
continue OUTER
}
}
// get all packets since the last one we relayed
for ; latestSeq < int(seq); latestSeq++ {
key := fmt.Sprintf("ibc,egress,%v,%v,%d", id1, id2, latestSeq)
query, err := Query(addr1, []byte(key))
for ; nextSeq <= int(seq); nextSeq++ {
key := fmt.Sprintf("ibc,egress,%v,%v,%d", id1, id2, nextSeq)
query, err := queryWithClient(httpClient, []byte(key))
if err != nil {
logger.Error(err.Error())
logger.Error("Error querying for packet", "seqeuence", nextSeq, "key", key, "error", err.Error())
continue OUTER
}
var packet ibc.Packet
err = wire.ReadBinaryBytes(query.Value, &packet)
if err != nil {
logger.Error(err.Error())
logger.Error("Error unmarshalling packet", "key", key, "query.Value", query.Value, "error", err.Error())
continue OUTER
}
proof := new(iavl.IAVLProof)
err = wire.ReadBinaryBytes(query.Proof, proof)
err = wire.ReadBinaryBytes(query.Proof, &proof)
if err != nil {
logger.Error(err.Error())
logger.Error("Error unmarshalling proof", "query.Proof", query.Proof, "error", err.Error())
continue OUTER
}
// query.Height is actually for the next block,
// so wait a block before we fetch the header & commit
if err := waitForBlock(httpClient); err != nil {
logger.Error("Error waiting for a block", "addr", addr1, "error", err.Error())
continue OUTER
}
// get the header and commit from the height the query was done at
res, err := httpClient.Commit(int(query.Height))
if err != nil {
logger.Error("Error fetching header and commits", "height", query.Height, "error", err.Error())
continue OUTER
}
// update the chain state on the other chain
updateTx := ibc.IBCUpdateChainTx{
Header: *res.Header,
Commit: *res.Commit,
}
logger.Info("Updating chain", "src-chain", id1, "height", res.Header.Height, "appHash", res.Header.AppHash)
if err := thisRelayer.appTx(updateTx); err != nil {
logger.Error("Error creating/sending IBCUpdateChainTx", "error", err.Error())
continue OUTER
}
// relay the packet and proof
ibcTx := ibc.IBCPacketPostTx{
logger.Info("Relaying packet", "src-chain", id1, "height", query.Height, "sequence", nextSeq)
postTx := ibc.IBCPacketPostTx{
FromChainID: id1,
FromChainHeight: uint64(query.Height),
FromChainHeight: query.Height,
Packet: packet,
Proof: proof,
}
if err := thisRelayer.appTx(ibcTx); err != nil {
logger.Error(err.Error())
continue OUTER
if err := thisRelayer.appTx(postTx); err != nil {
logger.Error("Error creating/sending IBCPacketPostTx", "error", err.Error())
// dont `continue OUTER` here. the error might be eg. Already exists
// TODO: catch this programmatically ?
}
}
}
@ -143,22 +161,36 @@ type relayer struct {
privKey *Key
chainID string
nodeAddr string
client *client.HTTP
}
func newRelayer(privKey *Key, chainID, nodeAddr string) *relayer {
httpClient := client.NewHTTP(nodeAddr, "/websocket")
return &relayer{
privKey: privKey,
chainID: chainID,
nodeAddr: nodeAddr,
client: httpClient,
}
}
func (r *relayer) appTx(ibcTx ibc.IBCTx) error {
sequence, err := getSeq(r.privKey.Address[:])
acc, err := getAccWithClient(r.client, r.privKey.Address[:])
if err != nil {
return err
}
sequence := acc.Sequence + 1
data := []byte(wire.BinaryBytes(struct {
ibc.IBCTx `json:"unwrap"`
}{ibcTx}))
input := types.NewTxInput(r.privKey.PubKey, types.Coins{}, sequence)
smallCoins := types.Coin{"mycoin", 1}
input := types.NewTxInput(r.privKey.PubKey, types.Coins{smallCoins}, sequence)
tx := &types.AppTx{
Gas: 0,
Fee: types.Coin{"mycoin", 1},
Fee: smallCoins,
Name: "IBC",
Input: input,
Data: data,
@ -169,17 +201,16 @@ func (r *relayer) appTx(ibcTx ibc.IBCTx) error {
types.Tx `json:"unwrap"`
}{tx}))
data, log, err := broadcastTxToNode(r.nodeAddr, txBytes)
data, log, err := broadcastTxWithClient(r.client, txBytes)
if err != nil {
return err
}
fmt.Printf("Response: %X ; %s\n", data, log)
_, _ = data, log
return nil
}
// broadcast the transaction to tendermint
func broadcastTxToNode(nodeAddr string, tx tmtypes.Tx) ([]byte, string, error) {
httpClient := client.NewHTTP(nodeAddr, "/websocket")
func broadcastTxWithClient(httpClient *client.HTTP, tx tmtypes.Tx) ([]byte, string, error) {
res, err := httpClient.BroadcastTxCommit(tx)
if err != nil {
return nil, "", errors.Errorf("Error on broadcast tx: %v", err)

View File

@ -220,12 +220,12 @@ func broadcastTx(tx types.Tx) ([]byte, string, error) {
// if the sequence flag is set, return it;
// else, fetch the account by querying the app and return the sequence number
func getSeq(address []byte) (int, error) {
if seqFlag >= 0 {
return seqFlag, nil
}
acc, err := getAcc(txNodeFlag, address)
httpClient := client.NewHTTP(txNodeFlag, "/websocket")
acc, err := getAccWithClient(httpClient, address)
if err != nil {
return 0, err
}

View File

@ -100,6 +100,10 @@ func StripHex(s string) string {
func Query(tmAddr string, key []byte) (*abci.ResultQuery, error) {
httpClient := client.NewHTTP(tmAddr, "/websocket")
return queryWithClient(httpClient, key)
}
func queryWithClient(httpClient *client.HTTP, key []byte) (*abci.ResultQuery, error) {
res, err := httpClient.ABCIQuery("/key", key, true)
if err != nil {
return nil, errors.Errorf("Error calling /abci_query: %v", err)
@ -111,10 +115,10 @@ func Query(tmAddr string, key []byte) (*abci.ResultQuery, error) {
}
// fetch the account by querying the app
func getAcc(tmAddr string, address []byte) (*types.Account, error) {
func getAccWithClient(httpClient *client.HTTP, address []byte) (*types.Account, error) {
key := types.AccountKey(address)
response, err := Query(tmAddr, key)
response, err := queryWithClient(httpClient, key)
if err != nil {
return nil, err
}
@ -146,3 +150,23 @@ func getHeaderAndCommit(tmAddr string, height int) (*tmtypes.Header, *tmtypes.Co
return header, commit, nil
}
func waitForBlock(httpClient *client.HTTP) error {
res, err := httpClient.Status()
if err != nil {
return err
}
lastHeight := res.LatestBlockHeight
for {
res, err := httpClient.Status()
if err != nil {
return err
}
if res.LatestBlockHeight > lastHeight {
break
}
}
return nil
}

View File

@ -123,8 +123,8 @@ echo ""
echo "... creating egress packet on chain1"
echo ""
# create a packet on chain1 destined for chain2
PAYLOAD="DEADBEEF" #TODO
basecoin tx ibc --amount 10mycoin $CHAIN_FLAGS1 packet create --ibc_from $CHAIN_ID1 --to $CHAIN_ID2 --type coin --payload $PAYLOAD --ibc_sequence 1
PAYLOAD="010104DEADBEEF" #TODO
basecoin tx ibc --amount 10mycoin $CHAIN_FLAGS1 packet create --ibc_from $CHAIN_ID1 --to $CHAIN_ID2 --type coin --payload $PAYLOAD --ibc_sequence 0
ifExit
echo ""

16
glide.lock generated
View File

@ -1,5 +1,5 @@
hash: 997e4cc3339141ee01aa2adf656425a49ebf117e6ca9e81ba72b8f94fee3e86e
updated: 2017-05-17T12:25:00.580569867+02:00
hash: 9d06ae13959cbb2835f5ae400a4b65e4bc329a567c949aec4aeab318c271da39
updated: 2017-05-21T21:31:06.796806933-04:00
imports:
- name: github.com/bgentry/speakeasy
version: 4aabc24848ce5fd31929f7d1e4ea74d3709c14cd
@ -101,7 +101,7 @@ imports:
- leveldb/table
- leveldb/util
- name: github.com/tendermint/abci
version: 5dabeffb35c027d7087a12149685daa68989168b
version: 864d1f80b36b440bde030a5c18d8ac3aa8c2949d
subpackages:
- client
- example/dummy
@ -113,7 +113,7 @@ imports:
- edwards25519
- extra25519
- name: github.com/tendermint/go-crypto
version: 438b16f1f84ef002d7408ecd6fc3a3974cbc9559
version: 7dff40942a64cdeefefa9446b2d104750b349f8a
subpackages:
- cmd
- keys
@ -122,7 +122,7 @@ imports:
- keys/server/types
- keys/storage/filestorage
- name: github.com/tendermint/go-wire
version: 97beaedf0f4dbc035309157c92be3b30cc6e5d74
version: 5f88da3dbc1a72844e6dfaf274ce87f851d488eb
subpackages:
- data
- data/base58
@ -139,13 +139,13 @@ imports:
- commands/txs
- proofs
- name: github.com/tendermint/merkleeyes
version: c722818b460381bc5b82e38c73ff6e22a9df624d
version: a0e73e1ac3e18e12a007520a4ea2c9822256e307
subpackages:
- app
- client
- iavl
- name: github.com/tendermint/tendermint
version: 11b5d11e9eec170e1d3dce165f0270d5c0759d69
version: 267f134d44e76efb2adef5f0c993da8a5d5bd1b8
subpackages:
- blockchain
- config
@ -170,7 +170,7 @@ imports:
- types
- version
- name: github.com/tendermint/tmlibs
version: 8af1c70a8be17543eb33e9bfbbcdd8371e3201cc
version: 306795ae1d8e4f4a10dcc8bdb32a00455843c9d5
subpackages:
- autofile
- cli

View File

@ -6,17 +6,14 @@ import:
- package: github.com/spf13/pflag
- package: github.com/spf13/viper
- package: github.com/tendermint/abci
version: develop
subpackages:
- server
- types
- package: github.com/tendermint/go-crypto
version: develop
subpackages:
- cmd
- keys
- package: github.com/tendermint/go-wire
version: develop
subpackages:
- data
- package: github.com/tendermint/light-client
@ -28,7 +25,6 @@ import:
- commands/txs
- proofs
- package: github.com/tendermint/merkleeyes
version: develop
subpackages:
- client
- iavl
@ -44,7 +40,6 @@ import:
- rpc/lib/types
- types
- package: github.com/tendermint/tmlibs
version: develop
subpackages:
- cli
- common

View File

@ -418,6 +418,9 @@ func (sm *IBCStateMachine) runPacketCreateTx(tx IBCPacketCreateTx) {
// Save new Packet
save(sm.store, packetKey, packet)
// set the sequence number
SetSequenceNumber(sm.store, packet.SrcChainID, packet.DstChainID, packet.Sequence)
}
func (sm *IBCStateMachine) runPacketPostTx(tx IBCPacketPostTx) {
@ -473,7 +476,7 @@ func (sm *IBCStateMachine) runPacketPostTx(tx IBCPacketPostTx) {
ok := proof.Verify(packetKeyEgress, packetBytes, header.AppHash)
if !ok {
sm.res.Code = IBCCodeInvalidProof
sm.res.Log = "Proof is invalid"
sm.res.Log = fmt.Sprintf("Proof is invalid. key: %s; packetByes %X; header %v; proof %v", packetKeyEgress, packetBytes, header, proof)
return
}