diff --git a/cmd/server/main.go b/cmd/server/main.go index edeb965..f49bf7f 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -171,7 +171,7 @@ func main() { log.WithField("saplingHeight", saplingHeight).Info("Got sapling height ", saplingHeight, " chain ", chainName) // Compact transaction service initialization - service, err := frontend.NewSQLiteStreamer(opts.dbPath, rpcClient, log) + service, err := frontend.NewSQLiteStreamer(rpcClient, log) if err != nil { log.WithFields(logrus.Fields{ "db_path": opts.dbPath, diff --git a/common/common.go b/common/common.go index a59efb4..216f834 100644 --- a/common/common.go +++ b/common/common.go @@ -1,10 +1,13 @@ package common import ( + "encoding/hex" "encoding/json" "strconv" "strings" + "github.com/adityapk00/lightwalletd/parser" + "github.com/adityapk00/lightwalletd/walletrpc" "github.com/btcsuite/btcd/rpcclient" "github.com/pkg/errors" ) @@ -40,3 +43,62 @@ func GetSaplingInfo(rpcClient *rpcclient.Client) (int, string, error) { return int(saplingHeight), chainName, nil } + +func GetBlock(rpcClient *rpcclient.Client, height int) (*parser.Block, error) { + params := make([]json.RawMessage, 2) + params[0] = json.RawMessage("\"" + strconv.Itoa(height) + "\"") + params[1] = json.RawMessage("0") + result, rpcErr := rpcClient.RawRequest("getblock", params) + + var err error + var errCode int64 + + // For some reason, the error responses are not JSON + if rpcErr != nil { + errParts := strings.SplitN(rpcErr.Error(), ":", 2) + errCode, err = strconv.ParseInt(errParts[0], 10, 32) + //Check to see if we are requesting a height the zcashd doesn't have yet + if err == nil && errCode == -8 { + return nil, nil + } + return nil, errors.Wrap(rpcErr, "error requesting block") + } + + var blockDataHex string + err = json.Unmarshal(result, &blockDataHex) + if err != nil { + return nil, errors.Wrap(err, "error reading JSON response") + } + + blockData, err := hex.DecodeString(blockDataHex) + if err != nil { + return nil, errors.Wrap(err, "error decoding getblock output") + } + + block := parser.NewBlock() + rest, err := block.ParseFromSlice(blockData) + if err != nil { + return nil, errors.Wrap(err, "error parsing block") + } + if len(rest) != 0 { + return nil, errors.New("received overlong message") + } + return block, nil +} + +func GetBlockRange(rpcClient *rpcclient.Client, blockOut chan<- walletrpc.CompactBlock, + errOut chan<- error, start, end int) { + + // Go over [start, end] inclusive + for i := start; i <= end; i++ { + block, err := GetBlock(rpcClient, i) + if err != nil { + errOut <- err + return + } + + blockOut <- *block.ToCompact() + } + + errOut <- nil +} diff --git a/frontend/service.go b/frontend/service.go index d28bb3d..fba9bcd 100644 --- a/frontend/service.go +++ b/frontend/service.go @@ -2,24 +2,17 @@ package frontend import ( "context" - "database/sql" "encoding/hex" "encoding/json" "errors" - "fmt" "strconv" "strings" "time" "github.com/btcsuite/btcd/rpcclient" - "github.com/golang/protobuf/proto" "github.com/sirupsen/logrus" - // blank import for sqlite driver support - _ "github.com/mattn/go-sqlite3" - "github.com/adityapk00/lightwalletd/common" - "github.com/adityapk00/lightwalletd/storage" "github.com/adityapk00/lightwalletd/walletrpc" ) @@ -29,64 +22,45 @@ var ( // the service type type SqlStreamer struct { - db *sql.DB client *rpcclient.Client log *logrus.Entry } -func NewSQLiteStreamer(dbPath string, client *rpcclient.Client, log *logrus.Entry) (walletrpc.CompactTxStreamerServer, error) { - db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_busy_timeout=10000&cache=shared", dbPath)) - db.SetMaxOpenConns(1) - if err != nil { - return nil, err - } - - // Creates our tables if they don't already exist. - err = storage.CreateTables(db) - if err != nil { - return nil, err - } - - return &SqlStreamer{db, client, log}, nil +func NewSQLiteStreamer(client *rpcclient.Client, log *logrus.Entry) (walletrpc.CompactTxStreamerServer, error) { + return &SqlStreamer{client, log}, nil } func (s *SqlStreamer) GracefulStop() error { - return s.db.Close() + return nil } func (s *SqlStreamer) GetLatestBlock(ctx context.Context, placeholder *walletrpc.ChainSpec) (*walletrpc.BlockID, error) { - // the ChainSpec type is an empty placeholder - height, err := storage.GetCurrentHeight(ctx, s.db) - if err != nil { - return nil, err - } - // TODO: also return block hashes here - return &walletrpc.BlockID{Height: uint64(height)}, nil -} + result, rpcErr := s.client.RawRequest("getinfo", make([]json.RawMessage, 0)) -func (s *SqlStreamer) GetBlock(ctx context.Context, id *walletrpc.BlockID) (*walletrpc.CompactBlock, error) { - if id.Height == 0 && id.Hash == nil { - return nil, ErrUnspecified - } - - var blockBytes []byte var err error + var errCode int64 - // Precedence: a hash is more specific than a height. If we have it, use it first. - if id.Hash != nil { - leHashString := hex.EncodeToString(id.Hash) - blockBytes, err = storage.GetBlockByHash(ctx, s.db, leHashString) - } else { - blockBytes, err = storage.GetBlock(ctx, s.db, int(id.Height)) + // For some reason, the error responses are not JSON + if rpcErr != nil { + errParts := strings.SplitN(rpcErr.Error(), ":", 2) + errCode, err = strconv.ParseInt(errParts[0], 10, 32) + //Check to see if we are requesting a height the zcashd doesn't have yet + if err == nil && errCode == -8 { + return nil, errors.New("Don't have the requested block") + } + return nil, err } + var f interface{} + err = json.Unmarshal(result, &f) if err != nil { return nil, err } - cBlock := &walletrpc.CompactBlock{} - err = proto.Unmarshal(blockBytes, cBlock) - return cBlock, err + latestBlock := f.(map[string]interface{})["blocks"].(float64) + + // TODO: also return block hashes here + return &walletrpc.BlockID{Height: uint64(latestBlock)}, nil } func (s *SqlStreamer) GetAddressTxids(addressBlockFilter *walletrpc.TransparentAddressBlockFilter, resp walletrpc.CompactTxStreamer_GetAddressTxidsServer) error { @@ -144,33 +118,41 @@ func (s *SqlStreamer) GetAddressTxids(addressBlockFilter *walletrpc.TransparentA return nil } +func (s *SqlStreamer) GetBlock(ctx context.Context, id *walletrpc.BlockID) (*walletrpc.CompactBlock, error) { + if id.Height == 0 && id.Hash == nil { + return nil, ErrUnspecified + } + + // Precedence: a hash is more specific than a height. If we have it, use it first. + if id.Hash != nil { + // TODO: Get block by hash + + return nil, errors.New("GetBlock by Hash is not yet implemented") + } else { + cBlock, err := common.GetBlock(s.client, int(id.Height)) + + if err != nil { + return nil, err + } + + return cBlock.ToCompact(), err + } + +} + func (s *SqlStreamer) GetBlockRange(span *walletrpc.BlockRange, resp walletrpc.CompactTxStreamer_GetBlockRangeServer) error { - blockChan := make(chan []byte) + blockChan := make(chan walletrpc.CompactBlock) errChan := make(chan error) - // TODO configure or stress-test this timeout - timeout, cancel := context.WithTimeout(resp.Context(), 30*time.Second) - defer cancel() - go storage.GetBlockRange(timeout, - s.db, - blockChan, - errChan, - int(span.Start.Height), - int(span.End.Height), - ) + go common.GetBlockRange(s.client, blockChan, errChan, int(span.Start.Height), int(span.End.Height)) for { select { case err := <-errChan: // this will also catch context.DeadlineExceeded from the timeout return err - case blockBytes := <-blockChan: - cBlock := &walletrpc.CompactBlock{} - err := proto.Unmarshal(blockBytes, cBlock) - if err != nil { - return err // TODO really need better logging in this whole service - } - err = resp.Send(cBlock) + case cBlock := <-blockChan: + err := resp.Send(&cBlock) if err != nil { return err }