tendermint/rpc/core/mempool.go

264 lines
6.5 KiB
Go
Raw Normal View History

package core
import (
"fmt"
2016-06-27 17:43:09 -07:00
"time"
abci "github.com/tendermint/abci/types"
2017-04-21 15:13:25 -07:00
data "github.com/tendermint/go-wire/data"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
2015-04-01 17:30:16 -07:00
"github.com/tendermint/tendermint/types"
)
//-----------------------------------------------------------------------------
2016-06-27 17:43:09 -07:00
// NOTE: tx should be signed, but this is only checked at the app level (not by Tendermint!)
2016-06-27 17:43:09 -07:00
// Returns right away, with no response
//
// ```shell
// curl 'localhost:46657/broadcast_tx_async?tx="123"'
// ```
//
// ```go
// client := client.NewHTTP("tcp://0.0.0.0:46657", "/websocket")
// result, err := client.BroadcastTxAsync("123")
// ```
//
// > The above command returns JSON structured like this:
//
// ```json
// {
// "error": "",
// "result": {
// "hash": "E39AAB7A537ABAA237831742DCE1117F187C3C52",
// "log": "",
// "data": "",
// "code": 0
// },
// "id": "",
// "jsonrpc": "2.0"
// }
// ```
//
// ### Query Parameters
//
// | Parameter | Type | Default | Required | Description |
// |-----------+------+---------+----------+-----------------|
// | tx | Tx | nil | true | The transaction |
2016-02-08 00:48:58 -08:00
func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
2016-10-14 18:36:42 -07:00
err := mempool.CheckTx(tx, nil)
if err != nil {
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
}
2017-04-21 08:39:02 -07:00
return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil
}
// Returns with the response from CheckTx.
//
// ```shell
// curl 'localhost:46657/broadcast_tx_sync?tx="456"'
// ```
//
// ```go
// client := client.NewHTTP("tcp://0.0.0.0:46657", "/websocket")
// result, err := client.BroadcastTxSync("456")
// ```
//
// > The above command returns JSON structured like this:
//
// ```json
// {
// "jsonrpc": "2.0",
// "id": "",
// "result": {
// "code": 0,
// "data": "",
// "log": "",
// "hash": "0D33F2F03A5234F38706E43004489E061AC40A2E"
// },
// "error": ""
// }
// ```
//
// ### Query Parameters
//
// | Parameter | Type | Default | Required | Description |
// |-----------+------+---------+----------+-----------------|
// | tx | Tx | nil | true | The transaction |
2016-02-08 00:48:58 -08:00
func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
2017-01-12 12:53:32 -08:00
resCh := make(chan *abci.Response, 1)
err := mempool.CheckTx(tx, func(res *abci.Response) {
2016-02-08 00:48:58 -08:00
resCh <- res
})
if err != nil {
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
}
res := <-resCh
2016-05-14 09:33:27 -07:00
r := res.GetCheckTx()
2016-02-08 00:48:58 -08:00
return &ctypes.ResultBroadcastTx{
2016-05-14 09:33:27 -07:00
Code: r.Code,
Data: r.Data,
Log: r.Log,
2017-04-21 08:39:02 -07:00
Hash: tx.Hash(),
2016-02-08 00:48:58 -08:00
}, nil
}
// CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app)
2016-11-30 14:28:41 -08:00
// or if we timeout waiting for tx to commit.
2017-01-12 12:55:03 -08:00
// If CheckTx or DeliverTx fail, no error will be returned, but the returned result
2017-01-12 12:53:32 -08:00
// will contain a non-OK ABCI code.
//
// ```shell
// curl 'localhost:46657/broadcast_tx_commit?tx="789"'
// ```
//
// ```go
// client := client.NewHTTP("tcp://0.0.0.0:46657", "/websocket")
// result, err := client.BroadcastTxCommit("789")
// ```
//
// > The above command returns JSON structured like this:
//
// ```json
// {
// "error": "",
// "result": {
// "height": 26682,
// "hash": "75CA0F856A4DA078FC4911580360E70CEFB2EBEE",
// "deliver_tx": {
// "log": "",
// "data": "",
// "code": 0
// },
// "check_tx": {
// "log": "",
// "data": "",
// "code": 0
// }
// },
// "id": "",
// "jsonrpc": "2.0"
// }
// ```
//
// ### Query Parameters
//
// | Parameter | Type | Default | Required | Description |
// |-----------+------+---------+----------+-----------------|
// | tx | Tx | nil | true | The transaction |
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
2016-06-27 17:43:09 -07:00
// subscribe to tx being committed in block
2017-01-12 12:55:03 -08:00
deliverTxResCh := make(chan types.EventDataTx, 1)
2016-10-09 23:58:13 -07:00
types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) {
2017-04-28 14:57:06 -07:00
deliverTxResCh <- data.Unwrap().(types.EventDataTx)
2016-06-27 17:43:09 -07:00
})
// broadcast the tx and register checktx callback
2017-01-12 12:53:32 -08:00
checkTxResCh := make(chan *abci.Response, 1)
err := mempool.CheckTx(tx, func(res *abci.Response) {
2016-06-27 17:43:09 -07:00
checkTxResCh <- res
})
if err != nil {
2017-05-02 00:53:32 -07:00
logger.Error("err", "err", err)
2016-06-27 17:43:09 -07:00
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
}
checkTxRes := <-checkTxResCh
checkTxR := checkTxRes.GetCheckTx()
2017-01-12 12:53:32 -08:00
if checkTxR.Code != abci.CodeType_OK {
2016-06-27 17:43:09 -07:00
// CheckTx failed!
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR.Result(),
DeliverTx: abci.Result{},
2017-04-21 08:39:02 -07:00
Hash: tx.Hash(),
}, nil
2016-06-27 17:43:09 -07:00
}
// Wait for the tx to be included in a block,
// timeout after something reasonable.
2017-05-20 21:43:00 -07:00
// TODO: configurable?
2016-11-30 14:28:41 -08:00
timer := time.NewTimer(60 * 2 * time.Second)
2016-06-27 17:43:09 -07:00
select {
2017-01-12 12:55:03 -08:00
case deliverTxRes := <-deliverTxResCh:
2016-06-27 17:43:09 -07:00
// The tx was included in a block.
2017-01-12 12:55:03 -08:00
deliverTxR := &abci.ResponseDeliverTx{
Code: deliverTxRes.Code,
Data: deliverTxRes.Data,
Log: deliverTxRes.Log,
2016-11-30 14:28:41 -08:00
}
2017-05-02 00:53:32 -07:00
logger.Info("DeliverTx passed ", "tx", data.Bytes(tx), "response", deliverTxR)
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR.Result(),
DeliverTx: deliverTxR.Result(),
2017-04-21 08:39:02 -07:00
Hash: tx.Hash(),
Height: deliverTxRes.Height,
2016-06-27 17:43:09 -07:00
}, nil
case <-timer.C:
2017-05-02 00:53:32 -07:00
logger.Error("failed to include tx")
return &ctypes.ResultBroadcastTxCommit{
CheckTx: checkTxR.Result(),
DeliverTx: abci.Result{},
2017-04-21 08:39:02 -07:00
Hash: tx.Hash(),
2016-06-27 17:43:09 -07:00
}, fmt.Errorf("Timed out waiting for transaction to be included in a block")
}
panic("Should never happen!")
}
// Get unconfirmed transactions including their number.
//
// ```shell
// curl 'localhost:46657/unconfirmed_txs'
// ```
//
// ```go
// client := client.NewHTTP("tcp://0.0.0.0:46657", "/websocket")
// result, err := client.UnconfirmedTxs()
// ```
//
// > The above command returns JSON structured like this:
//
// ```json
// {
// "error": "",
// "result": {
// "txs": [],
// "n_txs": 0
// },
// "id": "",
// "jsonrpc": "2.0"
// }
// ```
2016-02-08 00:48:58 -08:00
func UnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
2016-10-14 18:36:42 -07:00
txs := mempool.Reap(-1)
2016-02-14 17:00:33 -08:00
return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil
2015-04-25 13:26:36 -07:00
}
// Get number of unconfirmed transactions.
//
// ```shell
// curl 'localhost:46657/num_unconfirmed_txs'
// ```
//
// ```go
// client := client.NewHTTP("tcp://0.0.0.0:46657", "/websocket")
// result, err := client.UnconfirmedTxs()
// ```
//
// > The above command returns JSON structured like this:
//
// ```json
// {
// "error": "",
// "result": {
// "txs": null,
// "n_txs": 0
// },
// "id": "",
// "jsonrpc": "2.0"
// }
// ```
func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
2016-10-14 18:36:42 -07:00
return &ctypes.ResultUnconfirmedTxs{N: mempool.Size()}, nil
}