frontend: implement GetBlockRange and GetTransaction
This commit is contained in:
parent
5c2e5479a3
commit
313adc8432
|
@ -56,27 +56,87 @@ func (s *SqlStreamer) GetBlock(ctx context.Context, id *rpc.BlockID) (*rpc.Compa
|
|||
return nil, ErrUnspecified
|
||||
}
|
||||
|
||||
var blockBytes []byte
|
||||
var err error
|
||||
|
||||
// Precedence: a hash is more specific than a height. If we have it, use it first.
|
||||
if id.Hash != nil {
|
||||
leHashString := hex.EncodeToString(id.Hash)
|
||||
return storage.GetBlockByHash(ctx, s.db, leHashString)
|
||||
blockBytes, err = storage.GetBlockByHash(ctx, s.db, leHashString)
|
||||
} else {
|
||||
blockBytes, err = storage.GetBlock(ctx, s.db, int(id.Height))
|
||||
}
|
||||
|
||||
// we have a height and not a hash
|
||||
if int(id.Height) > 0 {
|
||||
return storage.GetBlock(ctx, s.db, int(id.Height))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, ErrUnspecified
|
||||
cBlock := &rpc.CompactBlock{}
|
||||
err = proto.Unmarshal(blockBytes, cBlock)
|
||||
return cBlock, err
|
||||
}
|
||||
|
||||
func (s *SqlStreamer) GetBlockRange(*rpc.BlockRange, rpc.CompactTxStreamer_GetBlockRangeServer) error {
|
||||
return ErrNoImpl
|
||||
func (s *SqlStreamer) GetBlockRange(span *rpc.BlockRange, resp rpc.CompactTxStreamer_GetBlockRangeServer) error {
|
||||
blocks := make(chan []byte)
|
||||
errors := make(chan error)
|
||||
done := make(chan bool)
|
||||
|
||||
timeout := resp.Context().WithTimeout(1 * time.Second)
|
||||
go GetBlockRange(timeout, s.db, blocks, errors, done, span.Start, span.End)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timeout.Done():
|
||||
return timeout.Err()
|
||||
case err := <-errors:
|
||||
return err
|
||||
case blockBytes := <-blocks:
|
||||
cBlock := &rpc.CompactBlock{}
|
||||
err = proto.Unmarshal(blockBytes, cBlock)
|
||||
if err != nil {
|
||||
return err // TODO really need better logging in this whole service
|
||||
}
|
||||
err = resp.Send(cBlock)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SqlStreamer) GetTransaction(context.Context, *rpc.TxFilter) (*rpc.RawTransaction, error) {
|
||||
return nil, ErrNoImpl
|
||||
}
|
||||
func (s *SqlStreamer) SendTransaction(context.Context, *rpc.RawTransaction) (*rpc.SendResponse, error) {
|
||||
func (s *SqlStreamer) GetTransaction(ctx context.Context, txf *rpc.TxFilter) (*rpc.RawTransaction, error) {
|
||||
var txBytes []byte
|
||||
var err error
|
||||
|
||||
if txf.Hash != nil {
|
||||
leHashString := hex.EncodeToString(txf.Hash)
|
||||
txBytes, err = storage.GetFullTxByHash(ctx, s.db, leHashString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &rpc.RawTransaction{Data: txBytes}, nil
|
||||
|
||||
}
|
||||
|
||||
if txf.Block.Hash != nil {
|
||||
leHashString := hex.EncodeToString(txf.Hash)
|
||||
txBytes, err = storage.GetFullTxByHashAndIndex(ctx, s.db, leHashString, int(txf.Index))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &rpc.RawTransaction{Data: txBytes}, nil
|
||||
}
|
||||
|
||||
// A totally unset protobuf will attempt to fetch the genesis coinbase tx.
|
||||
txBytes, err = storage.GetFullTxByHeightAndIndex(ctx, s.db, int(txf.Block.Height), int(txf.Index))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &rpc.RawTransaction{Data: txBytes}, nil
|
||||
}
|
||||
|
||||
func (s *SqlStreamer) SendTransaction(ctx context.Context, rawtx *rpc.RawTransaction) (*rpc.SendResponse, error) {
|
||||
return nil, ErrNoImpl
|
||||
}
|
||||
|
|
|
@ -83,35 +83,36 @@ func GetBlockByHash(ctx context.Context, db *sql.DB, hash string) ([]byte, error
|
|||
return blockBytes, err
|
||||
}
|
||||
|
||||
// [start, end]
|
||||
func GetBlockRange(conn *sql.DB, start, end int) ([][]byte, error) {
|
||||
// TODO sanity check range bounds
|
||||
query := "SELECT compact_encoding from blocks WHERE (height BETWEEN ? AND ?)"
|
||||
result, err := conn.Query(query, start, end)
|
||||
// [start, end] inclusive
|
||||
func GetBlockRange(ctx context.Context, db *sql.DB, blocks chan<- []byte, errors chan<- error, start, end int) {
|
||||
// TODO sanity check ranges, rate limit?
|
||||
numBlocks := (end - start) + 1
|
||||
query := "SELECT height, compact_encoding from blocks WHERE (height BETWEEN ? AND ?)"
|
||||
result, err := db.QueryContext(ctx, query, start, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
errors <- err
|
||||
return
|
||||
}
|
||||
defer result.Close()
|
||||
|
||||
compactBlocks := make([][]byte, 0, (end-start)+1)
|
||||
// My assumption here is that if the context is cancelled then result.Next() will fail.
|
||||
|
||||
var blockBytes []byte // avoid a copy with *RawBytes
|
||||
for result.Next() {
|
||||
var blockBytes []byte // avoid a copy with *RawBytes
|
||||
err = result.Scan(&blockBytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
errors <- err
|
||||
return
|
||||
}
|
||||
compactBlocks = append(compactBlocks, blockBytes)
|
||||
blocks <- blockBytes
|
||||
}
|
||||
|
||||
err = result.Err()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if err := result.Err(); err != nil {
|
||||
errors <- err
|
||||
}
|
||||
|
||||
if len(compactBlocks) == 0 {
|
||||
return nil, ErrBadRange
|
||||
}
|
||||
return compactBlocks, nil
|
||||
// done
|
||||
errors <- nil
|
||||
}
|
||||
|
||||
func StoreBlock(conn *sql.DB, height int, hash string, sapling bool, encoded []byte) error {
|
||||
|
|
Loading…
Reference in New Issue