Adds the RPC request ID to the context which is passed to called method.

Update request/response data structs to include extra fields

Refactor the SendTransactionAsync method to just call the
SendTransaction method in a goroutine, instead of reimplementing logic
for a new transaction.
This commit is contained in:
Peter Fox 2018-09-20 12:13:09 +01:00
parent ffcb3f4635
commit 93793cff0b
2 changed files with 31 additions and 64 deletions

View File

@ -28,8 +28,6 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"net/http" "net/http"
"sync"
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -1069,8 +1067,11 @@ type SendTxArgs struct {
Data hexutil.Bytes `json:"data"` Data hexutil.Bytes `json:"data"`
Nonce *hexutil.Uint64 `json:"nonce"` Nonce *hexutil.Uint64 `json:"nonce"`
//Quorum
PrivateFrom string `json:"privateFrom"` PrivateFrom string `json:"privateFrom"`
PrivateFor []string `json:"privateFor"` PrivateFor []string `json:"privateFor"`
PrivateTxType string `json:"restriction"`
//End-Quorum
} }
// prepareSendTxArgs is a helper function that fills in default values for unspecified tx fields. // prepareSendTxArgs is a helper function that fills in default values for unspecified tx fields.
@ -1095,6 +1096,11 @@ func (args *SendTxArgs) setDefaults(ctx context.Context, b Backend) error {
} }
args.Nonce = (*hexutil.Uint64)(&nonce) args.Nonce = (*hexutil.Uint64)(&nonce)
} }
//Quorum
if args.PrivateTxType == "" {
args.PrivateTxType = "restricted"
}
//End-Quorum
return nil return nil
} }
@ -1437,6 +1443,7 @@ func (s *PublicNetAPI) Version() string {
return fmt.Sprintf("%d", s.networkVersion) return fmt.Sprintf("%d", s.networkVersion)
} }
// Quorum
// Please note: This is a temporary integration to improve performance in high-latency // Please note: This is a temporary integration to improve performance in high-latency
// environments when sending many private transactions. It will be removed at a later // environments when sending many private transactions. It will be removed at a later
// date when account management is handled outside Ethereum. // date when account management is handled outside Ethereum.
@ -1447,17 +1454,19 @@ type AsyncSendTxArgs struct {
} }
type AsyncResult struct { type AsyncResult struct {
TxHash common.Hash `json:"txHash"` TxHash common.Hash `json:"txHash,omitempty"`
Error string `json:"error"` Error string `json:"error,omitempty"`
Id string `json:"id,omitempty"`
} }
type Async struct { func (s *PublicTransactionPoolAPI) send(ctx context.Context, asyncArgs AsyncSendTxArgs) {
sync.Mutex
sem chan struct{}
}
func (a *Async) send(ctx context.Context, s *PublicTransactionPoolAPI, asyncArgs AsyncSendTxArgs) {
res := new(AsyncResult) res := new(AsyncResult)
//don't need to nil check this since id is required for every geth rpc call
//even though this is stated in the specification as an "optional" parameter
id := ctx.Value("id").(*json.RawMessage)
res.Id = string(*id)
if asyncArgs.CallbackUrl != "" { if asyncArgs.CallbackUrl != "" {
defer func() { defer func() {
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
@ -1473,58 +1482,14 @@ func (a *Async) send(ctx context.Context, s *PublicTransactionPoolAPI, asyncArgs
} }
}() }()
} }
args := asyncArgs.SendTxArgs
err := args.setDefaults(ctx, s.b) var err error
if err != nil { res.TxHash, err = s.SendTransaction(ctx, asyncArgs.SendTxArgs)
log.Info("Async.send: Error doing setDefaults: %v", err)
res.Error = err.Error()
return
}
b, err := private.P.Send([]byte(args.Data), args.PrivateFrom, args.PrivateFor)
if err != nil {
log.Info("Error running Private.P.Send", "err", err)
res.Error = err.Error()
return
}
res.TxHash, err = a.save(ctx, s, args, b)
if err != nil { if err != nil {
res.Error = err.Error() res.Error = err.Error()
} }
} }
func (a *Async) save(ctx context.Context, s *PublicTransactionPoolAPI, args SendTxArgs, data []byte) (common.Hash, error) {
a.Lock()
defer a.Unlock()
if args.Nonce == nil {
nonce, err := s.b.GetPoolNonce(ctx, args.From)
if err != nil {
return common.Hash{}, err
}
args.Nonce = (*hexutil.Uint64)(&nonce)
}
var tx *types.Transaction
if args.To == nil {
tx = types.NewContractCreation((uint64)(*args.Nonce), (*big.Int)(args.Value), (*big.Int)(args.Gas), (*big.Int)(args.GasPrice), data)
} else {
tx = types.NewTransaction((uint64)(*args.Nonce), *args.To, (*big.Int)(args.Value), (*big.Int)(args.Gas), (*big.Int)(args.GasPrice), data)
}
signed, err := s.sign(args.From, tx)
if err != nil {
return common.Hash{}, err
}
return submitTransaction(ctx, s.b, signed, args.PrivateFor != nil)
}
func newAsync(n int) *Async {
a := &Async{
sem: make(chan struct{}, n),
}
return a
}
var async = newAsync(100)
// SendTransactionAsync creates a transaction for the given argument, signs it, and // SendTransactionAsync creates a transaction for the given argument, signs it, and
// submits it to the transaction pool. This call returns immediately to allow sending // submits it to the transaction pool. This call returns immediately to allow sending
@ -1537,12 +1502,9 @@ var async = newAsync(100)
// Please note: This is a temporary integration to improve performance in high-latency // Please note: This is a temporary integration to improve performance in high-latency
// environments when sending many private transactions. It will be removed at a later // environments when sending many private transactions. It will be removed at a later
// date when account management is handled outside Ethereum. // date when account management is handled outside Ethereum.
func (s *PublicTransactionPoolAPI) SendTransactionAsync(ctx context.Context, args AsyncSendTxArgs) { func (s *PublicTransactionPoolAPI) SendTransactionAsync(ctx context.Context, args AsyncSendTxArgs) (common.Hash, error){
async.sem <- struct{}{} go s.send(ctx, args)
go func() { return common.Hash{}, nil
async.send(ctx, s, args)
<-async.sem
}()
} }
// GetQuorumPayload returns the contents of a private transaction // GetQuorumPayload returns the contents of a private transaction
@ -1569,3 +1531,4 @@ func (s *PublicBlockChainAPI) GetQuorumPayload(digestHex string) (string, error)
} }
return fmt.Sprintf("0x%x", data), nil return fmt.Sprintf("0x%x", data), nil
} }
//End-Quorum

View File

@ -299,9 +299,13 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
return codec.CreateErrorResponse(&req.id, rpcErr), nil return codec.CreateErrorResponse(&req.id, rpcErr), nil
} }
//Quorum
//Pass the request ID to the method as part of the context, in case the method needs it later
contextWithId := context.WithValue(ctx, "id", req.id)
//End-Quorum
arguments := []reflect.Value{req.callb.rcvr} arguments := []reflect.Value{req.callb.rcvr}
if req.callb.hasCtx { if req.callb.hasCtx {
arguments = append(arguments, reflect.ValueOf(ctx)) arguments = append(arguments, reflect.ValueOf(contextWithId))
} }
if len(req.args) > 0 { if len(req.args) > 0 {
arguments = append(arguments, req.args...) arguments = append(arguments, req.args...)