From 313adc84320140723721e3d11da9060b32bacec4 Mon Sep 17 00:00:00 2001 From: George Tankersley Date: Wed, 12 Dec 2018 23:27:29 +0000 Subject: [PATCH] frontend: implement GetBlockRange and GetTransaction --- frontend/service.go | 82 +++++++++++++++++++++++++++++++++++++++------ storage/sqlite3.go | 35 +++++++++---------- 2 files changed, 89 insertions(+), 28 deletions(-) diff --git a/frontend/service.go b/frontend/service.go index 217b831..054a99a 100644 --- a/frontend/service.go +++ b/frontend/service.go @@ -56,27 +56,87 @@ func (s *SqlStreamer) GetBlock(ctx context.Context, id *rpc.BlockID) (*rpc.Compa return nil, ErrUnspecified } + var blockBytes []byte + var err error + // Precedence: a hash is more specific than a height. If we have it, use it first. if id.Hash != nil { leHashString := hex.EncodeToString(id.Hash) - return storage.GetBlockByHash(ctx, s.db, leHashString) + blockBytes, err = storage.GetBlockByHash(ctx, s.db, leHashString) + } else { + blockBytes, err = storage.GetBlock(ctx, s.db, int(id.Height)) } - // we have a height and not a hash - if int(id.Height) > 0 { - return storage.GetBlock(ctx, s.db, int(id.Height)) + if err != nil { + return nil, err } - return nil, ErrUnspecified + cBlock := &rpc.CompactBlock{} + err = proto.Unmarshal(blockBytes, cBlock) + return cBlock, err } -func (s *SqlStreamer) GetBlockRange(*rpc.BlockRange, rpc.CompactTxStreamer_GetBlockRangeServer) error { - return ErrNoImpl +func (s *SqlStreamer) GetBlockRange(span *rpc.BlockRange, resp rpc.CompactTxStreamer_GetBlockRangeServer) error { + blocks := make(chan []byte) + errors := make(chan error) + done := make(chan bool) + + timeout := resp.Context().WithTimeout(1 * time.Second) + go GetBlockRange(timeout, s.db, blocks, errors, done, span.Start, span.End) + + for { + select { + case <-timeout.Done(): + return timeout.Err() + case err := <-errors: + return err + case blockBytes := <-blocks: + cBlock := &rpc.CompactBlock{} + err = proto.Unmarshal(blockBytes, cBlock) + if err != nil { + return err // TODO really need better logging in this whole service + } + err = resp.Send(cBlock) + if err != nil { + return err + } + } + } + + return nil } -func (s *SqlStreamer) GetTransaction(context.Context, *rpc.TxFilter) (*rpc.RawTransaction, error) { - return nil, ErrNoImpl -} -func (s *SqlStreamer) SendTransaction(context.Context, *rpc.RawTransaction) (*rpc.SendResponse, error) { +func (s *SqlStreamer) GetTransaction(ctx context.Context, txf *rpc.TxFilter) (*rpc.RawTransaction, error) { + var txBytes []byte + var err error + + if txf.Hash != nil { + leHashString := hex.EncodeToString(txf.Hash) + txBytes, err = storage.GetFullTxByHash(ctx, s.db, leHashString) + if err != nil { + return nil, err + } + return &rpc.RawTransaction{Data: txBytes}, nil + + } + + if txf.Block.Hash != nil { + leHashString := hex.EncodeToString(txf.Hash) + txBytes, err = storage.GetFullTxByHashAndIndex(ctx, s.db, leHashString, int(txf.Index)) + if err != nil { + return nil, err + } + return &rpc.RawTransaction{Data: txBytes}, nil + } + + // A totally unset protobuf will attempt to fetch the genesis coinbase tx. + txBytes, err = storage.GetFullTxByHeightAndIndex(ctx, s.db, int(txf.Block.Height), int(txf.Index)) + if err != nil { + return nil, err + } + return &rpc.RawTransaction{Data: txBytes}, nil +} + +func (s *SqlStreamer) SendTransaction(ctx context.Context, rawtx *rpc.RawTransaction) (*rpc.SendResponse, error) { return nil, ErrNoImpl } diff --git a/storage/sqlite3.go b/storage/sqlite3.go index 581970a..5da2123 100644 --- a/storage/sqlite3.go +++ b/storage/sqlite3.go @@ -83,35 +83,36 @@ func GetBlockByHash(ctx context.Context, db *sql.DB, hash string) ([]byte, error return blockBytes, err } -// [start, end] -func GetBlockRange(conn *sql.DB, start, end int) ([][]byte, error) { - // TODO sanity check range bounds - query := "SELECT compact_encoding from blocks WHERE (height BETWEEN ? AND ?)" - result, err := conn.Query(query, start, end) +// [start, end] inclusive +func GetBlockRange(ctx context.Context, db *sql.DB, blocks chan<- []byte, errors chan<- error, start, end int) { + // TODO sanity check ranges, rate limit? + numBlocks := (end - start) + 1 + query := "SELECT height, compact_encoding from blocks WHERE (height BETWEEN ? AND ?)" + result, err := db.QueryContext(ctx, query, start, end) if err != nil { - return nil, err + errors <- err + return } defer result.Close() - compactBlocks := make([][]byte, 0, (end-start)+1) + // My assumption here is that if the context is cancelled then result.Next() will fail. + + var blockBytes []byte // avoid a copy with *RawBytes for result.Next() { - var blockBytes []byte // avoid a copy with *RawBytes err = result.Scan(&blockBytes) if err != nil { - return nil, err + errors <- err + return } - compactBlocks = append(compactBlocks, blockBytes) + blocks <- blockBytes } - err = result.Err() - if err != nil { - return nil, err + if err := result.Err(); err != nil { + errors <- err } - if len(compactBlocks) == 0 { - return nil, ErrBadRange - } - return compactBlocks, nil + // done + errors <- nil } func StoreBlock(conn *sql.DB, height int, hash string, sapling bool, encoded []byte) error {