diff --git a/server/main.go b/cmd/server/main.go similarity index 62% rename from server/main.go rename to cmd/server/main.go index dd36bf9..09117fb 100644 --- a/server/main.go +++ b/cmd/server/main.go @@ -1,18 +1,14 @@ package main import ( - "context" - "database/sql" - "errors" "flag" "net" "os" "os/signal" "syscall" + "github.com/gtank/ctxd/frontend" "github.com/gtank/ctxd/rpc" - "github.com/gtank/ctxd/storage" - _ "github.com/mattn/go-sqlite3" "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -22,13 +18,9 @@ import ( var log *logrus.Entry var logger = logrus.New() -var ( - ErrNoImpl = errors.New("not yet implemented") -) - func init() { logger.SetFormatter(&logrus.TextFormatter{ - //DisableColors: true, + //DisableColors: true, FullTimestamp: true, DisableLevelTruncation: true, }) @@ -36,7 +28,6 @@ func init() { log = logger.WithFields(logrus.Fields{ "app": "frontend-grpc", }) - } type Options struct { @@ -76,12 +67,12 @@ func main() { log.WithFields(logrus.Fields{ "cert_file": opts.tlsCertPath, "key_path": opts.tlsKeyPath, - "error": err.Error(), + "error": err, }).Fatal("couldn't load TLS credentials") } - server = grpc.NewServer(grpc.Creds(transportCreds)) + server = grpc.NewServer(grpc.Creds(transportCreds), frontend.LoggingInterceptor()) } else { - server = grpc.NewServer() + server = grpc.NewServer(frontend.LoggingInterceptor()) } // Enable reflection for debugging @@ -90,11 +81,11 @@ func main() { } // Compact transaction service initialization - service, err := NewSQLiteStreamer(opts.dbPath) + service, err := frontend.NewSQLiteStreamer(opts.dbPath) if err != nil { log.WithFields(logrus.Fields{ "db_path": opts.dbPath, - "error": err.Error(), + "error": err, }).Fatal("couldn't create SQL streamer") } @@ -106,7 +97,7 @@ func main() { if err != nil { log.WithFields(logrus.Fields{ "bind_addr": opts.bindAddr, - "error": err.Error(), + "error": err, }).Fatal("couldn't create listener") } @@ -126,54 +117,7 @@ func main() { err = server.Serve(listener) if err != nil { log.WithFields(logrus.Fields{ - "error": err.Error(), + "error": err, }).Fatal("gRPC server exited") } } - -// the service type -type sqlStreamer struct { - db *sql.DB -} - -func NewSQLiteStreamer(dbPath string) (rpc.CompactTxStreamerServer, error) { - db, err := sql.Open("sqlite3", dbPath) - if err != nil { - return nil, err - } - - // Creates our tables if they don't already exist. - err = storage.CreateTables(db) - if err != nil { - return nil, err - } - - return &sqlStreamer{db}, nil -} - -func (s *sqlStreamer) GetLatestBlock(ctx context.Context, placeholder *rpc.ChainSpec) (*rpc.BlockID, error) { - // the ChainSpec type is an empty placeholder - height, err := storage.GetCurrentHeight(ctx, s.db) - if err != nil { - log.WithFields(logrus.Fields{ - "error": err.Error(), - "context": ctx, - }).Error("GetLatestBlock call failed") - return nil, err - } - // TODO: also return block hashes here - return &rpc.BlockID{Height: uint64(height)}, nil -} - -func (s *sqlStreamer) GetBlock(context.Context, *rpc.BlockID) (*rpc.CompactBlock, error) { - return nil, ErrNoImpl -} -func (s *sqlStreamer) GetBlockRange(*rpc.BlockRange, rpc.CompactTxStreamer_GetBlockRangeServer) error { - return ErrNoImpl -} -func (s *sqlStreamer) GetTransaction(context.Context, *rpc.TxFilter) (*rpc.RawTransaction, error) { - return nil, ErrNoImpl -} -func (s *sqlStreamer) SendTransaction(context.Context, *rpc.RawTransaction) (*rpc.SendResponse, error) { - return nil, ErrNoImpl -} diff --git a/frontend/log.go b/frontend/log.go new file mode 100644 index 0000000..69b7aef --- /dev/null +++ b/frontend/log.go @@ -0,0 +1,63 @@ +package frontend + +import ( + "context" + "time" + + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/peer" +) + +var log *logrus.Entry +var logger = logrus.New() + +func init() { + logger.SetFormatter(&logrus.TextFormatter{ + //DisableColors: true, + FullTimestamp: true, + DisableLevelTruncation: true, + }) + + log = logger.WithFields(logrus.Fields{ + "app": "frontend-grpc", + }) +} + +func LoggingInterceptor() grpc.ServerOption { + return grpc.UnaryInterceptor(logInterceptor) +} + +func logInterceptor( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, +) (interface{}, error) { + reqLog := loggerFromContext(ctx) + start := time.Now() + + resp, err := handler(ctx, req) + + entry := reqLog.WithFields(logrus.Fields{ + "method": info.FullMethod, + "duration": time.Since(start), + "error": err, + }) + + if err != nil { + entry.Error("call failed") + } else { + entry.Info("method called") + } + + return resp, err +} + +func loggerFromContext(ctx context.Context) *logrus.Entry { + // TODO: anonymize the addresses. cryptopan? + if peerInfo, ok := peer.FromContext(ctx); ok { + return log.WithFields(logrus.Fields{"peer_addr": peerInfo.Addr}) + } + return log.WithFields(logrus.Fields{"peer_addr": "unknown"}) +} diff --git a/frontend/service.go b/frontend/service.go new file mode 100644 index 0000000..c708277 --- /dev/null +++ b/frontend/service.go @@ -0,0 +1,78 @@ +package frontend + +import ( + "context" + "database/sql" + "encoding/hex" + "errors" + + _ "github.com/mattn/go-sqlite3" + + "github.com/gtank/ctxd/rpc" + "github.com/gtank/ctxd/storage" +) + +var ( + ErrNoImpl = errors.New("not yet implemented") + ErrUnspecified = errors.New("request for unspecified identifier") +) + +// the service type +type sqlStreamer struct { + db *sql.DB +} + +func NewSQLiteStreamer(dbPath string) (rpc.CompactTxStreamerServer, error) { + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return nil, err + } + + // Creates our tables if they don't already exist. + err = storage.CreateTables(db) + if err != nil { + return nil, err + } + + return &sqlStreamer{db}, nil +} + +func (s *sqlStreamer) GetLatestBlock(ctx context.Context, placeholder *rpc.ChainSpec) (*rpc.BlockID, error) { + // the ChainSpec type is an empty placeholder + height, err := storage.GetCurrentHeight(ctx, s.db) + if err != nil { + return nil, err + } + // TODO: also return block hashes here + return &rpc.BlockID{Height: uint64(height)}, nil +} + +func (s *sqlStreamer) GetBlock(ctx context.Context, id *rpc.BlockID) (*rpc.CompactBlock, error) { + if id.Height == 0 && id.Hash == nil { + return nil, ErrUnspecified + } + + // Precedence: a hash is more specific than a height. If we have it, use it first. + if id.Hash != nil { + leHashString := hex.EncodeToString(id.Hash) + return storage.GetBlockByHash(ctx, s.db, leHashString) + } + + // we have a height and not a hash + if int(id.Height) > 0 { + return storage.GetBlock(ctx, s.db, int(id.Height)) + } + + return nil, ErrUnspecified +} + +func (s *sqlStreamer) GetBlockRange(*rpc.BlockRange, rpc.CompactTxStreamer_GetBlockRangeServer) error { + return ErrNoImpl +} + +func (s *sqlStreamer) GetTransaction(context.Context, *rpc.TxFilter) (*rpc.RawTransaction, error) { + return nil, ErrNoImpl +} +func (s *sqlStreamer) SendTransaction(context.Context, *rpc.RawTransaction) (*rpc.SendResponse, error) { + return nil, ErrNoImpl +} diff --git a/storage/sqlite3.go b/storage/sqlite3.go index b6e1e4b..53ef55a 100644 --- a/storage/sqlite3.go +++ b/storage/sqlite3.go @@ -40,10 +40,10 @@ func CreateTables(conn *sql.DB) error { return err } -func GetCurrentHeight(ctx context.Context, conn *sql.DB) (int, error) { +func GetCurrentHeight(ctx context.Context, db *sql.DB) (int, error) { var height int query := "SELECT current_height FROM state WHERE rowid = 1" - err := conn.QueryRowContext(ctx, query).Scan(&height) + err := db.QueryRowContext(ctx, query).Scan(&height) return height, err } @@ -75,10 +75,10 @@ func SetCurrentHeight(conn *sql.DB, height int) error { return nil } -func GetBlock(ctx context.Context, conn *sql.DB, height int) (*rpc.CompactBlock, error) { +func GetBlock(ctx context.Context, db *sql.DB, height int) (*rpc.CompactBlock, error) { var blockBytes []byte // avoid a copy with *RawBytes query := "SELECT compact_encoding from blocks WHERE height = ?" - err := conn.QueryRow(query, height).Scan(&blockBytes) + err := db.QueryRowContext(ctx, query, height).Scan(&blockBytes) if err != nil { return nil, err } @@ -87,6 +87,18 @@ func GetBlock(ctx context.Context, conn *sql.DB, height int) (*rpc.CompactBlock, return compactBlock, err } +func GetBlockByHash(ctx context.Context, db *sql.DB, hash string) (*rpc.CompactBlock, error) { + var blockBytes []byte // avoid a copy with *RawBytes + query := "SELECT compact_encoding from blocks WHERE hash = ?" + err := db.QueryRowContext(ctx, query, hash).Scan(&blockBytes) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("getting block with hash %s", hash)) + } + compactBlock := &rpc.CompactBlock{} + err = proto.Unmarshal(blockBytes, compactBlock) + return compactBlock, err +} + // [start, end] func GetBlockRange(conn *sql.DB, start, end int) ([]*rpc.CompactBlock, error) { // TODO sanity check range bounds @@ -123,18 +135,6 @@ func GetBlockRange(conn *sql.DB, start, end int) ([]*rpc.CompactBlock, error) { return compactBlocks, nil } -func GetBlockByHash(conn *sql.DB, hash string) (*rpc.CompactBlock, error) { - var blockBytes []byte // avoid a copy with *RawBytes - query := "SELECT compact_encoding from blocks WHERE hash = ?" - err := conn.QueryRow(query, hash).Scan(&blockBytes) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("getting block with hash %s", hash)) - } - compactBlock := &rpc.CompactBlock{} - err = proto.Unmarshal(blockBytes, compactBlock) - return compactBlock, err -} - func StoreBlock(conn *sql.DB, height int, hash string, sapling bool, version int, encoded []byte) error { insertBlock := "INSERT INTO blocks (height, hash, has_sapling_tx, encoding_version, compact_encoding) values (?, ?, ?, ?, ?)" _, err := conn.Exec(insertBlock, height, hash, sapling, version, encoded)